From 65f071d6b9adf6414c8074381f5e95acb1297565 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Tue, 16 Oct 2007 07:10:39 +0000
Subject: [PATCH] When a replica connects to a replication server that is "late" compared to the replica data , the replica must update the replication server with the missing changes. 1 line fix , the entryUUID attribute must be part of the searched attributes in order to rebuild the operations - and created unit tests for this.

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |   76 +++++++++++++++++++++++++++++---------
 1 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index e561fb7..52e0ac4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -202,7 +202,7 @@
    */
   private void connect()
   {
-    ReplServerStartMessage replServerStartMsg;
+    ReplServerStartMessage replServerStartMsg = null;
 
     // Stop any existing heartbeat monitor from a previous session.
     if (heartbeatMonitor != null)
@@ -331,21 +331,11 @@
 
                 /*
                  * Get all the changes that have not been seen by this
-                 * replicationServer and update it
+                 * replicationServer and populate the replayOperations
+                 * list.
                  */
-                InternalClientConnection conn =
-                  InternalClientConnection.getRootConnection();
-                LDAPFilter filter = LDAPFilter.decode(
-                    "("+ Historical.HISTORICALATTRIBUTENAME +
-                    ">=dummy:" + replServerMaxChangeNumber + ")");
-                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
-                attrs.add(Historical.HISTORICALATTRIBUTENAME);
-                InternalSearchOperation op = conn.processSearch(
-                    new ASN1OctetString(baseDn.toString()),
-                    SearchScope.WHOLE_SUBTREE,
-                    DereferencePolicy.NEVER_DEREF_ALIASES,
-                    0, 0, false, filter,
-                    attrs, this);
+                InternalSearchOperation op = seachForChangedEntries(
+                    baseDn, replServerMaxChangeNumber, this);
                 if (op.getResultCode() != ResultCode.SUCCESS)
                 {
                   /*
@@ -451,9 +441,27 @@
           sendWindow.release(Integer.MAX_VALUE);
         this.sendWindow = new Semaphore(maxSendWindow);
         connectPhaseLock.notify();
-        Message message =
-            NOTE_NOW_FOUND_CHANGELOG.get(replicationServer, baseDn.toString());
-        logError(message);
+
+        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
+           (replServerStartMsg.getGenerationId() == -1))
+        {
+          Message message =
+            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
+                baseDn.toString(),
+                replicationServer,
+                Long.toString(this.generationId));
+          logError(message);
+        }
+        else
+        {
+          Message message =
+            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
+                baseDn.toString(),
+                replicationServer,
+                Long.toString(this.generationId),
+                Long.toString(replServerStartMsg.getGenerationId()));
+          logError(message);
+        }
       }
       else
       {
@@ -476,6 +484,38 @@
   }
 
   /**
+   * Search for the changes that happened since fromChangeNumber
+   * based on the historical attribute.
+   * @param baseDn the base DN
+   * @param fromChangeNumber The change number from which we want the changes
+   * @param resultListener that will process the entries returned.
+   * @return the internal search operation
+   * @throws Exception when raised.
+   */
+  public static InternalSearchOperation seachForChangedEntries(
+      DN baseDn,
+      ChangeNumber fromChangeNumber,
+      InternalSearchListener resultListener)
+  throws Exception
+  {
+    InternalClientConnection conn =
+      InternalClientConnection.getRootConnection();
+    LDAPFilter filter = LDAPFilter.decode(
+        "("+ Historical.HISTORICALATTRIBUTENAME +
+        ">=dummy:" + fromChangeNumber + ")");
+    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+    attrs.add(Historical.HISTORICALATTRIBUTENAME);
+    attrs.add(Historical.ENTRYUIDNAME);
+    return conn.processSearch(
+        new ASN1OctetString(baseDn.toString()),
+        SearchScope.WHOLE_SUBTREE,
+        DereferencePolicy.NEVER_DEREF_ALIASES,
+        0, 0, false, filter,
+        attrs,
+        resultListener);
+  }
+
+  /**
    * Start the heartbeat monitor thread.
    */
   private void startHeartBeat()

--
Gitblit v1.10.0