From 88e5620001d65afa8d0d8e07d1361fa44705743e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 11 May 2007 13:19:28 +0000
Subject: [PATCH] This code allows the replication code to replay operation in the correct order when operation have dependencies (like adding child entry after parent)

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |  428 ++++++++++++++++++++++++++---------------------------
 1 files changed, 209 insertions(+), 219 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index e0a9cb7..93d7ba1 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -128,17 +129,14 @@
 {
   private ReplicationMonitor monitor;
 
-  private ChangeNumberGenerator changeNumberGenerator;
   private ReplicationBroker broker;
 
   private List<ListenerThread> synchroThreads =
     new ArrayList<ListenerThread>();
-  private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
-    new TreeMap<ChangeNumber, PendingChange>();
   private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
     new TreeMap<ChangeNumber, UpdateMessage>();
-  private int numRcvdUpdates = 0;
-  private int numSentUpdates = 0;
+  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
+  private AtomicInteger numSentUpdates = new AtomicInteger(0);
   private AtomicInteger numProcessedUpdates = new AtomicInteger();
   private int debugCount = 0;
   private PersistentServerState state;
@@ -150,6 +148,19 @@
   private int maxSendDelay = 0;
 
   /**
+   * This object is used to store the list of update currently being
+   * done on the local database.
+   * It contain both the update that are done directly on this server
+   * and the updates that was done on another server, transmitted
+   * by the replication server and that are currently replayed.
+   * It is usefull to make sure that dependencies between operations
+   * are correctly fullfilled, that the local operations are sent in a
+   * correct order to the replication server and that the ServerState
+   * is not updated too early.
+   */
+  private PendingChanges pendingChanges;
+
+  /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
    */
@@ -245,7 +256,7 @@
   private ConfigEntry backendConfigEntry;
   private List<DN> branches = new ArrayList<DN>(0);
 
-  private int listenerThreadNumber = 1;
+  private int listenerThreadNumber = 10;
   private boolean receiveStatus = true;
 
   private Collection<String> replicationServers;
@@ -317,12 +328,6 @@
     DirectoryServer.registerMonitorProvider(monitor);
 
     /*
-     * ChangeNumberGenerator is used to create new unique ChangeNumbers
-     * for each operation done on the replication domain.
-     */
-    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
-
-    /*
      * create the broker object used to publish and receive changes
      */
     try
@@ -348,6 +353,14 @@
       */
     }
 
+    /*
+     * ChangeNumberGenerator is used to create new unique ChangeNumbers
+     * for each operation done on the replication domain.
+     */
+    pendingChanges =
+      new PendingChanges(new ChangeNumberGenerator(serverId, state),
+                         broker, state);
+
     // listen for changes on the configuration
     configuration.addChangeListener(this);
   }
@@ -444,23 +457,37 @@
        * of the parent entry
        */
 
-      // There is a potential of perfs improvement here
-      // if we could avoid the following parent entry retrieval
-      DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
-
-      if (parentDnFromCtx != null)
+      String parentUid = ctx.getParentUid();
+      // root entry have no parent,
+      // there is no need to check for it.
+      if (parentUid != null)
       {
-        DN entryDN = addOperation.getEntryDN();
-        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
-        if ((parentDnFromEntryDn != null)
-            && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
+        // There is a potential of perfs improvement here
+        // if we could avoid the following parent entry retrieval
+        DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
+
+        if (parentDnFromCtx == null)
         {
-          // parentEntry has been renamed
-          // replication name conflict resolution is expected to fix that
-          // later in the flow
+          // The parent does not exist with the specified unique id
+          // stop the operation with NO_SUCH_OBJECT and let the
+          // conflict resolution or the dependency resolution solve this.
           addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
           return new SynchronizationProviderResult(false);
         }
+        else
+        {
+          DN entryDN = addOperation.getEntryDN();
+          DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
+          if ((parentDnFromEntryDn != null)
+              && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
+          {
+            // parentEntry has been renamed
+            // replication name conflict resolution is expected to fix that
+            // later in the flow
+            addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+            return new SynchronizationProviderResult(false);
+          }
+        }
       }
     }
     return new SynchronizationProviderResult(true);
@@ -633,89 +660,96 @@
    */
   public UpdateMessage receive()
   {
-    synchronized (broker)
+    UpdateMessage update = pendingChanges.getNextUpdate();
+
+    if (update == null)
     {
-      UpdateMessage update = null;
-      while (update == null)
+      synchronized (broker)
       {
-        ReplicationMessage msg;
-        try
+        while (update == null)
         {
-          msg = broker.receive();
-          if (msg == null)
+          ReplicationMessage msg;
+          try
           {
-            // The server is in the shutdown process
-            return null;
-          }
-          log("Broker received message :" + msg);
-          if (msg instanceof AckMessage)
-          {
-            AckMessage ack = (AckMessage) msg;
-            receiveAck(ack);
-          }
-          else if (msg instanceof UpdateMessage)
-          {
-            update = (UpdateMessage) msg;
-            receiveUpdate(update);
-          }
-          else if (msg instanceof InitializeRequestMessage)
-          {
-            // Another server requests us to provide entries
-            // for a total update
-            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
-            try
+            msg = broker.receive();
+            if (msg == null)
             {
-              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
-                  null);
+              // The server is in the shutdown process
+              return null;
             }
-            catch(DirectoryException de)
+            log("Broker received message :" + msg);
+            if (msg instanceof AckMessage)
             {
-              // Returns an error message to notify the sender
-              int msgID = de.getMessageID();
-              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
-                  msgID, de.getMessage());
-              broker.publish(errorMsg);
+              AckMessage ack = (AckMessage) msg;
+              receiveAck(ack);
             }
-          }
-          else if (msg instanceof InitializeTargetMessage)
-          {
-            // Another server is exporting its entries to us
-            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+            else if (msg instanceof InitializeRequestMessage)
+            {
+              // Another server requests us to provide entries
+              // for a total update
+              InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
+              try
+              {
+                initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+                                 null);
+              }
+              catch(DirectoryException de)
+              {
+                // Returns an error message to notify the sender
+                int msgID = de.getMessageID();
+                ErrorMessage errorMsg =
+                  new ErrorMessage(initMsg.getsenderID(),
+                                   msgID, de.getMessage());
+                broker.publish(errorMsg);
+              }
+            }
+            else if (msg instanceof InitializeTargetMessage)
+            {
+              // Another server is exporting its entries to us
+              InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
 
-            try
-            {
-              importBackend(initMsg);
+              try
+              {
+                importBackend(initMsg);
+              }
+              catch(DirectoryException de)
+              {
+                // Return an error message to notify the sender
+                int msgID = de.getMessageID();
+                ErrorMessage errorMsg =
+                  new ErrorMessage(initMsg.getsenderID(),
+                                   msgID, de.getMessage());
+                log(getMessage(msgID,
+                               backend.getBackendID()) + de.getMessage());
+                broker.publish(errorMsg);
+              }
             }
-            catch(DirectoryException de)
+            else if (msg instanceof ErrorMessage)
             {
-              // Return an error message to notify the sender
-              int msgID = de.getMessageID();
-              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
-                  msgID, de.getMessage());
-              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
-              broker.publish(errorMsg);
+              if (ieContext != null)
+              {
+                // This is an error termination for the 2 following cases :
+                // - either during an export
+                // - or before an import really started
+                //   For example, when we publish a request and the
+                //  replicationServer did not find any import source.
+                abandonImportExport((ErrorMessage)msg);
+              }
+            }
+            else if (msg instanceof UpdateMessage)
+            {
+              update = (UpdateMessage) msg;
+              receiveUpdate(update);
             }
           }
-          else if (msg instanceof ErrorMessage)
+          catch (SocketTimeoutException e)
           {
-            if (ieContext != null)
-            {
-              // This is an error termination for the 2 following cases :
-              // - either during an export
-              // - or before an import really started
-              //   For example, when we publish a request and the
-              //  replicationServer did not find any import source.
-              abandonImportExport((ErrorMessage)msg);
-            }
+            // just retry
           }
-        } catch (SocketTimeoutException e)
-        {
-          // just retry
         }
-
       }
-      return update;
     }
+    return update;
   }
 
   /**
@@ -725,21 +759,8 @@
    */
   public void receiveUpdate(UpdateMessage update)
   {
-    ChangeNumber changeNumber = update.getChangeNumber();
-
-    synchronized (pendingChanges)
-    {
-      if (pendingChanges.containsKey(changeNumber))
-      {
-        /*
-         * This should never happen,
-         * TODO log error and throw exception
-         */
-      }
-      pendingChanges.put(changeNumber,
-          new PendingChange(changeNumber, null, update));
-      numRcvdUpdates++;
-    }
+    pendingChanges.putRemoteUpdate(update);
+    numRcvdUpdates.incrementAndGet();
   }
 
   /**
@@ -752,10 +773,9 @@
     UpdateMessage update;
     ChangeNumber changeNumber = ack.getChangeNumber();
 
-    synchronized (pendingChanges)
+    synchronized (waitingAckMsgs)
     {
-      update = waitingAckMsgs.get(changeNumber);
-      waitingAckMsgs.remove(changeNumber);
+      update = waitingAckMsgs.remove(changeNumber);
     }
     if (update != null)
     {
@@ -798,61 +818,55 @@
          * This is an operation type that we do not know about
          * It should never happen.
          */
-        synchronized (pendingChanges)
-        {
-          pendingChanges.remove(curChangeNumber);
-          int    msgID   = MSGID_UNKNOWN_TYPE;
-          String message = getMessage(msgID, op.getOperationType().toString());
-          logError(ErrorLogCategory.SYNCHRONIZATION,
-                   ErrorLogSeverity.SEVERE_ERROR,
-                   message, msgID);
-          return;
-        }
+        pendingChanges.remove(curChangeNumber);
+        int    msgID   = MSGID_UNKNOWN_TYPE;
+        String message = getMessage(msgID, op.getOperationType().toString());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+                 ErrorLogSeverity.SEVERE_ERROR,
+                 message, msgID);
+        return;
       }
     }
 
-    synchronized(pendingChanges)
+    if (result == ResultCode.SUCCESS)
     {
-      if (result == ResultCode.SUCCESS)
+      try
       {
-        PendingChange curChange = pendingChanges.get(curChangeNumber);
-        if (curChange == null)
-        {
-          // This should never happen
-          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
-          String message = getMessage(msgID, curChangeNumber.toString(),
-              op.toString());
-          logError(ErrorLogCategory.SYNCHRONIZATION,
-              ErrorLogSeverity.SEVERE_ERROR,
-              message, msgID);
-          return;
-        }
-        curChange.setCommitted(true);
+        pendingChanges.commit(curChangeNumber, op, msg);
+      }
+      catch  (NoSuchElementException e)
+      {
+        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
+        String message = getMessage(msgID, curChangeNumber.toString(),
+                                    op.toString());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+                 ErrorLogSeverity.SEVERE_ERROR,
+                 message, msgID);
+        return;
+      }
 
-        if (op.isSynchronizationOperation())
-          curChange.setOp(op);
-        else
-          curChange.setMsg(msg);
-
-        if (msg != null && isAssured)
+      if (msg != null && isAssured)
+      {
+        synchronized (waitingAckMsgs)
         {
-          // Add the assured message to the list of those whose acknowledgements
-          // we are awaiting.
+          // Add the assured message to the list of update that are
+          // waiting acknowledgements
           waitingAckMsgs.put(curChangeNumber, msg);
         }
       }
-      else if (!op.isSynchronizationOperation())
-      {
-        // Remove an unsuccessful non-replication operation from the pending
-        // changes list.
-        if (curChangeNumber != null)
-        {
-          pendingChanges.remove(curChangeNumber);
-        }
-      }
-
-      pushCommittedChanges();
     }
+    else if (!op.isSynchronizationOperation())
+    {
+      // Remove an unsuccessful non-replication operation from the pending
+      // changes list.
+      if (curChangeNumber != null)
+      {
+        pendingChanges.remove(curChangeNumber);
+      }
+    }
+
+    int pushedChanges = pendingChanges.pushCommittedChanges();
+    numSentUpdates.addAndGet(pushedChanges);
 
     // Wait for acknowledgement of an assured message.
     if (msg != null && isAssured)
@@ -880,7 +894,7 @@
    */
   public int getNumRcvdUpdates()
   {
-    return numRcvdUpdates;
+    return numRcvdUpdates.get();
   }
 
   /**
@@ -890,11 +904,11 @@
    */
   public int getNumSentUpdates()
   {
-    return numSentUpdates;
+    return numSentUpdates.get();
   }
 
   /**
-   * get the number of updates in the pending list.
+   * Get the number of updates in the pending list.
    *
    * @return The number of updates in the pending list
    */
@@ -1052,45 +1066,63 @@
   {
     Operation op = null;
     boolean done = false;
+    boolean dependency = false;
     ChangeNumber changeNumber = null;
     int retryCount = 10;
+    boolean firstTry = true;
 
     try
     {
-      while (!done && retryCount-- > 0)
+      while ((!dependency) && (!done) && (retryCount-- > 0))
       {
         op = msg.createOperation(conn);
 
         op.setInternalOperation(true);
         op.setSynchronizationOperation(true);
         changeNumber = OperationContext.getChangeNumber(op);
-        if (changeNumber != null)
-          changeNumberGenerator.adjust(changeNumber);
 
         op.run();
 
         ResultCode result = op.getResultCode();
+
         if (result != ResultCode.SUCCESS)
         {
           if (op instanceof ModifyOperation)
           {
             ModifyOperation newOp = (ModifyOperation) op;
-            done = solveNamingConflict(newOp, msg);
+            dependency = pendingChanges.checkDependencies(newOp);
+            if (!dependency)
+            {
+              done = solveNamingConflict(newOp, msg);
+            }
           }
           else if (op instanceof DeleteOperation)
           {
             DeleteOperation newOp = (DeleteOperation) op;
-            done = solveNamingConflict(newOp, msg);
+            dependency = pendingChanges.checkDependencies(newOp);
+            if ((!dependency) && (!firstTry))
+            {
+              done = solveNamingConflict(newOp, msg);
+            }
           }
           else if (op instanceof AddOperation)
           {
             AddOperation newOp = (AddOperation) op;
-            done = solveNamingConflict(newOp, msg);
-
-          } else if (op instanceof ModifyDNOperation)
+            dependency = pendingChanges.checkDependencies(newOp);
+            if (!dependency)
+            {
+              done = solveNamingConflict(newOp, msg);
+            }
+          }
+          else if (op instanceof ModifyDNOperation)
           {
-            ModifyDNOperation newOp = (ModifyDNOperation) op;
-            done = solveNamingConflict(newOp, msg);
+            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+            dependency = pendingChanges.checkDependencies(newMsg);
+            if (!dependency)
+            {
+              ModifyDNOperation newOp = (ModifyDNOperation) op;
+              done = solveNamingConflict(newOp, msg);
+            }
           }
           else
           {
@@ -1108,9 +1140,10 @@
         {
           done = true;
         }
+        firstTry = false;
       }
 
-      if (!done)
+      if (!done && !dependency)
       {
         // Continue with the next change but the servers could now become
         // inconsistent.
@@ -1119,6 +1152,7 @@
         String message = getMessage(msgID, op.toString());
         logError(ErrorLogCategory.SYNCHRONIZATION,
             ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+
         updateError(changeNumber);
       }
     }
@@ -1177,9 +1211,12 @@
     }
     finally
     {
-      if (msg.isAssured())
-        ack(msg.getChangeNumber());
-      incProcessedUpdates();
+      if (!dependency)
+      {
+        if (msg.isAssured())
+          ack(msg.getChangeNumber());
+        incProcessedUpdates();
+      }
     }
   }
 
@@ -1193,12 +1230,9 @@
    */
   public void updateError(ChangeNumber changeNumber)
   {
-    synchronized (pendingChanges)
-    {
-      PendingChange change = pendingChanges.get(changeNumber);
-      change.setCommitted(true);
-      pushCommittedChanges();
-    }
+    pendingChanges.commit(changeNumber);
+    int pushedChanges = pendingChanges.pushCommittedChanges();
+    numSentUpdates.addAndGet(pushedChanges);
   }
 
   /**
@@ -1210,15 +1244,7 @@
    */
   private ChangeNumber generateChangeNumber(Operation operation)
   {
-    ChangeNumber changeNumber;
-
-    changeNumber = changeNumberGenerator.NewChangeNumber();
-    PendingChange change = new PendingChange(changeNumber, operation, null);
-    synchronized(pendingChanges)
-    {
-      pendingChanges.put(changeNumber, change);
-    }
-    return changeNumber;
+    return pendingChanges.putLocalOperation(operation);
   }
 
 
@@ -1604,42 +1630,6 @@
   }
 
   /**
-   * Push all committed local changes to the replicationServer service.
-   * PRECONDITION : The pendingChanges lock must be held before calling
-   * this method.
-   */
-  private void pushCommittedChanges()
-  {
-    if (pendingChanges.isEmpty())
-      return;
-
-    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
-    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
-
-    while ((firstChange != null) && firstChange.isCommitted())
-    {
-      if ((firstChange.getOp() != null ) &&
-          (firstChange.getOp().isSynchronizationOperation() == false))
-      {
-        numSentUpdates++;
-        broker.publish(firstChange.getMsg());
-      }
-      state.update(firstChangeNumber);
-      pendingChanges.remove(firstChangeNumber);
-
-      if (pendingChanges.isEmpty())
-      {
-        firstChange = null;
-      }
-      else
-      {
-        firstChangeNumber = pendingChanges.firstKey();
-        firstChange = pendingChanges.get(firstChangeNumber);
-      }
-    }
-  }
-
-  /**
    * Get the maximum receive window size.
    *
    * @return The maximum receive window size.

--
Gitblit v1.10.0