From d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 18 Nov 2009 16:55:52 +0000
Subject: [PATCH] Fix for Issue 4300 : stop replication server cause OutOfMemoryError

---
 opends/src/server/org/opends/server/replication/plugin/PendingChanges.java                                    |   52 +++
 opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java                             |   12 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java |  335 ++++++++++++++-------
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                             |  372 +++++++++++++++++------
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                |   72 +++
 opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java                                |   18 +
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                |   12 
 7 files changed, 634 insertions(+), 239 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ca1cf7d..c85caf4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.replication.service.ReplicationMonitor;
 import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
  */
 public class LDAPReplicationDomain extends ReplicationDomain
        implements ConfigurationChangeListener<ReplicationDomainCfg>,
-                  AlertGenerator, InternalSearchListener
+                  AlertGenerator
 {
   /**
+   * This class is used in the session establishment phase
+   * when no Replication Server with all the local changes has been found
+   * and we therefore need to recover them.
+   * A search is then performed on the database using this
+   * internalSearchListener.
+   */
+  private class ScanSearchListener implements InternalSearchListener
+  {
+    private ChangeNumber startingChangeNumber = null;
+    private ChangeNumber endChangeNumber = null;
+
+    public ScanSearchListener(
+        ChangeNumber startingChangeNumber,
+        ChangeNumber endChangeNumber)
+    {
+      this.startingChangeNumber = startingChangeNumber;
+      this.endChangeNumber = endChangeNumber;
+    }
+
+    @Override
+    public void handleInternalSearchEntry(
+        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
+        throws DirectoryException
+    {
+      // Build the list of Operations that happened on this entry
+      // after startingChangeNumber and before endChangeNumber and
+      // add them to the replayOperations list
+      Iterable<FakeOperation> updates =
+        Historical.generateFakeOperations(searchEntry);
+
+      for (FakeOperation op : updates)
+      {
+        ChangeNumber cn = op.getChangeNumber();
+        if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
+        {
+          synchronized (replayOperations)
+          {
+            replayOperations.put(cn, op);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void handleInternalSearchReference(
+        InternalSearchOperation searchOperation,
+        SearchResultReference searchReference) throws DirectoryException
+    {
+       // Nothing to do.
+    }
+  }
+
+  /**
    * The fully-qualified name of this class.
    */
   private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
   }
 
   /**
+   * The thread that is responsible to update the RS to which this domain is
+   * connected in case it is late and there is no RS which is up to date.
+   */
+  private class RSUpdater extends DirectoryThread
+  {
+    private ChangeNumber startChangeNumber;
+    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
+    {
+      super("Replication Server Updater for server id " +
+            serverId + " and domain " + baseDn.toString());
+      this.startChangeNumber = replServerMaxChangeNumber;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run()
+    {
+      // Replication server is missing some of our changes: let's
+      // send them to him.
+      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+      logError(message);
+
+      /*
+       * Get all the changes that have not been seen by this
+       * replication server and publish them.
+       */
+      try
+      {
+        if (buildAndPublishMissingChanges(startChangeNumber, broker))
+        {
+          message = DEBUG_CHANGES_SENT.get();
+          logError(message);
+          synchronized(replayOperations)
+          {
+            replayOperations.clear();
+          }
+        }
+        else
+        {
+          /*
+           * An error happened trying to search for the updates
+           * This server will start accepting again new updates but
+           * some inconsistencies will stay between servers.
+           * Log an error for the repair tool
+           * that will need to re-synchronize the servers.
+           */
+          message = ERR_CANNOT_RECOVER_CHANGES.get(
+              baseDn.toNormalizedString());
+          logError(message);
+        }
+      } catch (Exception e)
+      {
+        /*
+         * An error happened trying to search for the updates
+         * This server will start accepting again new updates but
+         * some inconsistencies will stay between servers.
+         * Log an error for the repair tool
+         * that will need to re-synchronize the servers.
+         */
+        message = ERR_CANNOT_RECOVER_CHANGES.get(
+            baseDn.toNormalizedString());
+        logError(message);
+      }
+      finally
+      {
+        broker.setRecoveryRequired(false);
+      }
+    }
+  }
+
+
+  /**
    * Creates a new ReplicationDomain using configuration from configEntry.
    *
    * @param configuration    The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
       saveGenerationId(generationId);
     }
 
-    startPublishService(replicationServers, window, heartbeatInterval,
-        configuration.getChangetimeHeartbeatInterval());
-
     /*
      * ChangeNumberGenerator is used to create new unique ChangeNumbers
      * for each operation done on this replication domain.
@@ -505,6 +630,9 @@
     pendingChanges =
       new PendingChanges(generator, this);
 
+    startPublishService(replicationServers, window, heartbeatInterval,
+        configuration.getChangetimeHeartbeatInterval());
+
     remotePendingChanges = new RemotePendingChanges(getServerState());
 
     // listen for changes on the configuration
@@ -4356,74 +4484,9 @@
         if ((ourMaxChangeNumber != null) &&
             (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
         {
-
-          // Replication server is missing some of our changes: let's
-          // send them to him.
-
-          Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
-          logError(message);
-
-          /*
-           * Get all the changes that have not been seen by this
-           * replication server and populate the replayOperations
-           * list.
-           */
-          InternalSearchOperation op = searchForChangedEntries(
-              baseDn, replServerMaxChangeNumber, this);
-          if (op.getResultCode() != ResultCode.SUCCESS)
-          {
-            /*
-             * An error happened trying to search for the updates
-             * This server will start accepting again new updates but
-             * some inconsistencies will stay between servers.
-             * Log an error for the repair tool
-             * that will need to re-synchronize the servers.
-             */
-            message = ERR_CANNOT_RECOVER_CHANGES.get(
-                baseDn.toNormalizedString());
-            logError(message);
-          } else
-          {
-            for (FakeOperation replayOp :
-              replayOperations.tailMap(replServerMaxChangeNumber).values())
-            {
-              ChangeNumber cn = replayOp.getChangeNumber();
-              /*
-               * Because the entry returned by the search operation
-               * can contain old historical information, it is
-               * possible that some of the FakeOperation are
-               * actually older than the last ChangeNumber known by
-               * the Replication Server.
-               * In such case don't send the operation.
-               */
-              if (!cn.newer(replServerMaxChangeNumber))
-              {
-                continue;
-              }
-
-              /*
-               * Check if the DeleteOperation has been abandoned before
-               * being processed. This is necessary because the replayOperation
-               *
-               */
-              if (replayOp instanceof FakeDelOperation)
-              {
-                FakeDelOperation delOp = (FakeDelOperation) replayOp;
-                if (findEntryDN(delOp.getUUID()) != null)
-                {
-                  continue;
-                }
-              }
-              message =
-                DEBUG_SENDING_CHANGE.get(
-                    replayOp.getChangeNumber().toString());
-              logError(message);
-              session.publish(replayOp.generateMessage());
-            }
-            message = DEBUG_CHANGES_SENT.get();
-            logError(message);
-          }
-          replayOperations.clear();
+          pendingChanges.setRecovering(true);
+          broker.setRecoveryRequired(true);
+          new RSUpdater(replServerMaxChangeNumber).start();
         }
       }
     } catch (Exception e)
@@ -4437,19 +4500,124 @@
   }
 
   /**
+   * Build the list of changes that have been processed by this server
+   * after the ChangeNumber given as a parameter and publish them
+   * using the given session.
+   *
+   * @param startingChangeNumber  The ChangeNumber whe we need to start the
+   *                              search
+   * @param session               The session to use to publish the changes
+   *
+   * @return                      A boolean indicating he success of the
+   *                              operation.
+   * @throws Exception            if an Exception happens during the search.
+   */
+  public boolean buildAndPublishMissingChanges(
+      ChangeNumber startingChangeNumber,
+      ReplicationBroker session)
+      throws Exception
+  {
+    // Trim the changes in replayOperations that are older than
+    // the startingChangeNumber.
+    synchronized (replayOperations)
+    {
+      Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
+      while (it.hasNext())
+      {
+        if (it.next().olderOrEqual(startingChangeNumber))
+        {
+          it.remove();
+        }
+        else
+        {
+          break;
+        }
+      }
+    }
+
+    ChangeNumber lastRetrievedChange = null;
+    long missingChangesDelta;
+    InternalSearchOperation op;
+    ChangeNumber currentStartChangeNumber = startingChangeNumber;
+    do
+    {
+      lastRetrievedChange = null;
+      // We can't do the search in one go because we need to
+      // store the results so that we are sure we send the operations
+      // in order and because the list might be large
+      // So we search by interval of 10 seconds
+      // and store the results in the replayOperations list
+      // so that they are sorted before sending them.
+      missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
+      ChangeNumber endChangeNumber =
+        new ChangeNumber(
+            missingChangesDelta, 0xffffffff, serverId);
+
+      ScanSearchListener listener =
+        new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
+      op = searchForChangedEntries(
+          baseDn, currentStartChangeNumber, endChangeNumber, listener);
+
+      // Publish and remove all the changes from the replayOperations list
+      // that are older than the endChangeNumber.
+      LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+      synchronized (replayOperations)
+      {
+        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
+        while (itOp.hasNext())
+        {
+          FakeOperation fakeOp = itOp.next();
+          if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
+              && state.cover(fakeOp.getChangeNumber()))
+          {
+            lastRetrievedChange = fakeOp.getChangeNumber();
+            opsToSend.add(fakeOp);
+            itOp.remove();
+          }
+          else
+          {
+            break;
+          }
+        }
+      }
+
+      for (FakeOperation opToSend : opsToSend)
+      {
+          session.publishRecovery(opToSend.generateMessage());
+      }
+      opsToSend.clear();
+      if (lastRetrievedChange != null)
+      {
+        currentStartChangeNumber = lastRetrievedChange;
+      }
+      else
+      {
+        currentStartChangeNumber = endChangeNumber;
+      }
+
+    } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
+             (op.getResultCode().equals(ResultCode.SUCCESS)));
+
+    return op.getResultCode().equals(ResultCode.SUCCESS);
+  }
+
+
+  /**
    * Search for the changes that happened since fromChangeNumber
    * based on the historical attribute. The only changes that will
    * be send will be the one generated on the serverId provided in
    * fromChangeNumber.
    * @param baseDn the base DN
-   * @param fromChangeNumber The change number from which we want the changes
-   * @param resultListener that will process the entries returned.
+   * @param fromChangeNumber The ChangeNumber from which we want the changes
+   * @param lastChangeNumber The max ChangeNumber that the search should return
+   * @param resultListener   The listener that will process the entries returned
    * @return the internal search operation
    * @throws Exception when raised.
    */
   public static InternalSearchOperation searchForChangedEntries(
     DN baseDn,
     ChangeNumber fromChangeNumber,
+    ChangeNumber lastChangeNumber,
     InternalSearchListener resultListener)
     throws Exception
   {
@@ -4457,8 +4625,16 @@
       InternalClientConnection.getRootConnection();
     Integer serverId = fromChangeNumber.getServerId();
 
-    String maxValueForId = "ffffffffffffffff" +
-      String.format("%04x", serverId) + "ffffffff";
+    String maxValueForId;
+    if (lastChangeNumber == null)
+    {
+      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
+                      + "ffffffff";
+    }
+    else
+    {
+      maxValueForId = lastChangeNumber.toString();
+    }
 
     LDAPFilter filter = LDAPFilter.decode(
        "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
   }
 
   /**
-   * {@inheritDoc}
+   * Search for the changes that happened since fromChangeNumber
+   * based on the historical attribute. The only changes that will
+   * be send will be the one generated on the serverId provided in
+   * fromChangeNumber.
+   * @param baseDn the base DN
+   * @param fromChangeNumber The change number from which we want the changes
+   * @param resultListener that will process the entries returned.
+   * @return the internal search operation
+   * @throws Exception when raised.
    */
-  public void handleInternalSearchEntry(
-    InternalSearchOperation searchOperation,
-    SearchResultEntry searchEntry)
+  public static InternalSearchOperation searchForChangedEntries(
+    DN baseDn,
+    ChangeNumber fromChangeNumber,
+    InternalSearchListener resultListener)
+    throws Exception
   {
-    /*
-     * This call back is called at session establishment phase
-     * for each entry that has been changed by this server and the changes
-     * have not been sent to any Replication Server.
-     * The role of this method is to build equivalent operation from
-     * the historical information and add them in the replayOperations
-     * table.
-     */
-    Iterable<FakeOperation> updates =
-      Historical.generateFakeOperations(searchEntry);
-    for (FakeOperation op : updates)
-    {
-      replayOperations.put(op.getChangeNumber(), op);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void handleInternalSearchReference(
-    InternalSearchOperation searchOperation,
-    SearchResultReference searchReference)
-  {
-    // TODO to be implemented
+    return searchForChangedEntries(
+        baseDn, fromChangeNumber, null, resultListener);
   }
 
 
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index ad1207d..72414a3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -212,7 +212,14 @@
       {
         numSentUpdates++;
         LDAPUpdateMsg updateMsg = firstChange.getMsg();
-        domain.publish(updateMsg);
+        if (!recoveringOldChanges)
+        {
+          domain.publish(updateMsg);
+        }
+        else
+        {
+          domain.getServerState().update(updateMsg.getChangeNumber());
+        }
       }
       pendingChanges.remove(firstChangeNumber);
 
@@ -248,4 +255,47 @@
     _commit(changeNumber, msg);
     return _pushCommittedChanges();
   }
+
+  private boolean recoveringOldChanges = false;
+  /**
+   * Set the PendingChangesList structure in a mode where it is
+   * waiting for the RS to receive all the previous changes to
+   * be sent before starting to process the changes normally.
+   * In this mode, The Domain does not publish the changes from
+   * the pendingChanges because there are older changes that
+   * need to be published before.
+   *
+   * @param b The recovering status that must be set.
+   */
+  public void setRecovering(boolean b)
+  {
+    recoveringOldChanges = b;
+  }
+
+  /**
+   * Allows to update the recovery situation by comparing the ChangeNumber of
+   * the last change that was sent to the ReplicationServer with the
+   * ChangeNumber of the last operation that was taken out of the
+   * PendingChanges list.
+   * If he two match then the recovery is completed and normal procedure can
+   * restart. Otherwise the RSUpdate thread must continue to look for
+   * older changes and no changes can be committed from the pendingChanges list.
+   *
+   * @param recovered  The ChangeNumber of the last change that was published
+   *                   to the ReplicationServer.
+   *
+   * @return           A boolean indicating if the recovery is completed (false)
+   *                   or must continue (true).
+   */
+
+  public synchronized boolean RecoveryUntil(ChangeNumber recovered)
+  {
+    ChangeNumber lastLocalChange = domain.getLastLocalChange();
+
+    if ((recovered != null) && (recovered.newerOrEquals(lastLocalChange)))
+    {
+      recoveringOldChanges = false;
+    }
+    return recoveringOldChanges;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
index 1f67c46..a4010f9 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -123,6 +123,18 @@
   }
 
   /**
+   * Checks that the ChangeNumber given as a parameter is in this ServerState.
+   *
+   * @param   covered The ChangeNumber that should be checked.
+   * @return  A boolean indicating if this ServerState contains the ChangeNumber
+   *          given in parameter.
+   */
+  public boolean cover(ChangeNumber covered)
+  {
+    return state.cover(covered);
+  }
+
+  /**
    * Update the Server State with a ChangeNumber.
    * All operations with smaller CSN and the same serverID must be committed
    * before calling this method.
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c1eb5e5..24fc481 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
 
   private long generationID;
   private int updateDoneCount = 0;
+  private boolean connectRequiresRecovery = false;
 
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
                 rsServerId = serverInfo.getServerId();
                 rsServerUrl = bestServer;
 
+                receiveTopo(topologyMsg);
+
+                // Log a message to let the administrator know that the failure
+                // was resolved.
+                // Wakeup all the thread that were waiting on the window
+                // on the previous connection.
+                connectionError = false;
+                if (sendWindow != null)
+                {
+                  sendWindow.release(Integer.MAX_VALUE);
+                }
+                sendWindow = new Semaphore(maxSendWindow);
+                rcvWindow = maxRcvWindow;
+                connected = true;
+
                 // May have created a broker with null replication domain for
                 // unit test purpose.
                 if (domain != null)
@@ -703,8 +719,7 @@
                       serverInfo.getGenerationId(),
                       session);
                 }
-                receiveTopo(topologyMsg);
-                connected = true;
+
                 if (getRsGroupId() != groupId)
                 {
                  // Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
 
       if (connected)
       {
-        // Log a message to let the administrator know that the failure was
-        // resolved.
-        // Wakeup all the thread that were waiting on the window
-        // on the previous connection.
-        connectionError = false;
-        if (sendWindow != null)
-        {
-          sendWindow.release(Integer.MAX_VALUE);
-        }
-        sendWindow = new Semaphore(maxSendWindow);
-        rcvWindow = maxRcvWindow;
         connectPhaseLock.notify();
 
         if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
    */
   public void publish(ReplicationMsg msg)
   {
+    _publish(msg, false);
+  }
+
+  /**
+   * Publish a recovery message to the other servers.
+   * @param msg the message to publish
+   */
+  public void publishRecovery(ReplicationMsg msg)
+  {
+    _publish(msg, true);
+  }
+
+  /**
+   * Publish a message to the other servers.
+   * @param msg the message to publish
+   * @param recoveryMsg the message is a recovery Message
+   */
+  void _publish(ReplicationMsg msg, boolean recoveryMsg)
+  {
     boolean done = false;
 
     while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
           currentWindowSemaphore = sendWindow;
         }
 
+        // If the Replication domain has decided that there is a need to
+        // recover some changes then it is not allowed to send this
+        // change but it will be the responsibility of the recovery thread to
+        // do it.
+        if (!recoveryMsg & connectRequiresRecovery)
+        {
+          return;
+        }
+
         if (msg instanceof UpdateMsg)
         {
           // Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
       ctHeartbeatPublisherThread = null;
     }
   }
+
+  /**
+   * Set the connectRequiresRecovery to the provided value.
+   * This flag is used to indicate if a recovery of Update is necessary
+   * after a reconnection to a RS.
+   * It is the responsibility of the ReplicationDomain to set it during the
+   * sessionInitiated phase.
+   *
+   * @param b the new value of the connectRequiresRecovery.
+   */
+  public void setRecoveryRequired(boolean b)
+  {
+    connectRequiresRecovery = b;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 893bfa7..96bc116 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2909,4 +2909,16 @@
   {
     return eClIncludes;
   }
+
+  /**
+   * Returns the ChangeNUmber of the last Change that was fully processed
+   * by this ReplicationDomain.
+   *
+   * @return The ChangeNUmber of the last Change that was fully processed
+   *         by this ReplicationDomain.
+   */
+  public ChangeNumber getLastLocalChange()
+  {
+    return state.getMaxChangeNumber(serverID);
+  }
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index a46136a..7ef92f1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1231,17 +1231,31 @@
    * Deletes the provided entry from the Directory Server using an
    * internal operation.
    *
-   * @param  entry  The entry to be added.
+   * @param  entry  The entry to be deleted.
    *
    * @throws  Exception  If an unexpected problem occurs.
    */
   public static void deleteEntry(Entry entry)
          throws Exception
   {
+    deleteEntry(entry.getDN());
+  }
+
+  /**
+   * Deletes the provided entry from the Directory Server using an
+   * internal operation.
+   *
+   * @param  dn  The dn of entry to be deleted
+   *
+   * @throws  Exception  If an unexpected problem occurs.
+   */
+  public static void deleteEntry(DN dn)
+         throws Exception
+  {
     InternalClientConnection conn =
          InternalClientConnection.getRootConnection();
 
-    DeleteOperation deleteOperation = conn.processDelete(entry.getDN());
+    DeleteOperation deleteOperation = conn.processDelete(dn);
     assertEquals(deleteOperation.getResultCode(), ResultCode.SUCCESS);
   }
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index e92b6f9..8f19763 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -26,127 +26,71 @@
  */
 package org.opends.server.replication.plugin;
 
+import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
 import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
+import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
 import org.opends.server.TestCaseUtils;
-import org.opends.server.core.AddOperationBasis;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
+import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.ByteString;
 import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchResultEntry;
-import org.testng.annotations.BeforeClass;
+import org.opends.server.util.TimeThread;
 import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
 
 /**
  * Test the usage of the historical data of the replication.
  */
 public class HistoricalCsnOrderingTest
-extends ReplicationTestCase
+       extends ReplicationTestCase
 {
-  /**
-   * A "person" entry
-   */
-  protected Entry personEntry;
-  private int replServerPort;
+  final int serverId = 123;
 
-  /**
-   * Set up the environment for performing the tests in this Class.
-   *
-   * @throws Exception
-   *           If the environment could not be set up.
-   */
-  @BeforeClass
-  @Override
-  public void setUp() throws Exception
+  public class TestBroker extends ReplicationBroker
   {
-    super.setUp();
+    LinkedList<ReplicationMsg> list = null;
 
-    // Create necessary backend top level entry
-    String topEntry = "dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
-        + "objectClass: top\n"
-        + "objectClass: organizationalUnit\n"
-        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
-    addEntry(TestCaseUtils.entryFromLdifString(topEntry));
+    public TestBroker(LinkedList<ReplicationMsg> list)
+    {
+      super(null, null, null, 0, 0, (long) 0, (long) 0, null, (byte) 0, (long) 0);
+      this.list = list;
+    }
 
-    // find  a free port for the replicationServer
-    ServerSocket socket = TestCaseUtils.bindFreePort();
-    replServerPort = socket.getLocalPort();
-    socket.close();
+    public void publishRecovery(ReplicationMsg msg)
+    {
+      list.add(msg);
+    }
 
-    // replication server
-    String replServerLdif =
-      "dn: cn=Replication Server, " + SYNCHRO_PLUGIN_DN + "\n"
-      + "objectClass: top\n"
-      + "objectClass: ds-cfg-replication-server\n"
-      + "cn: Replication Server\n"
-      + "ds-cfg-replication-port: " + replServerPort + "\n"
-      + "ds-cfg-replication-db-directory: HistoricalCsnOrderingTestDb\n"
-      + "ds-cfg-replication-server-id: 101\n";
-    replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
 
-    // suffix synchronized
-    String testName = "historicalCsnOrderingTest";
-    String synchroServerLdif =
-      "dn: cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n"
-      + "objectClass: top\n"
-      + "objectClass: ds-cfg-replication-domain\n"
-      + "cn: " + testName + "\n"
-      + "ds-cfg-base-dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
-      + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
-      + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
-    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
-
-    String personLdif = "dn: uid=user.1,ou=People," + TEST_ROOT_DN_STRING + "\n"
-      + "objectClass: top\n" + "objectClass: person\n"
-      + "objectClass: organizationalPerson\n"
-      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
-      + "homePhone: 951-245-7634\n"
-      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
-      + "mobile: 027-085-0537\n"
-      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
-      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
-      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
-      + "street: 17984 Thirteenth Street\n"
-      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
-      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
-      + "userPassword: password\n" + "initials: AA\n";
-    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
-
-    configureReplication();
-  }
-
-  /**
-   * Add an entry in the database
-   *
-   */
-  private void addEntry(Entry entry) throws Exception
-  {
-    AddOperationBasis addOp = new AddOperationBasis(connection,
-        InternalClientConnection.nextOperationID(), InternalClientConnection
-        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
-        entry.getUserAttributes(), entry.getOperationalAttributes());
-    addOp.setInternalOperation(true);
-    addOp.run();
-    assertNotNull(getEntry(entry.getDN(), 1000, true));
   }
 
   /**
@@ -182,10 +126,19 @@
    * informations.
    */
   @Test()
-  public void changesCmpTest()
+  public void buildAndPublishMissingChangesOneEntryTest()
   throws Exception
   {
-    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
+    final int serverId = 123;
+    final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+    TestCaseUtils.initializeTestBackend(true);
+    ReplicationServer rs = createReplicationServer();
+    // Create Replication Server and Domain
+    LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
+
+    try
+    {
+      long startTime = TimeThread.getTime();
     final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
     final AttributeType histType =
       DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
@@ -246,39 +199,183 @@
           "Second historical value:" + av.getValue().toString()));
     }
 
+    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
+    TestBroker session = new TestBroker(opList);
+
+    boolean result =
+      rd1.buildAndPublishMissingChanges(
+          new ChangeNumber(startTime, 0, serverId),
+          session);
+    assertTrue(result, "buildAndPublishMissingChanges has failed");
+    assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
+    assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
+
+
     // Build a change number from the first modification
     String hv[] = histValue.split(":");
-    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
-        hv[1]));
-    ChangeNumber fromChangeNumber =
-      new ChangeNumber(hv[1]);
+    logError(Message.raw(Category.SYNC, Severity.INFORMATION, hv[1]));
+    ChangeNumber fromChangeNumber = new ChangeNumber(hv[1]);
 
-    // Retrieves the entries that have changed since the first modification
-    InternalSearchOperation op =
-      LDAPReplicationDomain.searchForChangedEntries(
-          baseDn, fromChangeNumber, null);
+    opList = new LinkedList<ReplicationMsg>();
+    session = new TestBroker(opList);
 
-    // The expected result is one entry .. the one previously modified
-    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
-    assertEquals(op.getSearchEntries().size(), 1);
-
-    // From the historical of this entry, rebuild operations
-    // Since there have been 2 modifications and 1 add, there should be 3
-    // operations rebuild from this state.
-    int updatesCnt = 0;
-    for (SearchResultEntry searchEntry : op.getSearchEntries())
-    {
-      logError(Message.raw(Category.SYNC, Severity.INFORMATION,
-          searchEntry.toString()));
-      Iterable<FakeOperation> updates =
-        Historical.generateFakeOperations(searchEntry);
-      for (FakeOperation fop : updates)
-      {
-        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
-            fop.generateMessage().toString()));
-        updatesCnt++;
-      }
+    result =
+      rd1.buildAndPublishMissingChanges(
+          fromChangeNumber,
+          session);
+    assertTrue(result, "buildAndPublishMissingChanges has failed");
+    assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
+    assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
     }
-    assertTrue(updatesCnt == 3);
+    finally
+    {
+      MultimasterReplication.deleteDomain(baseDn);
+      rs.remove();
+    }
+  }
+
+  /**
+   * Test that we can retrieve the entries that were missed by
+   * a replication server and can  re-build operations from the historical
+   * informations.
+   */
+  @Test()
+  public void buildAndPublishMissingChangesSeveralEntriesTest()
+  throws Exception
+  {
+    final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+    TestCaseUtils.initializeTestBackend(true);
+    ReplicationServer rs = createReplicationServer();
+    // Create Replication Server and Domain
+    LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
+    long startTime = TimeThread.getTime();
+
+    try
+    {
+    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
+    "Starting replication test : changesCmpTest"));
+
+    // Add 3 entries.
+    String dnTest1 = "cn=test1," + baseDn.toString();
+    String dnTest2 = "cn=test2," + baseDn.toString();
+    String dnTest3 = "cn=test3," + baseDn.toString();
+    TestCaseUtils.addEntry(
+        "dn: " + dnTest3,
+        "displayname: Test1",
+        "objectClass: top",
+        "objectClass: person",
+        "objectClass: organizationalPerson",
+        "objectClass: inetOrgPerson",
+        "cn: test1",
+        "sn: test"
+    );
+    TestCaseUtils.addEntry(
+        "dn: " + dnTest1,
+        "displayname: Test1",
+        "objectClass: top",
+        "objectClass: person",
+        "objectClass: organizationalPerson",
+        "objectClass: inetOrgPerson",
+        "cn: test1",
+        "sn: test"
+    );
+    TestCaseUtils.deleteEntry(DN.decode(dnTest3));
+    TestCaseUtils.addEntry(
+        "dn: " + dnTest2,
+        "displayname: Test1",
+        "objectClass: top",
+        "objectClass: person",
+        "objectClass: organizationalPerson",
+        "objectClass: inetOrgPerson",
+        "cn: test1",
+        "sn: test"
+    );
+
+    // Perform modifications on the 2 entries
+    int resultCode = TestCaseUtils.applyModifications(false,
+        "dn: cn=test2," + baseDn.toString(),
+        "changetype: modify",
+        "add: description",
+    "description: foo");
+    resultCode = TestCaseUtils.applyModifications(false,
+        "dn: cn=test1," + baseDn.toString(),
+        "changetype: modify",
+        "add: description",
+    "description: foo");
+    assertEquals(resultCode, 0);
+
+    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
+    TestBroker session = new TestBroker(opList);
+
+    // Call the buildAndPublishMissingChanges and check that this method
+    // correctly generates the 4 operations in the correct order.
+    boolean result =
+      rd1.buildAndPublishMissingChanges(
+          new ChangeNumber(startTime, 0, serverId),
+          session);
+    assertTrue(result, "buildAndPublishMissingChanges has failed");
+    assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
+    ReplicationMsg msg = opList.removeFirst();
+    assertTrue(msg.getClass().equals(AddMsg.class));
+    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
+    msg = opList.removeFirst();
+    assertTrue(msg.getClass().equals(DeleteMsg.class));
+    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest3);
+    msg = opList.removeFirst();
+    assertTrue(msg.getClass().equals(AddMsg.class));
+    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
+    msg = opList.removeFirst();
+    assertTrue(msg.getClass().equals(ModifyMsg.class));
+    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
+    msg = opList.removeFirst();
+    assertTrue(msg.getClass().equals(ModifyMsg.class));
+    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
+    }
+    finally
+    {
+      MultimasterReplication.deleteDomain(baseDn);
+      rs.remove();
+    }
+  }
+
+  SortedSet<String> replServers = new TreeSet<String>();
+  private ReplicationServer createReplicationServer() throws ConfigException
+  {
+    int rsPort;
+    try
+    {
+      ServerSocket socket1 = TestCaseUtils.bindFreePort();
+      rsPort = socket1.getLocalPort();
+      socket1.close();
+      replServers.add("localhost:" + rsPort);
+
+
+      ReplServerFakeConfiguration conf =
+        new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering",
+            0, 1, 0, 100, replServers, 1, 1000, 5000);
+      ReplicationServer replicationServer = new ReplicationServer(conf);
+      replicationServer.clearDb();
+      return replicationServer;
+    }
+    catch (IOException e)
+    {
+      fail("Unable to determinate some free ports " +
+          stackTraceToSingleLineString(e));
+      return null;
+    }
+  }
+
+  private LDAPReplicationDomain createReplicationDomain(int dsId)
+          throws DirectoryException, ConfigException
+  {
+    DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
+    DomainFakeCfg domainConf =
+      new DomainFakeCfg(baseDn, dsId, replServers, AssuredType.NOT_ASSURED,
+      2, 1, 0, null);
+    LDAPReplicationDomain replicationDomain =
+      MultimasterReplication.createNewDomain(domainConf);
+    replicationDomain.start();
+
+    return replicationDomain;
   }
 }

--
Gitblit v1.10.0