From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java |  137 +++++++++++++++++++++++++++++----------------
 1 files changed, 87 insertions(+), 50 deletions(-)

diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index a119ab3..12ddbfc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -48,6 +48,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.net.SocketTimeoutException;
 
 import org.opends.server.TestCaseUtils;
 import org.opends.server.backends.task.TaskState;
@@ -124,14 +125,14 @@
   boolean ssShutdownRequested = false;
   protected String[] updatedEntries;
   boolean externalDS = false;
-  private static final short server1ID = 11;
-  private static final short server2ID = 21;
-  private static final short server3ID = 31;
-  private static final short changelog1ID = 1;
-  private static final short changelog2ID = 2;
-  private static final short changelog3ID = 3;
+  private static final short server1ID = 1;
+  private static final short server2ID = 2;
+  private static final short server3ID = 3;
+  private static final short changelog1ID =  8;
+  private static final short changelog2ID =  9;
+  private static final short changelog3ID = 10;
 
-  private static int[] replServerPort = new int[4];
+  private static int[] replServerPort = new int[20];
   
   private DN baseDn;
   ReplicationBroker server2 = null;
@@ -140,7 +141,7 @@
   ReplicationServer changelog2 = null;
   ReplicationServer changelog3 = null;
   boolean emptyOldChanges = true;
-  ReplicationDomain sd = null;
+  ReplicationDomain replDomain = null;
 
   private void log(String s)
   {
@@ -169,7 +170,6 @@
 
     // This test suite depends on having the schema available.
     TestCaseUtils.startServer();
-
     baseDn = DN.decode("dc=example,dc=com");
 
     updatedEntries = newLDIFEntries();
@@ -756,9 +756,14 @@
       servers.add("localhost:" + getChangelogPort(changelog2ID));
       servers.add("localhost:" + getChangelogPort(changelog3ID));
 
-      int chPort = getChangelogPort(changelogId);
       ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
+        new ReplServerFakeConfiguration(
+            getChangelogPort(changelogId), 
+            "rsdbdirname" + getChangelogPort(changelogId), 
+            0, 
+            changelogId, 
+            0, 
+            100,
             servers);
       ReplicationServer replicationServer = new ReplicationServer(conf);
       Thread.sleep(1000);
@@ -804,17 +809,18 @@
         assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
         "Unable to add the synchronized server");
 
-        sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
+        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
 
         // Clear the backend
         ReplicationDomain.clearJEBackend(false,
-            sd.getBackend().getBackendID(),
+            replDomain.getBackend().getBackendID(),
             baseDn.toNormalizedString());
 
       }
-      if (sd != null)
+      if (replDomain != null)
       {
-         log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+         assertTrue(!replDomain.ieRunning(),
+           "ReplicationDomain: Import/Export is not expected to be running");
       }
     }
     catch(Exception e)
@@ -890,7 +896,7 @@
       // Test import result in S1
       testEntriesInDb();
 
-      cleanEntries();
+      afterTest();
 
       log("Successfully ending " + testCase);
     }
@@ -929,7 +935,7 @@
 
     receiveUpdatedEntries(server2, server2ID, updatedEntries);
 
-    cleanEntries();
+    afterTest();
 
     log("Successfully ending "+testCase);
 }
@@ -968,7 +974,7 @@
     // Tests that entries have been received by S2
     receiveUpdatedEntries(server2, server2ID, updatedEntries);
 
-    cleanEntries();
+    afterTest();
 
     log("Successfully ending " + testCase);
 
@@ -1012,7 +1018,7 @@
     receiveUpdatedEntries(server2, server2ID, updatedEntries);
     receiveUpdatedEntries(server3, server3ID, updatedEntries);
 
-    cleanEntries();
+    afterTest();
 
     log("Successfully ending " + testCase);
 
@@ -1049,7 +1055,7 @@
       // Test that entries have been imported in S1
       testEntriesInDb();
 
-      cleanEntries();
+      afterTest();
 
       log("Successfully ending " + testCase);
     }
@@ -1103,7 +1109,7 @@
       // Scope containing a serverID absent from the domain
       // createTask(taskInitTargetS2);
 
-      cleanEntries();
+      afterTest();
 
       log("Successfully ending " + testCase);
     }
@@ -1172,7 +1178,7 @@
       // Scope containing a serverID absent from the domain
       // createTask(taskInitTargetS2);
 
-      cleanEntries();
+      afterTest();
 
       log("Successfully ending " + testCase);
     }
@@ -1231,25 +1237,25 @@
 
     // Check that the list of connected LDAP servers is correct
     // in each replication servers
-    List<String> l1 = changelog1.getReplicationCache(baseDn).
+    List<String> l1 = changelog1.getReplicationCache(baseDn, false).
       getConnectedLDAPservers();
     assertEquals(l1.size(), 1);
     assertEquals(l1.get(0), String.valueOf(server1ID));
     
     List<String> l2;
-    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
     assertEquals(l2.size(), 2);
     assertEquals(l2.get(0), String.valueOf(server3ID));
     assertEquals(l2.get(1), String.valueOf(server2ID));
         
     List<String> l3;
-    l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
+    l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers();
     assertEquals(l3.size(), 0);
 
     // Test updates
     broker3.stop();
     Thread.sleep(1000);
-    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
     assertEquals(l2.size(), 1);
     assertEquals(l2.get(0), String.valueOf(server2ID));
 
@@ -1257,7 +1263,7 @@
         server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
     broker2.stop();
     Thread.sleep(1000);
-    l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
     assertEquals(l2.size(), 1);
     assertEquals(l2.get(0), String.valueOf(server3ID));
 
@@ -1266,7 +1272,7 @@
     broker2.stop();
     broker3.stop();
 
-    cleanEntries();
+    afterTest();
 
     changelog3.shutdown();
     changelog3 = null;
@@ -1313,7 +1319,7 @@
     // Tests that entries have been received by S2
     receiveUpdatedEntries(server2, server2ID, updatedEntries);
 
-    cleanEntries();
+    afterTest();
 
     changelog2.shutdown();
     changelog2 = null;
@@ -1335,42 +1341,64 @@
     changelog2 = createChangelogServer(changelog2ID);
     Thread.sleep(1000);
 
-    changelog3 = createChangelogServer(changelog3ID);
-    Thread.sleep(1000);
-
     // Connect DS to the replicationServer 1
     connectServer1ToChangelog(changelog1ID);
 
     // Put entries in DB
+    log(testCase + " Will add entries");
     addTestEntriesToDB();
 
     // Connect a broker acting as server 2 to Repl Server 2
     if (server2 == null)
     {
+      log(testCase + " Will connect server 2 to " + changelog2ID);
       server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
         server2ID, 100, getChangelogPort(changelog2ID),
-        1000, emptyOldChanges);
+        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
     }
 
     // Connect a broker acting as server 3 to Repl Server 3
+    log(testCase + " Will create replServer " + changelog3ID);
+    changelog3 = createChangelogServer(changelog3ID);
+    Thread.sleep(500);
     if (server3 == null)
     {
+      log(testCase + " Will connect server 3 to " + changelog3ID);
       server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
         server3ID, 100, getChangelogPort(changelog3ID),
-        1000, emptyOldChanges);
+        1000, emptyOldChanges, changelog1.getGenerationId(baseDn));
     }
 
-    Thread.sleep(3000);
+    Thread.sleep(500);
 
-    // S2 sends init request
+    // S3 sends init request
+    log(testCase + " server 3 Will send reqinit to " + server1ID);
     InitializeRequestMessage initMsg =
-      new InitializeRequestMessage(baseDn, server2ID, server1ID);
-    server2.publish(initMsg);
+      new InitializeRequestMessage(baseDn, server3ID, server1ID);
+    server3.publish(initMsg);
 
-    // S2 should receive target, entries & done
-    receiveUpdatedEntries(server2, server2ID, updatedEntries);
+    // S3 should receive target, entries & done
+    log(testCase + " Will verify server 3 has received expected entries");
+    receiveUpdatedEntries(server3, server3ID, updatedEntries);
 
-    cleanEntries();
+    while(true)
+    {
+      try
+      {
+        ReplicationMessage msg = server3.receive();
+        fail("Receive unexpected message " + msg);
+      }
+      catch(SocketTimeoutException e)
+      {
+        // Test is a success
+        break;
+      }
+    }
+    
+    afterTest();
+
+    changelog3.shutdown();
+    changelog3 = null;
 
     changelog2.shutdown();
     changelog2 = null;
@@ -1419,9 +1447,10 @@
 
     addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
 
-    if (sd != null)
+    if (replDomain != null)
     {
-       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+       assertTrue(!replDomain.ieRunning(),
+         "ReplicationDomain: Import/Export is not expected to be running");
     }
 
     log("Successfully ending "+testCase);
@@ -1458,9 +1487,10 @@
     waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
         ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
 
-    if (sd != null)
+    if (replDomain != null)
     {
-       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+       assertTrue(!replDomain.ieRunning(),
+         "ReplicationDomain: Import/Export is not expected to be running");
     }
 
     log("Successfully ending "+testCase);
@@ -1554,7 +1584,7 @@
     waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
         null);
 
-    cleanEntries();
+    afterTest();
 
     log("Successfully ending "+testCase);
 
@@ -1563,23 +1593,30 @@
   /**
    * Disconnect broker and remove entries from the local DB
    */
-  protected void cleanEntries()
+  protected void afterTest()
   {
 
-    if (sd != null)
+    if (replDomain != null)
     {
-       log("ReplicationDomain: Import/Export is running ? " + sd.ieRunning());
+       assertTrue(!replDomain.ieRunning(),
+         "ReplicationDomain: Import/Export is not expected to be running");
     }
 
     // Clean brokers
     if (server2 != null)
     {
       server2.stop();
-
       TestCaseUtils.sleep(100); // give some time to the broker to disconnect
       // from the replicationServer.
       server2 = null;
     }
+    if (server3 != null)
+    {
+      server3.stop();
+      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+      // from the replicationServer.
+      server3 = null;
+    }
     super.cleanRealEntries();
   }
 }

--
Gitblit v1.10.0