From 2dd63b9cc119a57154c7d7a752698493eea24903 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Tue, 16 Oct 2007 07:10:39 +0000
Subject: [PATCH] When a replica connects to a replication server that is "late" compared to the replica data , the replica must update the replication server with the missing changes. 1 line fix , the entryUUID attribute must be part of the searched attributes in order to rebuild the operations - and created unit tests for this.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java                                        |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                 |   76 ++++++++--
 opendj-sdk/opends/src/messages/messages/replication.properties                                                           |    8 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java     |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java                 |    2 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java |  303 +++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 370 insertions(+), 23 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 0f08170..47f7f6b 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -170,8 +170,8 @@
  This replication server will now shutdown
 SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \
  suffix  %s but was not able to connect to any Replication Server
-NOTICE_NOW_FOUND_CHANGELOG_62=Replication Server %s now used for Replication \
- Domain %s
+NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
+ for domain %s with replication server %s - data generation is %s
 NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
  has been dropped by the Replication Server
 SEVERE_ERR_CHANGELOG_ERROR_SENDING_INFO_64=An unexpected error occurred \
@@ -239,3 +239,7 @@
  the replication server backend
 SEVERE_ERR_UNKNOWN_DN_95=The base DN %s is not stored by any of the \
  Directory Server backend
+NOTICE_NOW_FOUND_BAD_GENERATION_CHANGELOG_96=Replication is up but degraded \
+ for domain %s with replication server %s - local data generation is %s \
+ - replication server data generation is %s - This may be only temporary \
+  or require a full resynchronization
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
index 2f740df..42b2f5b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
@@ -84,7 +84,7 @@
   /**
    * The name of the entryuuid attribute.
    */
-  static final String ENTRYUIDNAME = "entryuuid";
+  public static final String ENTRYUIDNAME = "entryuuid";
 
 
   /*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
index 675f517..6709a78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -60,7 +60,7 @@
    * Compare two ByteString values containing historical information.
    * @param value1 first value to compare
    * @param value2 second value to compare
-   * @return 0 when equals, -1 ot 1 to establish order
+   * @return 0 when equals, -1 or 1 to establish order
    */
   @Override
   public int compareValues(ByteString value1, ByteString value2)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index e561fb7..52e0ac4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -202,7 +202,7 @@
    */
   private void connect()
   {
-    ReplServerStartMessage replServerStartMsg;
+    ReplServerStartMessage replServerStartMsg = null;
 
     // Stop any existing heartbeat monitor from a previous session.
     if (heartbeatMonitor != null)
@@ -331,21 +331,11 @@
 
                 /*
                  * Get all the changes that have not been seen by this
-                 * replicationServer and update it
+                 * replicationServer and populate the replayOperations
+                 * list.
                  */
-                InternalClientConnection conn =
-                  InternalClientConnection.getRootConnection();
-                LDAPFilter filter = LDAPFilter.decode(
-                    "("+ Historical.HISTORICALATTRIBUTENAME +
-                    ">=dummy:" + replServerMaxChangeNumber + ")");
-                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
-                attrs.add(Historical.HISTORICALATTRIBUTENAME);
-                InternalSearchOperation op = conn.processSearch(
-                    new ASN1OctetString(baseDn.toString()),
-                    SearchScope.WHOLE_SUBTREE,
-                    DereferencePolicy.NEVER_DEREF_ALIASES,
-                    0, 0, false, filter,
-                    attrs, this);
+                InternalSearchOperation op = seachForChangedEntries(
+                    baseDn, replServerMaxChangeNumber, this);
                 if (op.getResultCode() != ResultCode.SUCCESS)
                 {
                   /*
@@ -451,9 +441,27 @@
           sendWindow.release(Integer.MAX_VALUE);
         this.sendWindow = new Semaphore(maxSendWindow);
         connectPhaseLock.notify();
-        Message message =
-            NOTE_NOW_FOUND_CHANGELOG.get(replicationServer, baseDn.toString());
-        logError(message);
+
+        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
+           (replServerStartMsg.getGenerationId() == -1))
+        {
+          Message message =
+            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
+                baseDn.toString(),
+                replicationServer,
+                Long.toString(this.generationId));
+          logError(message);
+        }
+        else
+        {
+          Message message =
+            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
+                baseDn.toString(),
+                replicationServer,
+                Long.toString(this.generationId),
+                Long.toString(replServerStartMsg.getGenerationId()));
+          logError(message);
+        }
       }
       else
       {
@@ -476,6 +484,38 @@
   }
 
   /**
+   * Search for the changes that happened since fromChangeNumber
+   * based on the historical attribute.
+   * @param baseDn the base DN
+   * @param fromChangeNumber The change number from which we want the changes
+   * @param resultListener that will process the entries returned.
+   * @return the internal search operation
+   * @throws Exception when raised.
+   */
+  public static InternalSearchOperation seachForChangedEntries(
+      DN baseDn,
+      ChangeNumber fromChangeNumber,
+      InternalSearchListener resultListener)
+  throws Exception
+  {
+    InternalClientConnection conn =
+      InternalClientConnection.getRootConnection();
+    LDAPFilter filter = LDAPFilter.decode(
+        "("+ Historical.HISTORICALATTRIBUTENAME +
+        ">=dummy:" + fromChangeNumber + ")");
+    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+    attrs.add(Historical.HISTORICALATTRIBUTENAME);
+    attrs.add(Historical.ENTRYUIDNAME);
+    return conn.processSearch(
+        new ASN1OctetString(baseDn.toString()),
+        SearchScope.WHOLE_SUBTREE,
+        DereferencePolicy.NEVER_DEREF_ALIASES,
+        0, 0, false, filter,
+        attrs,
+        resultListener);
+  }
+
+  /**
    * Start the heartbeat monitor thread.
    */
   private void startHeartBeat()
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
new file mode 100644
index 0000000..2f31563
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -0,0 +1,303 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.net.ServerSocket;
+import java.util.Iterator;
+import java.util.List;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.core.AddOperationBasis;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchResultEntry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the usage of the historical data of the replication.
+ */
+public class HistoricalCsnOrderingTest
+extends ReplicationTestCase
+{
+  /**
+   * A "person" entry
+   */
+  protected Entry personEntry;
+  private int replServerPort;
+
+  /**
+   * Set up the environment for performing the tests in this Class.
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @BeforeClass
+  @Override
+  public void setUp() throws Exception
+  {
+    super.setUp(); 
+
+    // Create backend top level entries
+    String[] topEntries = new String[2];
+    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
+    + "objectClass: domain\n";
+    topEntries[1] = "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n"
+    + "objectClass: organizationalUnit\n"
+    + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
+    for (String entryStr : topEntries)
+    {
+      addEntry(TestCaseUtils.entryFromLdifString(entryStr));
+    }
+
+    // top level synchro provider
+    String synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+    // Multimaster Synchro plugin
+    synchroPluginStringDN = "cn=Multimaster Synchronization, "
+      + synchroStringDN;
+
+    // find  a free port for the replicationServer
+    ServerSocket socket = TestCaseUtils.bindFreePort();
+    replServerPort = socket.getLocalPort();
+    socket.close();
+
+    // replication server
+    String replServerLdif =
+      "dn: cn=Replication Server, " + synchroPluginStringDN + "\n"
+      + "objectClass: top\n"
+      + "objectClass: ds-cfg-replication-server\n"
+      + "cn: Replication Server\n"
+      + "ds-cfg-replication-port: " + replServerPort + "\n"
+      + "ds-cfg-replication-server-id: 1\n";
+    replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
+
+    // suffix synchronized
+    String synchroServerLdif =
+      "dn: cn=example, cn=domains, " + synchroPluginStringDN + "\n"
+      + "objectClass: top\n"
+      + "objectClass: ds-cfg-replication-domain\n"
+      + "cn: example\n"
+      + "ds-cfg-base-dn: ou=People,dc=example,dc=com\n"
+      + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
+      + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
+    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+
+    String personLdif = "dn: uid=user.1,ou=People,dc=example,dc=com\n"
+      + "objectClass: top\n" + "objectClass: person\n"
+      + "objectClass: organizationalPerson\n"
+      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+      + "homePhone: 951-245-7634\n"
+      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+      + "mobile: 027-085-0537\n"
+      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
+      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+      + "street: 17984 Thirteenth Street\n"
+      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+      + "userPassword: password\n" + "initials: AA\n";
+    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
+
+    configureReplication();
+  }
+
+  /**
+   * Add an entry in the datatbase
+   *
+   */
+  private void addEntry(Entry entry) throws Exception
+  {
+    AddOperationBasis addOp = new AddOperationBasis(connection,
+        InternalClientConnection.nextOperationID(), InternalClientConnection
+        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
+        entry.getUserAttributes(), entry.getOperationalAttributes());
+    addOp.setInternalOperation(true);
+    addOp.run();
+    assertNotNull(getEntry(entry.getDN(), 1000, true));
+  }
+
+  /**
+   * Check the basic comparator on the HistoricalCsnOrderingMatchingRule
+   */
+  @Test()
+  public void basicRuleTest()
+  throws Exception
+  {
+    // Creates a rule
+    HistoricalCsnOrderingMatchingRule r = 
+      new HistoricalCsnOrderingMatchingRule();
+
+    ChangeNumber del1 = new ChangeNumber(1, (short) 0, (short) 1);
+    ChangeNumber del2 = new ChangeNumber(1, (short) 1, (short) 1);
+
+    ByteString v1 = new ASN1OctetString("a"+":"+del1.toString());
+    ByteString v2 = new ASN1OctetString("a"+":"+del2.toString());
+
+    int cmp = r.compareValues(v1, v1);
+    assertTrue(cmp == 0);
+
+    cmp = r.compareValues(v1, v2);
+    assertTrue(cmp == -1);
+
+    cmp = r.compareValues(v2, v1);
+    assertTrue(cmp == 1);
+  }
+
+  /**
+   * Test that we can retrieve the entries that were missed by 
+   * a replication server and can  re-build operations from the historical
+   * informations.
+   */
+  @Test()
+  public void changesCmpTest()
+  throws Exception
+  {
+    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+    final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
+    final AttributeType histType =
+      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
+
+    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+    "Starting replication test : changesCmpTest"));
+
+    // Add the first test entry.
+    TestCaseUtils.addEntry(
+        "dn: cn=test1," + baseDn.toString(),
+        "displayname: Test1",
+        "objectClass: top",
+        "objectClass: person",
+        "objectClass: organizationalPerson",
+        "objectClass: inetOrgPerson",
+        "cn: test1",
+        "sn: test"
+    );
+
+    // Perform a first modification to update the historical attribute
+    int resultCode = TestCaseUtils.applyModifications(
+        "dn: cn=test1," + baseDn.toString(),
+        "changetype: modify",
+        "add: description",
+    "description: foo");
+    assertEquals(resultCode, 0);
+
+    // Read the entry back to get its historical and included changeNumber
+    Entry entry = DirectoryServer.getEntry(dn1);
+    List<Attribute> attrs1 = entry.getAttribute(histType);
+
+    assertTrue(attrs1 != null);
+    assertTrue(attrs1.isEmpty() != true);
+
+    String histValue =
+      attrs1.get(0).getValues().iterator().next().getStringValue();
+
+    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+        "First historical value:" + histValue));
+
+    // Perform a 2nd modification to update the hist attribute with 
+    // a second value
+    resultCode = TestCaseUtils.applyModifications(
+        "dn: cn=test1," + baseDn.toString(),
+        "changetype: modify",
+        "add: description",
+    "description: bar");
+    assertEquals(resultCode, 0);
+
+    Entry entry2 = DirectoryServer.getEntry(dn1);
+    List<Attribute> attrs2 = entry2.getAttribute(histType);
+
+    assertTrue(attrs2 != null);
+    assertTrue(attrs2.isEmpty() != true);
+
+    Iterator<AttributeValue> iav = attrs2.get(0).getValues().iterator();
+    try
+    {
+      while (true)
+      {
+        AttributeValue av = iav.next();
+        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+            "Secnd historical value:" + av.getNormalizedStringValue()));
+      }
+    }
+    catch(Exception e)
+    {
+    }
+
+    // Build a change number from the first modification
+    String hv[] = histValue.split(":");
+    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+        hv[1]));
+    ChangeNumber fromChangeNumber =
+      new ChangeNumber(hv[1]);
+
+    // Retrieves the entries that have changed since the first modification
+    InternalSearchOperation op =
+      ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
+
+    // The expected result is one entry .. the one previously modified
+    assertTrue(op.getResultCode() == ResultCode.SUCCESS);
+    assertTrue(op.getSearchEntries().size()==1);
+
+    // From the historical of this entry, rebuild operations
+    // Since there have been 2 modifications, there should be 2
+    // operations rebuild from this state.
+    int updatesCnt = 0;
+    for (SearchResultEntry searchEntry : op.getSearchEntries())
+    {
+      logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+          searchEntry.toString()));
+      Iterable<FakeOperation> updates =
+        Historical.generateFakeOperations(searchEntry);
+      for (FakeOperation fop : updates)
+      {
+        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+            fop.generateMessage().toString()));
+        updatesCnt++;
+      }
+    }
+    assertTrue(updatesCnt == 2);    
+  }
+}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 165d085..e23c770 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -1260,7 +1260,7 @@
      {
        server1.publish(msg);
      }
-     Thread.sleep(200);
+     Thread.sleep(500);
 
      // Sets manually the association backend-replication server since
      // no config object exist for our replication server.

--
Gitblit v1.10.0