From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS,  we introduce:

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java |  203 +++++++++++++++++++++++++++-----------------------
 1 files changed, 109 insertions(+), 94 deletions(-)

diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 9e1aae1..161679c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
 import org.opends.messages.Severity;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
@@ -65,9 +64,10 @@
 import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.plugin.PersistentServerState;
-import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.schema.DirectoryStringSyntax;
@@ -260,34 +260,6 @@
       broker.setSoTimeout(timeout);
     checkConnection(30, broker, port); // give some time to the broker to connect
                                        // to the replicationServer.
-    if (emptyOldChanges)
-    {
-      /*
-       * loop receiving update until there is nothing left
-       * to make sure that message from previous tests have been consumed.
-       */
-      try
-      {
-        while (true)
-        {
-          ReplicationMsg rMsg = broker.receive();
-          if (rMsg instanceof ErrorMsg)
-          {
-            ErrorMsg eMsg = (ErrorMsg)rMsg;
-            logError(new MessageBuilder(
-                "ReplicationTestCase/openReplicationSession ").append(
-                " received ErrorMessage when emptying old changes ").append(
-                eMsg.getDetails()).toMessage());
-          }
-        }
-      }
-      catch (Exception e)
-      {
-        logError(new MessageBuilder(
-            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
-            .append(" when emptying old changes").toMessage());
-      }
-    }
     return broker;
   }
 
@@ -313,32 +285,6 @@
       broker.setSoTimeout(timeout);
     checkConnection(30, broker, port); // give some time to the broker to connect
                                        // to the replicationServer.
-    if (emptyOldChanges)
-    {
-      // loop receiving update until there is nothing left
-      // to make sure that message from previous tests have been consumed.
-      try
-      {
-        while (true)
-        {
-          ReplicationMsg rMsg = broker.receive();
-          if (rMsg instanceof ErrorMsg)
-          {
-            ErrorMsg eMsg = (ErrorMsg)rMsg;
-            logError(new MessageBuilder(
-                "ReplicationTestCase/openReplicationSession ").append(
-                " received ErrorMessage when emptying old changes ").append(
-                eMsg.getDetails()).toMessage());
-          }
-        }
-      }
-      catch (Exception e)
-      {
-        logError(new MessageBuilder(
-            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
-            .append(" when emptying old changes").toMessage());
-      }
-    }
     return broker;
   }
   */
@@ -435,17 +381,6 @@
       boolean emptyOldChanges)
       throws Exception, SocketException
   {
-    return openReplicationSession(baseDn, serverId, window_size,
-        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
-        getGenerationId(baseDn));
-  }
-
-  protected ReplicationBroker openReplicationSession(
-      final DN baseDn, int serverId, int window_size,
-        int port, int timeout, int maxSendQueue, int maxRcvQueue,
-        boolean emptyOldChanges, long generationId)
-            throws Exception, SocketException
-  {
     ServerState state = new ServerState();
 
     if (emptyOldChanges)
@@ -453,37 +388,13 @@
 
     ReplicationBroker broker = new ReplicationBroker(null,
         state, baseDn.toNormalizedString(), serverId, window_size,
-        generationId, 0, getReplSessionSecurity(), (byte)1, 500);
+        getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
     broker.start(servers);
     checkConnection(30, broker, port);
     if (timeout != 0)
       broker.setSoTimeout(timeout);
-    if (emptyOldChanges)
-    {
-      /*
-       * loop receiving update until there is nothing left
-       * to make sure that message from previous tests have been consumed.
-       */
-      try
-      {
-        while (true)
-        {
-          ReplicationMsg rMsg = broker.receive();
-          if (rMsg instanceof ErrorMsg)
-          {
-            ErrorMsg eMsg = (ErrorMsg)rMsg;
-            logError(new MessageBuilder(
-                "ReplicationTestCase/openReplicationSession ").append(
-                " received ErrorMessage when emptying old changes ").append(
-                eMsg.getDetails()).toMessage());
-          }
-        }
-      }
-      catch (Exception e)
-      { }
-    }
     return broker;
   }
 
@@ -575,11 +486,14 @@
     logError(Message.raw(Category.SYNC, Severity.NOTICE,
       " ##### Calling ReplicationTestCase.classCleanUp ##### "));
 
+    // Clean RS databases
+    cleanUpReplicationServersDB();
+
     cleanConfigEntries();
-    configEntryList = null;
+    configEntryList = new LinkedList<DN>();
 
     cleanRealEntries();
-    entryList = null;
+    entryList = new LinkedList<DN>();
 
     // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
     // (in case our test created some emtries in it)
@@ -631,6 +545,10 @@
     assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)",
       "Found unexpected replication server config left");
 
+    // Be sure that no replication server instance is left
+    List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
+    assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
+
     // Check for config entries for replication domain
     assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
       "Found unexpected replication domain config left");
@@ -648,6 +566,17 @@
   }
 
   /**
+   * Cleanup databases of the currently instantiated replication servers in the
+   * VM
+   */
+  protected void cleanUpReplicationServersDB() {
+
+    for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
+      rs.clearDb();
+    }
+  }
+
+  /**
    * Performs a search on the config backend with the specified filter.
    * Fails if a config entry is found.
    * @param filter The filter to apply for the search
@@ -1266,4 +1195,90 @@
       // done
     }
   }
+
+  /**
+   * Wait for the arrival of a specific message type on the provided session
+   * before going in timeout and failing.
+   * @param session Session from which we should receive the message.
+   * @param msgType Class of the message we are waiting for.
+   * @return The expected message if it comes in time or fails (assertion).
+   */
+  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
+
+    ReplicationMsg replMsg = null;
+
+    int timeOut = 5000; // 5 seconds max to wait for the desired message
+    long startTime = System.currentTimeMillis();
+    long curTime = startTime;
+    int nMsg = 0;
+    while ((curTime - startTime) <= timeOut)
+    {
+      try
+      {
+        replMsg = session.receive();
+      } catch (Exception ex)
+      {
+        fail("Exception waiting for " + msgType + " message : " +
+          ex.getClass().getName()  + " : " + ex.getMessage());
+      }
+      // Get message type
+      String rcvMsgType = replMsg.getClass().getName();
+      if (rcvMsgType.equals(msgType))
+      {
+        // Ok, got it, let's return the expected message
+        return replMsg;
+      }
+      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
+      nMsg++;
+      curTime = System.currentTimeMillis();
+    }
+    // Timeout
+    fail("Failed to receive an expected " + msgType +
+      " message after 5 seconds : also received " + nMsg +
+      " other messages during wait time.");
+    return null;
+  }
+
+  /**
+   * Wait for the arrival of a specific message type on the provided broker
+   * before going in timeout and failing.
+   * @param broker Broker from which we should receive the message.
+   * @param msgType Class of the message we are waiting for.
+   * @return The expected message if it comes in time or fails (assertion).
+   */
+  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
+
+    ReplicationMsg replMsg = null;
+
+    int timeOut = 5000; // 5 seconds max to wait for the desired message
+    long startTime = System.currentTimeMillis();
+    long curTime = startTime;
+    int nMsg = 0;
+    while ((curTime - startTime) <= timeOut)
+    {
+      try
+      {
+        replMsg = broker.receive();
+      } catch (Exception ex)
+      {
+        fail("Exception waiting for " + msgType + " message : " +
+          ex.getClass().getName()  + " : " + ex.getMessage());
+      }
+      // Get message type
+      String rcvMsgType = replMsg.getClass().getName();
+      if (rcvMsgType.equals(msgType))
+      {
+        // Ok, got it, let's return the expected message
+        return replMsg;
+      }
+      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
+      nMsg++;
+      curTime = System.currentTimeMillis();
+    }
+    // Timeout
+    fail("Failed to receive an expected " + msgType +
+      " message after 5 seconds : also received " + nMsg +
+      " other messages during wait time.");
+    return null;
+  }
 }

--
Gitblit v1.10.0