From a5c5efbf8ca56c059709953f7fedb647dadaed06 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 27 May 2010 15:28:09 +0000
Subject: [PATCH] Fix for issues #3395 and #3998. The changes improve the replica initialization protocol, especially flow control and the handling of connection outages.

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java |  288 ++++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 220 insertions(+), 68 deletions(-)

diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index c9901b6..07dceca 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
 package org.opends.server.replication;
 
@@ -61,10 +61,13 @@
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.replication.service.ReplicationBroker;
+import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.protocol.DoneMsg;
 import org.opends.server.replication.protocol.EntryMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.HeartbeatThread;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -146,9 +149,11 @@
   boolean emptyOldChanges = true;
   LDAPReplicationDomain replDomain = null;
 
+  int initWindow = 100;
+
   private void log(String s)
   {
-    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+    logError(Message.raw(Category.SYNC, Severity.NOTICE,
         "InitOnLineTests/" + s));
     if (debugEnabled())
     {
@@ -183,7 +188,8 @@
     // clear it.
     LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
 
-    updatedEntries = newLDIFEntries();
+    // For most tests, a limited number of entries is enough
+    updatedEntries = newLDIFEntries(2);
 
     // Create an internal connection in order to provide operations
     // to DS to populate the db -
@@ -283,6 +289,7 @@
   private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
       long expectedLeft, long expectedDone)
   {
+    log("waitTaskCompleted " + taskEntry.toLDIFString());
     try
     {
       // FIXME - Factorize with TasksTestCase
@@ -398,6 +405,22 @@
       for (String ldifEntry : updatedEntries)
       {
         Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
+        addTestEntryToDB(entry);
+        // They will be removed at the end of the test
+        entryList.addLast(entry.getDN());
+      }
+      log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
+    }
+    catch(Exception e)
+    {
+      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+    }
+  }
+
+  private void addTestEntryToDB(Entry entry)
+  {
+    try
+    {
         AddOperationBasis addOp = new AddOperationBasis(connection,
             InternalClientConnection.nextOperationID(), InternalClientConnection
             .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
@@ -411,18 +434,17 @@
 
         // They will be removed at the end of the test
         entryList.addLast(entry.getDN());
-      }
     }
     catch(Exception e)
     {
-      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
+      fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     }
   }
 
   /*
    * Creates entries necessary to the test.
    */
-  private String[] newLDIFEntries()
+  private String[] newLDIFEntries(int entriesCnt)
   {
     // It is relevant to test ReplLDIFInputStream
     // and ReplLDIFOutputStream with big entries
@@ -430,46 +452,76 @@
     for (int i=0; i<bigAttributeValue.length; i++)
       bigAttributeValue[i] = Integer.toString(i).charAt(0);
 
-    String[] entries =
-    {
+    String[] entries = new String[entriesCnt + 2];
+    String filler = "000000000000000000000000000000000000";
+
+    entries[0] = new String(
         "dn: " + EXAMPLE_DN + "\n"
         + "objectClass: top\n"
         + "objectClass: domain\n"
         + "dc: example\n"
         + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
-        + "\n",
+        + "\n");
+    entries[1] = new String(
           "dn: ou=People," + EXAMPLE_DN + "\n"
         + "objectClass: top\n"
         + "objectClass: organizationalUnit\n"
         + "ou: People\n"
         + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
-        + "\n",
-          "dn: cn=Fiona Jensen,ou=people," + EXAMPLE_DN + "\n"
+        + "\n");
+
+    for (int i=0; i<entriesCnt; i++)
+    {
+      String useri="0000"+i;
+      entries[i+2] = new String(
+          "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
         + "objectclass: top\n"
         + "objectclass: person\n"
         + "objectclass: organizationalPerson\n"
         + "objectclass: inetOrgPerson\n"
-        + "cn: Fiona Jensen\n"
-        + "sn: Jensen\n"
-        + "uid: fiona\n"
+        + "cn: "+useri+"_cn"+"\n"
+        + "sn: "+useri+"_sn"+"\n"
+        + "uid: "+useri+"_uid"+"\n"
         + "telephonenumber:: "+ Base64.encode(
             new String(bigAttributeValue).getBytes())+"\n"
-        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
-        + "\n",
-          "dn: cn=Robert Langman,ou=people," + EXAMPLE_DN + "\n"
+        + "entryUUID: 21111111-1111-1111-1111-"+useri+
+        filler.substring(0, 12-useri.length())+"\n"
+        + "\n");
+    };
+
+    return entries;
+  }
+
+  /*
+   * Creates a single person entry (as LDIF) for the test, numbered by entryCnt.
+   */
+  private String newLDIFEntry(int entryCnt)
+  {
+    // It is relevant to test ReplLDIFInputStream
+    // and ReplLDIFOutputStream with big entries
+    char bigAttributeValue[] = new char[30240];
+    for (int i=0; i<bigAttributeValue.length; i++)
+      bigAttributeValue[i] = Integer.toString(i).charAt(0);
+
+    String filler = "000000000000000000000000000000000000";
+
+    String useri="0000"+entryCnt;
+
+    return  new String(
+        "dn: cn="+useri+",ou=people," + EXAMPLE_DN + "\n"
         + "objectclass: top\n"
         + "objectclass: person\n"
         + "objectclass: organizationalPerson\n"
         + "objectclass: inetOrgPerson\n"
-        + "cn: Robert Langman\n"
-        + "sn: Langman\n"
-        + "uid: robert\n"
-        + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
-        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
-        + "\n"
-        };
+        + "cn: "+useri+"_cn"+"\n"
+        + "sn: "+useri+"_sn"+"\n"
+        + "uid: "+useri+"_uid"+"\n"
+        + "telephonenumber:: "+ Base64.encode(
+            new String(bigAttributeValue).getBytes())+"\n"
+            + "entryUUID: 21111111-1111-1111-1111-"+useri+
+            filler.substring(0, 12-useri.length())+"\n"
+            + "\n");
 
-    return entries;
   }
 
   /**
@@ -488,15 +540,16 @@
       RoutableMsg initTargetMessage =
         new InitializeTargetMsg(
           EXAMPLE_DN, server2ID, destinationServerID, requestorID,
-          updatedEntries.length);
+          updatedEntries.length, initWindow);
       broker.publish(initTargetMessage);
 
+      int cnt = 0;
       for (String entry : updatedEntries)
       {
-        log("Broker will pusblish 1 entry: bytes:"+ entry.length());
+        log("Broker will publish 1 entry: bytes:"+ entry.length());
 
         EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
-            entry.getBytes());
+            entry.getBytes(), ++cnt);
         broker.publish(entryMsg);
       }
 
@@ -559,7 +612,7 @@
       }
       catch (SocketTimeoutException e)
       {
-        log("SocketTimeoutException while waiting fro entries" +
+        log("SocketTimeoutException while waiting for entries" +
             stackTraceToSingleLineString(e));
       }
       catch(Exception e)
@@ -571,6 +624,11 @@
     assertTrue(entriesReceived == updatedEntries.length,
         " Received entries("+entriesReceived +
         ") == Expected entries("+updatedEntries.length+")");
+
+    broker.setGenerationID(EMPTY_DN_GENID);
+    broker.reStart(true);    
+    try { Thread.sleep(500); } catch(Exception e) {}
+
   }
 
   /**
@@ -643,6 +701,11 @@
    */
   private void connectServer1ToChangelog(int changelogID)
   {
+    connectServer1ToChangelog(changelogID, 0);
+  }
+
+  private void connectServer1ToChangelog(int changelogID, int heartbeat)
+  {
     // Connect DS to the replicationServer
     try
     {
@@ -651,7 +714,7 @@
       String synchroServerLdif =
         "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
       + "objectClass: top\n"
-      + "objectC7lass: ds-cfg-synchronization-provider\n"
+      + "objectClass: ds-cfg-synchronization-provider\n"
       + "objectClass: ds-cfg-replication-domain\n"
       + "cn: " + testName + "\n"
       + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
@@ -659,7 +722,7 @@
       + getChangelogPort(changelogID)+"\n"
       + "ds-cfg-server-id: " + server1ID + "\n"
       + "ds-cfg-receive-status: true\n"
-//    + "ds-cfg-heartbeat-interval: 0 ms\n"
+      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
       + "ds-cfg-window-size: " + WINDOW_SIZE;
 
 
@@ -706,11 +769,19 @@
 
   /**
    * Tests the import side of the Initialize task
+   * Test steps : 
+   * - create a task 'InitFromS2' in S1
+   * - make S2 export its entries
+   * - test that S1 has successfully imported the entries and completed the task.
+   * 
+   * TODO: Error case: make S2 crash/disconnect in the middle of the export 
+   * and test that, on S1 side, the task ends with an error. 
+   * State of the backend on S1 partially initialized: ?
    */
   @Test(enabled=true, groups="slow")
   public void initializeImport() throws Exception
   {
-    String testCase = "initializeImport";
+    String testCase = "initializeImport ";
 
     log("Starting "+testCase);
 
@@ -740,8 +811,8 @@
       InitializeRequestMsg initMsg = (InitializeRequestMsg)msg;
 
       // S2 publishes entries to S1
-      makeBrokerPublishEntries(server2, server2ID, initMsg.getsenderID(),
-          initMsg.getsenderID());
+      makeBrokerPublishEntries(server2, server2ID, initMsg.getSenderID(),
+          initMsg.getSenderID());
 
       // Wait for task (import) completion in S1
       waitTaskCompleted(taskInitFromS2, TaskState.COMPLETED_SUCCESSFULLY,
@@ -757,12 +828,16 @@
       fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
   /**
    * Tests the export side of the Initialize task
+   * Test steps : 
+   * - add entries in S1, make S2 publish InitRequest
+   * - test that S1 has successfully exported the entries (by receiving them
+   *   on S2 side).
    */
   @Test(enabled=true, groups="slow")
   public void initializeExport() throws Exception
@@ -789,20 +864,27 @@
       // Thread.sleep(3000);
 
       InitializeRequestMsg initMsg = new InitializeRequestMsg(EXAMPLE_DN,
-        server2ID, server1ID);
+        server2ID, server1ID, 100);
       server2.publish(initMsg);
 
+      // Signal RS we just entered the full update status
+      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+
       receiveUpdatedEntries(server2, server2ID, updatedEntries);
 
       log("Successfully ending " + testCase);
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
 }
 
   /**
    * Tests the import side of the InitializeTarget task
+   * Test steps : 
+   * - add entries in S1 and create a task 'InitTargetS2' in S1
+   * - wait task completed
+   * - test that S2 has successfully received the entries
    */
   @Test(enabled=true, groups="slow")
   public void initializeTargetExport() throws Exception
@@ -832,21 +914,33 @@
       // Launch in S1 the task that will initialize S2
       addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
 
-      // Wait for task completion
-      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+      // Signal RS we just entered the full update status
+      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
 
       // Tests that entries have been received by S2
       receiveUpdatedEntries(server2, server2ID, updatedEntries);
 
+      // Wait for task completion
+      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+
       log("Successfully ending " + testCase);
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
   /**
    * Tests the import side of the InitializeTarget task
+   * Test steps : 
+   * - addEntries in S1, create a task 'InitAll' in S1
+   * - wait task completed on S1
+   * - test that S2 and S3 have successfully imported the entries.
+   * 
+   * TODO: Error case: make S1 crash in the middle of the export and test that
+   * the task ends with an error. State of the backend on both S2 and S3: ?
+   *
+   * TODO: Error case: make S2 crash in the middle of the import and test what??
    */
   @Test(enabled=true, groups="slow")
   public void initializeTargetExportAll() throws Exception
@@ -879,17 +973,22 @@
       // Launch in S1 the task that will initialize S2
       addTask(taskInitTargetAll, ResultCode.SUCCESS, null);
 
-      // Wait for task completion
-      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
-
       // Tests that entries have been received by S2
+
+      // Signal RS we just entered the full update status
+      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+      server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+
       receiveUpdatedEntries(server2, server2ID, updatedEntries);
       receiveUpdatedEntries(server3, server3ID, updatedEntries);
 
+      // Wait for task completion
+      waitTaskState(taskInitTargetAll, TaskState.COMPLETED_SUCCESSFULLY, null);
+
       log("Successfully ending " + testCase);
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -921,13 +1020,20 @@
 
       // wait until the replication domain has expected generationID
       // this should indicate that the import occured correctly.
-      for (int count = 0; count < 100; count++)
+      for (int count = 0; count < 120; count++)
       {
-        if (replDomain.getGenerationID() == 56869)
+        if (replDomain.getGenerationID() == 53235)
           break;
-        Thread.sleep(200);
+        log(testCase + " genId=" + replDomain.getGenerationID());
+        Thread.sleep(1000);
       }
 
+      if (replDomain.getGenerationID() != 53235)
+      {
+        fail(testCase + " Import success waited longer than expected \n" + 
+            TestCaseUtils.threadStacksToString());
+      }
+      
       // Test that entries have been imported in S1
       testEntriesInDb();
 
@@ -938,7 +1044,7 @@
       fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -993,7 +1099,7 @@
       fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -1052,7 +1158,8 @@
           "ds-task-initialize-domain-dn: " + baseDn,
           "ds-task-initialize-replica-server-id: -3");
       addTask(taskInit, ResultCode.OTHER,
-          ERR_INVALID_IMPORT_SOURCE.get());
+          ERR_INVALID_IMPORT_SOURCE.get(baseDn.toNormalizedString(), 
+              Integer.toString(server1ID),"-3",""));
 
       // Scope containing a serverID absent from the domain
       // createTask(taskInitTargetS2);
@@ -1064,7 +1171,7 @@
       fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -1156,23 +1263,24 @@
 
     // TODO Test ReplicationServerDomain.getDestinationServers method.
 
+      log("Successfully ending " + testCase);
+
     } finally
     {
       if (broker2 != null)
         broker2.stop();
       if (broker3 != null)
         broker3.stop();
-      afterTest();
+      afterTest(testCase);
     }
   }
 
   @Test(enabled=true, groups="slow")
   public void initializeTargetExportMultiSS() throws Exception
   {
+    String testCase = "initializeTargetExportMultiSS";
     try
     {
-      String testCase = "initializeTargetExportMultiSS";
-
       log("Starting " + testCase);
 
       // Create 2 changelogs
@@ -1190,6 +1298,7 @@
       // connected to changelog2
       if (server2 == null)
       {
+        log(testCase + " Will connect server 2 to " + changelog2ID);
         server2 = openReplicationSession(DN.decode(EXAMPLE_DN),
             server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
       }
@@ -1197,19 +1306,40 @@
      // Thread.sleep(1000);
 
       // Launch in S1 the task that will initialize S2
+      log(testCase + " add task " + Thread.currentThread());
       addTask(taskInitTargetS2, ResultCode.SUCCESS, null);
 
-      // Wait for task completion
-      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+      log(testCase + " " + server2.getServerId() + " wait target " + Thread.currentThread());
+      ReplicationMsg msgrcv;
+      do
+      {
+        msgrcv = server2.receive();
+        log(testCase + " " + server2.getServerId() + " receives " + msgrcv);
+      }
+      while(!(msgrcv instanceof InitializeTargetMsg));
+      assertTrue(msgrcv instanceof InitializeTargetMsg, msgrcv.getClass().getCanonicalName());
+
+      // Signal RS we just entered the full update status
+      log(testCase + " change status");
+      server2.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
 
       // Tests that entries have been received by S2
+      log(testCase + " receive entries");
       receiveUpdatedEntries(server2, server2ID, updatedEntries);
 
+      // Wait for task completion
+      log(testCase + " wait task completed");
+      waitTaskState(taskInitTargetS2, TaskState.COMPLETED_SUCCESSFULLY, null);
+
       log("Successfully ending " + testCase);
     }
+    catch(Exception e)
+    {
+      log(testCase + e.getLocalizedMessage());      
+    }
     finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -1263,17 +1393,34 @@
       // S3 sends init request
       log(testCase + " server 3 Will send reqinit to " + server1ID);
       InitializeRequestMsg initMsg =
-        new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID);
+        new InitializeRequestMsg(EXAMPLE_DN, server3ID, server1ID, 100);
       server3.publish(initMsg);
 
       // S3 should receive target, entries & done
+      log(testCase + " Wait for InitializeTargetMsg");
+
+      ReplicationMsg msgrcv = null;
+      do
+      {
+        msgrcv = server3.receive();
+        log(testCase + " receives  "+ msgrcv);
+      }
+      while (!(msgrcv instanceof InitializeTargetMsg));
+      assertTrue(msgrcv instanceof InitializeTargetMsg,msgrcv.getClass().getCanonicalName() +
+      msgrcv);
+
+      // Signal RS we just entered the full update status
+      server3.signalStatusChange(ServerStatus.FULL_UPDATE_STATUS);
+
       log(testCase + " Will verify server 3 has received expected entries");
       receiveUpdatedEntries(server3, server3ID, updatedEntries);
 
+      log(testCase + " Will verify no more msgs");
       while (true)
       {
         try
         {
+          log(testCase + " Will receive");
           ReplicationMsg msg = server3.receive();
           fail("Receive unexpected message " + msg);
         } catch (SocketTimeoutException e)
@@ -1282,11 +1429,11 @@
           break;
         }
       }
-
       log("Successfully ending " + testCase);
-    } finally
+    }
+    finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -1318,7 +1465,8 @@
       addTask(taskInit, ResultCode.SUCCESS, null);
 
       waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
-        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+            baseDn.toString(), "20"));
 
       // Test 2
       taskInit = TestCaseUtils.makeEntry(
@@ -1331,7 +1479,9 @@
         "ds-task-initialize-domain-dn: " + baseDn,
         "ds-task-initialize-replica-server-id: " + server1ID);
 
-      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get());
+      addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get(
+          baseDn.toNormalizedString(), 
+          Integer.toString(server1ID),"20",""));
 
       if (replDomain != null)
       {
@@ -1342,7 +1492,7 @@
       log("Successfully ending " + testCase);
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -1376,7 +1526,7 @@
       addTask(taskInit, ResultCode.SUCCESS, null);
 
       waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR,
-        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
+        ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDn.toString(), "0"));
 
       if (replDomain != null)
       {
@@ -1387,7 +1537,7 @@
       log("Successfully ending " + testCase);
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
@@ -1483,14 +1633,15 @@
       log("Successfully ending " + testCase);
     } finally
     {
-      afterTest();
+      afterTest(testCase);
     }
   }
 
   /**
    * Disconnect broker and remove entries from the local DB
+   * @param testCase The name of the test case.
    */
-  protected void afterTest()
+  protected void afterTest(String testCase)
   {
 
     // Check that the domain has completed the import/export task.
@@ -1564,6 +1715,7 @@
     {
       replServerPort[i] = 0;
     }
+    log("Successfully cleaned " + testCase);
   }
 
     /**

--
Gitblit v1.10.0