From d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 18 Nov 2009 16:55:52 +0000
Subject: [PATCH] Fix for Issue 4300 : stop replication server cause OutOfMemoryError

---
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |  372 ++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 268 insertions(+), 104 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ca1cf7d..c85caf4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.replication.service.ReplicationMonitor;
 import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
  */
 public class LDAPReplicationDomain extends ReplicationDomain
        implements ConfigurationChangeListener<ReplicationDomainCfg>,
-                  AlertGenerator, InternalSearchListener
+                  AlertGenerator
 {
   /**
+   * Search listener used in the session establishment phase when no
+   * Replication Server holding all the local changes has been found and
+   * those changes therefore need to be recovered. Each entry returned by
+   * the search is scanned for the operations lying between the two
+   * ChangeNumber bounds, which are then stored in replayOperations.
+   */
+  private class ScanSearchListener implements InternalSearchListener
+  {
+    private ChangeNumber startingChangeNumber = null;
+    private ChangeNumber endChangeNumber = null;
+
+    public ScanSearchListener(
+        ChangeNumber startingChangeNumber,
+        ChangeNumber endChangeNumber)
+    {
+      this.startingChangeNumber = startingChangeNumber;
+      this.endChangeNumber = endChangeNumber;
+    }
+
+    @Override
+    public void handleInternalSearchEntry(
+        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
+        throws DirectoryException
+    {
+      // Rebuild, from the entry's historical information, the operations
+      // that happened on this entry after startingChangeNumber and before
+      // endChangeNumber, and store them in replayOperations by ChangeNumber.
+      Iterable<FakeOperation> updates =
+        Historical.generateFakeOperations(searchEntry);
+
+      for (FakeOperation op : updates)
+      {
+        ChangeNumber cn = op.getChangeNumber();
+        if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
+        {
+          synchronized (replayOperations)
+          {
+            replayOperations.put(cn, op);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void handleInternalSearchReference(
+        InternalSearchOperation searchOperation,
+        SearchResultReference searchReference) throws DirectoryException
+    {
+       // Search references are not used by the recovery: nothing to do.
+    }
+  }
+
+  /**
    * The fully-qualified name of this class.
    */
   private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
   }
 
   /**
+   * Thread responsible for updating the RS to which this domain is connected
+   * when that RS is late and there is no RS which is up to date.
+   */
+  private class RSUpdater extends DirectoryThread
+  {
+    private ChangeNumber startChangeNumber;
+    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
+    {
+      super("Replication Server Updater for server id " +
+            serverId + " and domain " + baseDn.toString());
+      this.startChangeNumber = replServerMaxChangeNumber;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run()
+    {
+      // The replication server is missing some of our changes: let's
+      // send them to it.
+      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+      logError(message);
+
+      /*
+       * Get all the changes that have not been seen by this
+       * replication server and publish them.
+       */
+      try
+      {
+        if (buildAndPublishMissingChanges(startChangeNumber, broker))
+        {
+          message = DEBUG_CHANGES_SENT.get();
+          logError(message);
+          synchronized(replayOperations)
+          {
+            replayOperations.clear();
+          }
+        }
+        else
+        {
+          /*
+           * An error happened trying to search for the updates.
+           * This server will start accepting new updates again but
+           * some inconsistencies will stay between servers.
+           * Log an error for the repair tool
+           * that will need to re-synchronize the servers.
+           */
+          message = ERR_CANNOT_RECOVER_CHANGES.get(
+              baseDn.toNormalizedString());
+          logError(message);
+        }
+      } catch (Exception e)
+      {
+        /*
+         * An exception was raised while searching for or publishing the
+         * updates. This server will start accepting new updates again but
+         * some inconsistencies will stay between servers: log an error for
+         * the repair tool that will need to re-synchronize the servers.
+         * NOTE(review): the caught exception itself is never logged here.
+         */
+        message = ERR_CANNOT_RECOVER_CHANGES.get(
+            baseDn.toNormalizedString());
+        logError(message);
+      }
+      finally
+      {
+        broker.setRecoveryRequired(false);
+      }
+    }
+  }
+
+
+  /**
    * Creates a new ReplicationDomain using configuration from configEntry.
    *
    * @param configuration    The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
       saveGenerationId(generationId);
     }
 
-    startPublishService(replicationServers, window, heartbeatInterval,
-        configuration.getChangetimeHeartbeatInterval());
-
     /*
      * ChangeNumberGenerator is used to create new unique ChangeNumbers
      * for each operation done on this replication domain.
@@ -505,6 +630,9 @@
     pendingChanges =
       new PendingChanges(generator, this);
 
+    startPublishService(replicationServers, window, heartbeatInterval,
+        configuration.getChangetimeHeartbeatInterval());
+
     remotePendingChanges = new RemotePendingChanges(getServerState());
 
     // listen for changes on the configuration
@@ -4356,74 +4484,9 @@
         if ((ourMaxChangeNumber != null) &&
             (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
         {
-
-          // Replication server is missing some of our changes: let's
-          // send them to him.
-
-          Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
-          logError(message);
-
-          /*
-           * Get all the changes that have not been seen by this
-           * replication server and populate the replayOperations
-           * list.
-           */
-          InternalSearchOperation op = searchForChangedEntries(
-              baseDn, replServerMaxChangeNumber, this);
-          if (op.getResultCode() != ResultCode.SUCCESS)
-          {
-            /*
-             * An error happened trying to search for the updates
-             * This server will start accepting again new updates but
-             * some inconsistencies will stay between servers.
-             * Log an error for the repair tool
-             * that will need to re-synchronize the servers.
-             */
-            message = ERR_CANNOT_RECOVER_CHANGES.get(
-                baseDn.toNormalizedString());
-            logError(message);
-          } else
-          {
-            for (FakeOperation replayOp :
-              replayOperations.tailMap(replServerMaxChangeNumber).values())
-            {
-              ChangeNumber cn = replayOp.getChangeNumber();
-              /*
-               * Because the entry returned by the search operation
-               * can contain old historical information, it is
-               * possible that some of the FakeOperation are
-               * actually older than the last ChangeNumber known by
-               * the Replication Server.
-               * In such case don't send the operation.
-               */
-              if (!cn.newer(replServerMaxChangeNumber))
-              {
-                continue;
-              }
-
-              /*
-               * Check if the DeleteOperation has been abandoned before
-               * being processed. This is necessary because the replayOperation
-               *
-               */
-              if (replayOp instanceof FakeDelOperation)
-              {
-                FakeDelOperation delOp = (FakeDelOperation) replayOp;
-                if (findEntryDN(delOp.getUUID()) != null)
-                {
-                  continue;
-                }
-              }
-              message =
-                DEBUG_SENDING_CHANGE.get(
-                    replayOp.getChangeNumber().toString());
-              logError(message);
-              session.publish(replayOp.generateMessage());
-            }
-            message = DEBUG_CHANGES_SENT.get();
-            logError(message);
-          }
-          replayOperations.clear();
+          pendingChanges.setRecovering(true);
+          broker.setRecoveryRequired(true);
+          new RSUpdater(replServerMaxChangeNumber).start();
         }
       }
     } catch (Exception e)
@@ -4437,19 +4500,124 @@
   }
 
   /**
+   * Build the list of changes that have been processed by this server
+   * after the ChangeNumber given as a parameter and publish them
+   * using the given session.
+   *
+   * @param startingChangeNumber  The ChangeNumber where we need to start the
+   *                              search
+   * @param session               The session to use to publish the changes
+   *
+   * @return                      A boolean indicating the success of the
+   *                              operation.
+   * @throws Exception            if an Exception happens during the search.
+   */
+  public boolean buildAndPublishMissingChanges(
+      ChangeNumber startingChangeNumber,
+      ReplicationBroker session)
+      throws Exception
+  {
+    // Trim the changes in replayOperations that are older than
+    // or equal to the startingChangeNumber.
+    synchronized (replayOperations)
+    {
+      Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
+      while (it.hasNext())
+      {
+        if (it.next().olderOrEqual(startingChangeNumber))
+        {
+          it.remove();
+        }
+        else
+        {
+          break;
+        }
+      }
+    }
+
+    ChangeNumber lastRetrievedChange = null;
+    long missingChangesDelta;
+    InternalSearchOperation op;
+    ChangeNumber currentStartChangeNumber = startingChangeNumber;
+    do
+    {
+      lastRetrievedChange = null;
+      // We can't do the search in one go because we need to
+      // store the results so that we are sure we send the operations
+      // in order and because the list might be large.
+      // So we search by intervals of 10 seconds
+      // and store the results in the replayOperations list
+      // so that they are sorted before sending them.
+      missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
+      ChangeNumber endChangeNumber =
+        new ChangeNumber(
+            missingChangesDelta, 0xffffffff, serverId);
+
+      ScanSearchListener listener =
+        new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
+      op = searchForChangedEntries(
+          baseDn, currentStartChangeNumber, endChangeNumber, listener);
+
+      // Publish and remove from replayOperations the leading changes that
+      // are not newer than endChangeNumber and are covered by state.
+      LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+      synchronized (replayOperations)
+      {
+        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
+        while (itOp.hasNext())
+        {
+          FakeOperation fakeOp = itOp.next();
+          if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
+              && state.cover(fakeOp.getChangeNumber()))
+          {
+            lastRetrievedChange = fakeOp.getChangeNumber();
+            opsToSend.add(fakeOp);
+            itOp.remove();
+          }
+          else
+          {
+            break;
+          }
+        }
+      }
+
+      for (FakeOperation opToSend : opsToSend)
+      {
+          session.publishRecovery(opToSend.generateMessage());
+      }
+      opsToSend.clear();
+      if (lastRetrievedChange != null)
+      {
+        currentStartChangeNumber = lastRetrievedChange;
+      }
+      else
+      {
+        currentStartChangeNumber = endChangeNumber;
+      }
+
+    } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
+             (op.getResultCode().equals(ResultCode.SUCCESS)));
+
+    return op.getResultCode().equals(ResultCode.SUCCESS);
+  }
+
+
+  /**
    * Search for the changes that happened since fromChangeNumber
    * based on the historical attribute. The only changes that will
    * be send will be the one generated on the serverId provided in
    * fromChangeNumber.
    * @param baseDn the base DN
-   * @param fromChangeNumber The change number from which we want the changes
-   * @param resultListener that will process the entries returned.
+   * @param fromChangeNumber The ChangeNumber from which we want the changes
+   * @param lastChangeNumber The max ChangeNumber that the search should return
+   * @param resultListener   The listener that will process the entries returned
    * @return the internal search operation
    * @throws Exception when raised.
    */
   public static InternalSearchOperation searchForChangedEntries(
     DN baseDn,
     ChangeNumber fromChangeNumber,
+    ChangeNumber lastChangeNumber,
     InternalSearchListener resultListener)
     throws Exception
   {
@@ -4457,8 +4625,16 @@
       InternalClientConnection.getRootConnection();
     Integer serverId = fromChangeNumber.getServerId();
 
-    String maxValueForId = "ffffffffffffffff" +
-      String.format("%04x", serverId) + "ffffffff";
+    String maxValueForId;
+    if (lastChangeNumber == null)
+    {
+      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
+                      + "ffffffff";
+    }
+    else
+    {
+      maxValueForId = lastChangeNumber.toString();
+    }
 
     LDAPFilter filter = LDAPFilter.decode(
        "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
   }
 
   /**
-   * {@inheritDoc}
+   * Search for the changes that happened since fromChangeNumber
+   * based on the historical attribute. The only changes that will
+   * be sent will be the ones generated on the serverId provided in
+   * fromChangeNumber.
+   * @param baseDn the base DN
+   * @param fromChangeNumber The change number from which we want the changes
+   * @param resultListener that will process the entries returned.
+   * @return the internal search operation
+   * @throws Exception when raised.
    */
-  public void handleInternalSearchEntry(
-    InternalSearchOperation searchOperation,
-    SearchResultEntry searchEntry)
+  public static InternalSearchOperation searchForChangedEntries(
+    DN baseDn,
+    ChangeNumber fromChangeNumber,
+    InternalSearchListener resultListener)
+    throws Exception
   {
-    /*
-     * This call back is called at session establishment phase
-     * for each entry that has been changed by this server and the changes
-     * have not been sent to any Replication Server.
-     * The role of this method is to build equivalent operation from
-     * the historical information and add them in the replayOperations
-     * table.
-     */
-    Iterable<FakeOperation> updates =
-      Historical.generateFakeOperations(searchEntry);
-    for (FakeOperation op : updates)
-    {
-      replayOperations.put(op.getChangeNumber(), op);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void handleInternalSearchReference(
-    InternalSearchOperation searchOperation,
-    SearchResultReference searchReference)
-  {
-    // TODO to be implemented
+    return searchForChangedEntries(
+        baseDn, fromChangeNumber, null, resultListener);
   }
 
 

--
Gitblit v1.10.0