From 0d566a1779183a479c33447ac6b0224b705a33c9 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 18 Jun 2007 06:50:18 +0000
Subject: [PATCH] Fix for 1797 : dead locks during replication testing

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |   52 +++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 3d76049..c7e9295 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -170,17 +170,22 @@
   /**
    * This object is used to store the list of update currently being
    * done on the local database.
-   * It contain both the update that are done directly on this server
-   * and the updates that was done on another server, transmitted
-   * by the replication server and that are currently replayed.
-   * It is usefull to make sure that dependencies between operations
-   * are correctly fullfilled, that the local operations are sent in a
+   * Is is usefull to make sure that the local operations are sent in a
    * correct order to the replication server and that the ServerState
    * is not updated too early.
    */
   private PendingChanges pendingChanges;
 
   /**
+   * It contain the updates that were done on other servers, transmitted
+   * by the replication server and that are currently replayed.
+   * It is usefull to make sure that dependencies between operations
+   * are correctly fullfilled and to to make sure that the ServerState is
+   * not updated too early.
+   */
+  private RemotePendingChanges remotePendingChanges;
+
+  /**
    * The time in milliseconds between heartbeats from the replication
    * server.  Zero means heartbeats are off.
    */
@@ -373,10 +378,15 @@
      * ChangeNumberGenerator is used to create new unique ChangeNumbers
      * for each operation done on the replication domain.
      */
+    ChangeNumberGenerator generator =
+      new ChangeNumberGenerator(serverId, state);
+
     pendingChanges =
       new PendingChanges(new ChangeNumberGenerator(serverId, state),
                          broker, state);
 
+    remotePendingChanges = new RemotePendingChanges(generator, state);
+
     // listen for changes on the configuration
     configuration.addChangeListener(this);
   }
@@ -679,7 +689,7 @@
    */
   public UpdateMessage receive()
   {
-    UpdateMessage update = pendingChanges.getNextUpdate();
+    UpdateMessage update = remotePendingChanges.getNextUpdate();
 
     if (update == null)
     {
@@ -774,7 +784,7 @@
    */
   public void receiveUpdate(UpdateMessage update)
   {
-    pendingChanges.putRemoteUpdate(update);
+    remotePendingChanges.putRemoteUpdate(update);
     numRcvdUpdates.incrementAndGet();
   }
 
@@ -847,7 +857,14 @@
     {
       try
       {
-        pendingChanges.commit(curChangeNumber, op, msg);
+        if (op.isSynchronizationOperation())
+        {
+          remotePendingChanges.commit(curChangeNumber);
+        }
+        else
+        {
+          pendingChanges.commit(curChangeNumber, msg);
+        }
       }
       catch  (NoSuchElementException e)
       {
@@ -880,8 +897,11 @@
       }
     }
 
-    int pushedChanges = pendingChanges.pushCommittedChanges();
-    numSentUpdates.addAndGet(pushedChanges);
+    if (!op.isSynchronizationOperation())
+    {
+      int pushedChanges = pendingChanges.pushCommittedChanges();
+      numSentUpdates.addAndGet(pushedChanges);
+    }
 
     // Wait for acknowledgement of an assured message.
     if (msg != null && isAssured)
@@ -1111,7 +1131,7 @@
           if (op instanceof ModifyOperation)
           {
             ModifyOperation newOp = (ModifyOperation) op;
-            dependency = pendingChanges.checkDependencies(newOp);
+            dependency = remotePendingChanges.checkDependencies(newOp);
             if (!dependency)
             {
               done = solveNamingConflict(newOp, msg);
@@ -1120,7 +1140,7 @@
           else if (op instanceof DeleteOperation)
           {
             DeleteOperation newOp = (DeleteOperation) op;
-            dependency = pendingChanges.checkDependencies(newOp);
+            dependency = remotePendingChanges.checkDependencies(newOp);
             if ((!dependency) && (!firstTry))
             {
               done = solveNamingConflict(newOp, msg);
@@ -1130,7 +1150,7 @@
           {
             AddOperation newOp = (AddOperation) op;
             AddMsg addMsg = (AddMsg) msg;
-            dependency = pendingChanges.checkDependencies(newOp);
+            dependency = remotePendingChanges.checkDependencies(newOp);
             if (!dependency)
             {
               done = solveNamingConflict(newOp, addMsg);
@@ -1139,7 +1159,7 @@
           else if (op instanceof ModifyDNOperation)
           {
             ModifyDNMsg newMsg = (ModifyDNMsg) msg;
-            dependency = pendingChanges.checkDependencies(newMsg);
+            dependency = remotePendingChanges.checkDependencies(newMsg);
             if (!dependency)
             {
               ModifyDNOperation newOp = (ModifyDNOperation) op;
@@ -1253,9 +1273,7 @@
    */
   public void updateError(ChangeNumber changeNumber)
   {
-    pendingChanges.commit(changeNumber);
-    int pushedChanges = pendingChanges.pushCommittedChanges();
-    numSentUpdates.addAndGet(pushedChanges);
+    remotePendingChanges.commit(changeNumber);
   }
 
   /**

--
Gitblit v1.10.0