From cbe451be8db8a35dce815148d04dc47d49ef4742 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 15 Oct 2013 07:30:03 +0000
Subject: [PATCH] Fixing UpdateOperationTest.csnGeneratorAdjust() test failure in Continuous Integration.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                   |   80 +++++++++++--------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java |  133 +++++++++++++++++---------------
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java                |    6 +
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                   |    9 ++
 4 files changed, 131 insertions(+), 97 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
index 1ae8b2c..4e46086 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -293,4 +293,10 @@
     return sslEncryption;
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " " + (sslEncryption ? "with SSL" : "");
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 8bd15cb..e4bfe14 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -82,7 +82,7 @@
    */
   public final static String NO_CONNECTED_SERVER = "Not connected";
   private volatile String replicationServer = NO_CONNECTED_SERVER;
-  private volatile Session session = null;
+  private volatile Session session;
   private final ServerState state;
   private final DN baseDN;
   private final int serverId;
@@ -1284,7 +1284,8 @@
       // Send our Start Session
       StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
       startECLSessionMsg.setOperationId("-1");
-      session.publish(startECLSessionMsg);
+      final Session localSession = session;
+      localSession.publish(startECLSessionMsg);
 
       // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
       if (debugEnabled())
@@ -1294,7 +1295,7 @@
       }
 
       // Alright set the timeout to the desired value
-      session.setSoTimeout(timeout);
+      localSession.setSoTimeout(timeout);
       connected = true;
     } catch (Exception e)
     {
@@ -1319,8 +1320,6 @@
   private TopologyMsg performPhaseTwoHandshake(String server,
     ServerStatus initStatus)
   {
-    TopologyMsg topologyMsg;
-
     try
     {
       /*
@@ -1347,12 +1346,13 @@
         startSessionMsg =
           new StartSessionMsg(initStatus, new ArrayList<String>());
       }
-      session.publish(startSessionMsg);
+      final Session localSession = session;
+      localSession.publish(startSessionMsg);
 
       /*
        * Read the TopologyMsg that should come back.
        */
-      topologyMsg = (TopologyMsg) session.receive();
+      TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
 
       if (debugEnabled())
       {
@@ -1361,8 +1361,8 @@
       }
 
       // Alright set the timeout to the desired value
-      session.setSoTimeout(timeout);
-
+      localSession.setSoTimeout(timeout);
+      return topologyMsg;
     } catch (Exception e)
     {
       Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
@@ -1372,9 +1372,8 @@
       setSession(null);
 
       // Be sure to return null.
-      topologyMsg = null;
+      return null;
     }
-    return topologyMsg;
   }
 
   /**
@@ -1423,6 +1422,7 @@
      *   local DS
      * - replication server in the same VM as local DS one
      */
+    // TODO JNR log why an RS was evicted as best server
     Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
     /*
     The list of best replication servers is filtered with each criteria. At
@@ -2225,10 +2225,10 @@
             Check the session. If it has changed, some disconnection or
             reconnection happened and we need to restart from scratch.
             */
-
-            if (session != null && session == currentSession)
+            final Session localSession = session;
+            if (localSession != null && session == currentSession)
             {
-              session.publish(msg);
+              localSession.publish(msg);
               done = true;
             }
           }
@@ -2243,8 +2243,10 @@
             window update message was lost somehow...
             then loop to check again if connection was closed.
             */
-            if (session != null) {
-              session.publish(new WindowProbeMsg());
+            Session localSession = session;
+            if (localSession != null)
+            {
+              localSession.publish(new WindowProbeMsg());
             }
           }
         }
@@ -2330,8 +2332,8 @@
 
       // Save session information for later in case we need it for log messages
       // after the session has been closed and/or failed.
-      final Session savedSession = session;
-      if (savedSession == null)
+      final Session localSession = session;
+      if (localSession == null)
       {
         // Must be shutting down.
         break;
@@ -2340,7 +2342,7 @@
       final int previousRsServerID = rsServerId;
       try
       {
-        ReplicationMsg msg = savedSession.receive();
+        ReplicationMsg msg = localSession.receive();
         if (msg instanceof UpdateMsg)
         {
           synchronized (this)
@@ -2372,12 +2374,12 @@
         {
           // RS performs a proper disconnection
           Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
-              previousRsServerID, savedSession.getReadableRemoteAddress(),
+              previousRsServerID, localSession.getReadableRemoteAddress(),
               serverId, baseDN.toNormalizedString());
           logError(message);
 
           // Try to find a suitable RS
-          reStart(savedSession, true);
+          reStart(localSession, true);
         }
         else if (msg instanceof MonitorMsg)
         {
@@ -2436,14 +2438,15 @@
                 {
                   message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                       serverId, previousRsServerID,
-                      savedSession.getReadableRemoteAddress(),
+                      localSession.getReadableRemoteAddress(),
                       baseDN.toNormalizedString());
                 }
                 else
                 {
+                  // TODO JNR log why an RS was evicted as best server
                   message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                       serverId, previousRsServerID,
-                      savedSession.getReadableRemoteAddress(),
+                      localSession.getReadableRemoteAddress(),
                       bestServerInfo.getServerId(),
                       baseDN.toNormalizedString());
                 }
@@ -2480,13 +2483,13 @@
             // We did not initiate the close on our side, log an error message.
             Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                 serverId, baseDN.toNormalizedString(), previousRsServerID,
-                savedSession.getReadableRemoteAddress());
+                localSession.getReadableRemoteAddress());
             logError(message);
           }
 
           if (reconnectOnFailure)
           {
-            reStart(savedSession, true);
+            reStart(localSession, true);
           }
           else
           {
@@ -2546,9 +2549,10 @@
     try
     {
       updateDoneCount++;
-      if ((updateDoneCount >= halfRcvWindow) && (session != null))
+      final Session localSession = session;
+      if (updateDoneCount >= halfRcvWindow && localSession != null)
       {
-        session.publish(new WindowMsg(updateDoneCount));
+        localSession.publish(new WindowMsg(updateDoneCount));
         rcvWindow += updateDoneCount;
         updateDoneCount = 0;
       }
@@ -2598,9 +2602,10 @@
   public void setSoTimeout(int timeout) throws SocketException
   {
     this.timeout = timeout;
-    if (session != null)
+    final Session localSession = session;
+    if (localSession != null)
     {
-      session.setSoTimeout(timeout);
+      localSession.setSoTimeout(timeout);
     }
   }
 
@@ -2905,14 +2910,14 @@
     // Start a CSN heartbeat thread.
     if (changeTimeHeartbeatSendInterval > 0)
     {
-      String threadName = "Replica DS(" + getServerId()
+      final Session localSession = session;
+      final String threadName = "Replica DS(" + getServerId()
           + ") change time heartbeat publisher for domain \""
-          + this.baseDN + "\" to RS(" + getRsServerId()
-          + ") at " + session.getReadableRemoteAddress();
+          + baseDN + "\" to RS(" + getRsServerId()
+          + ") at " + localSession.getReadableRemoteAddress();
 
       ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
-          threadName, session, changeTimeHeartbeatSendInterval,
-          serverId);
+          threadName, localSession, changeTimeHeartbeatSendInterval, serverId);
       ctHeartbeatPublisherThread.start();
     } else
     {
@@ -3030,4 +3035,11 @@
       DirectoryServer.deregisterMonitorProvider(monitor);
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " " + baseDN + " " + serverId;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 5db7c3b..4cb07d3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -69,7 +69,7 @@
  *   should read the list of replication servers from the configuration,
  *   instantiate a {@link ServerState} then start the publish service
  *   by calling
- *   {@link #startPublishService(Collection, int, long, long)}.
+ *   {@link #startPublishService(Set, int, long, long)}.
  *   At this point it can start calling the {@link #publish(UpdateMsg)}
  *   method if needed.
  * <p>
@@ -3675,4 +3675,11 @@
   {
     return state.getCSN(serverID);
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index e65deab..64e72ed 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -27,6 +27,7 @@
  */
 package org.opends.server.replication;
 
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -375,14 +376,12 @@
 
       // Check that the modify has been replayed.
       found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
-          "description", "Description was changed",
-          10000, true);
+          "description", "Description was changed", 10000, true);
       assertTrue(found, "The second modification was not replayed.");
 
       // Delete the entries to clean the database.
-      DeleteMsg delMsg = new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID);
-      broker.publish(delMsg);
-
+      broker.publish(
+          new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID));
       assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
           "The DELETE replication message was not replayed");
     }
@@ -552,16 +551,12 @@
      * Open a session to the replicationServer using the ReplicationServer broker API.
      * This must use a serverId different from the LDAP server ID
      */
+    final int serverId = 2;
     ReplicationBroker broker =
-      openReplicationSession(baseDN, 2, 100, replServerPort, 1000, true);
-
+        openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
     try
     {
-      /*
-       * Create a CSN generator to generate new CSNs when we need to send
-       * operations messages to the replicationServer.
-       */
-      CSNGenerator gen = new CSNGenerator(2, 0);
+      CSNGenerator gen = new CSNGenerator(serverId, 0);
 
     /*
      * Test that the conflict resolution code is able to find entries
@@ -656,18 +651,17 @@
      * the same UUID has the entry that has been used in the tests above.
      * Finally check that the delete operation has been applied.
      */
-    // send a delete operation with a wrong dn but the unique ID of the entry
-    // used above
-      DN delDN = DN.decode("cn=anotherdn," + baseDN);
-    DeleteMsg delMsg = new DeleteMsg(delDN, gen.newCSN(), user1entryUUID);
-    updateMonitorCount(baseDN, resolvedMonitorAttr);
+      // send a delete operation with a wrong dn but the unique ID of the entry
+      // used above
+      updateMonitorCount(baseDN, resolvedMonitorAttr);
       alertCount = DummyAlertHandler.getAlertCount();
-    broker.publish(delMsg);
+      DN delDN = DN.decode("cn=anotherdn," + baseDN);
+      broker.publish(new DeleteMsg(delDN, gen.newCSN(), user1entryUUID));
 
-    // check that the delete operation has been applied
-    assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
-        "The DELETE replication message was not replayed");
-    assertEquals(getMonitorDelta(), 1);
+      // check that the delete operation has been applied
+      assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
+          "The DELETE replication message was not replayed");
+      assertEquals(getMonitorDelta(), 1);
       assertConflictAutomaticallyResolved(alertCount);
 
     /*
@@ -739,15 +733,15 @@
      * To achieve this send a delete operation with a correct DN
      * but a wrong unique ID.
      */
-    delMsg = new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab");
-    updateMonitorCount(baseDN, resolvedMonitorAttr);
+      updateMonitorCount(baseDN, resolvedMonitorAttr);
       alertCount = DummyAlertHandler.getAlertCount();
-    broker.publish(delMsg);
+      broker.publish(
+          new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab"));
 
-    // check that the delete operation has not been applied
+      // check that the delete operation has not been applied
       assertNotNull(getEntry(newPersonDN, 10000, true),
           "The DELETE replication message was replayed when it should not");
-    assertEquals(getMonitorDelta(), 1);
+      assertEquals(getMonitorDelta(), 1);
       assertConflictAutomaticallyResolved(alertCount);
 
 
@@ -823,20 +817,14 @@
       assertNewAlertsGenerated(alertCount, 1);
 
 
-    // delete the entries to clean the database
-    DN delDN2 = DN.decode("entryUUID = " + user1entrysecondUUID + "+"
-        + user1dn.getRDN() + "," + baseDN);
-    delMsg = new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID);
-    broker.publish(delMsg);
-
-    // check that the delete operation has been applied
+      // delete the entries to clean the database
+      DN delDN2 = DN.decode(
+          "entryUUID = " + user1entrysecondUUID + "+" + user1dn.getRDN() + "," + baseDN);
+      broker.publish(new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID));
       assertNull(getEntry(delDN2, 10000, false),
-        "The DELETE replication message was not replayed");
+          "The DELETE replication message was not replayed");
 
-    delMsg = new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID);
-    broker.publish(delMsg);
-
-      // check that the delete operation has been applied
+      broker.publish(new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID));
       assertNull(getEntry(reallyNewDN, 10000, false),
           "The DELETE replication message was not replayed");
 
@@ -859,7 +847,7 @@
       DN baseDN1 = DN.decode("ou=baseDn1," + baseDN);
       DN baseDN2 = DN.decode("ou=baseDn2," + baseDN);
 
-    // - create parent entry 1 with baseDn1
+      // - create parent entry 1 with baseDn1
       connection.processAdd(TestCaseUtils.entryFromLdifString(
           "dn: " + baseDN1 + "\n"
               + "objectClass: top\n"
@@ -926,12 +914,11 @@
     DN conflictDomain3dn = DN.decode(
         "entryUUID = " + domain3uid + "+dc=domain3," + baseDN);
 
-    updateMonitorCount(baseDN, unresolvedMonitorAttr);
+      updateMonitorCount(baseDN, unresolvedMonitorAttr);
       alertCount = DummyAlertHandler.getAlertCount();
 
-    // delete domain1
-    delMsg = new DeleteMsg(domain1dn, olderCSN, domain1uid);
-    broker.publish(delMsg);
+      // delete domain1
+      broker.publish(new DeleteMsg(domain1dn, olderCSN, domain1uid));
 
     // check that the domain1 has correctly been deleted
     assertNull(getEntry(domain1dn, 10000, false),
@@ -971,12 +958,11 @@
     gen.adjust(addCSN);
     domain3uid = getEntryUUID(domain3dn);
 
-    updateMonitorCount(baseDN, unresolvedMonitorAttr);
+      updateMonitorCount(baseDN, unresolvedMonitorAttr);
       alertCount = DummyAlertHandler.getAlertCount();
 
-    // delete domain1
-    delMsg = new DeleteMsg(domain1dn, gen.newCSN(), domain1uid);
-    broker.publish(delMsg);
+      // delete domain1
+      broker.publish(new DeleteMsg(domain1dn, gen.newCSN(), domain1uid));
 
     // check that the domain1 has correctly been deleted
     assertNull(getEntry(domain1dn, 10000, false),
@@ -1131,11 +1117,11 @@
     // Cleanup from previous run
     cleanupTest();
 
+    final int serverId = 27;
     ReplicationBroker broker =
-      openReplicationSession(baseDN,  27, 100, replServerPort, 2000, true);
-
+        openReplicationSession(baseDN, serverId, 100, replServerPort, 2000, true);
     try {
-      CSNGenerator gen = new CSNGenerator( 27, 0);
+      CSNGenerator gen = new CSNGenerator(serverId, 0);
 
       /*
        * Test that operations done on this server are sent to the
@@ -1303,12 +1289,12 @@
     logError(Message.raw(Category.SYNC, Severity.INFORMATION,
         "Starting replication test : infiniteReplayLoop"));
 
-    Thread.sleep(2000);
+    int serverId = 11;
     ReplicationBroker broker =
-      openReplicationSession(baseDN,  11, 100, replServerPort, 1000, true);
+        openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
     try
     {
-      CSNGenerator gen = new CSNGenerator( 11, 0);
+      CSNGenerator gen = new CSNGenerator(serverId, 0);
 
       // Create a test entry.
       Entry tmp = TestCaseUtils.entryFromLdifString(
@@ -1400,7 +1386,6 @@
   public void csnGeneratorAdjust() throws Exception
   {
     testSetUp("csnGeneratorAdjust");
-    int serverId = 88;
     logError(Message.raw(Category.SYNC, Severity.INFORMATION,
         "Starting synchronization test : CSNGeneratorAdjust"));
 
@@ -1408,16 +1393,13 @@
      * Open a session to the replicationServer using the broker API.
      * This must use a different serverId to that of the directory server.
      */
+    final int serverId = 88;
     ReplicationBroker broker =
-      openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
-
+        openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
+    consumeAllMessages(broker); // clean leftover messages from lostHeartbeatFailover()
     try
     {
-      /*
-       * Create a CSN generator to generate new CSNs
-       * when we need to send operation messages to the replicationServer.
-       */
-      long inTheFuture = System.currentTimeMillis() + (3600 * 1000);
+      final long inTheFuture = System.currentTimeMillis() + (3600 * 1000);
       CSNGenerator gen = new CSNGenerator(serverId, inTheFuture);
 
       // Create and publish an update message to add an entry.
@@ -1453,4 +1435,31 @@
       broker.stop();
     }
   }
+
+  /**
+   * Consumes all the messages sent to this broker. This is useful at the start
+   * of a test to avoid leftover messages from previous test runs.
+   */
+  private void consumeAllMessages(ReplicationBroker broker)
+  {
+    final List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>();
+    try
+    {
+      while (true)
+      {
+        msgs.add(broker.receive());
+      }
+    }
+    catch (SocketTimeoutException expectedAtSomeStage)
+    {
+      // this is expected to happen when there will not be any more messages to
+      // consume from the socket
+    }
+
+    if (!msgs.isEmpty())
+    {
+      logError(Message.raw(Category.SYNC, Severity.SEVERE_ERROR,
+          "Leftover messages from previous test runs " + msgs));
+    }
+  }
 }

--
Gitblit v1.10.0