From 67405dde9ba213331dab1fc46cb18c485070fd5b Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Fri, 05 Jun 2009 09:04:50 +0000
Subject: [PATCH] svn merge -r5333:5417 https://opends.dev.java.net/svn/opends/branches/b2.0

---
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |   60 ++++++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 78bda99..76d15fc 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -58,7 +58,7 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.TreeSet;
+import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -226,8 +226,8 @@
 
   // This list is used to temporary store operations that needs
   // to be replayed at session establishment time.
-  private final TreeSet<FakeOperation> replayOperations  =
-    new TreeSet<FakeOperation>(new FakeOperationComparator());;
+  private final TreeMap<ChangeNumber, FakeOperation> replayOperations  =
+    new TreeMap<ChangeNumber, FakeOperation>();;
 
   /**
    * The isolation policy that this domain is going to use.
@@ -542,7 +542,23 @@
       String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
       ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
       deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
+
+      synchronized (replayOperations)
+      {
+        int size = replayOperations.size();
+        if (size >= 10000)
+        {
+          replayOperations.remove(replayOperations.firstKey());
+        }
+        replayOperations.put(
+            changeNumber,
+            new FakeDelOperation(
+                deleteOperation.getEntryDN().toString(),
+                changeNumber,modifiedEntryUUID ));
+      }
+
     }
+
     return new SynchronizationProviderResult.ContinueProcessing();
   }
 
@@ -2863,25 +2879,41 @@
           logError(message);
         } else
         {
-          for (FakeOperation replayOp : replayOperations)
+          for (FakeOperation replayOp :
+            replayOperations.tailMap(replServerMaxChangeNumber).values())
           {
             ChangeNumber cn = replayOp.getChangeNumber();
             /*
              * Because the entry returned by the search operation
              * can contain old historical information, it is
              * possible that some of the FakeOperation are
-             * actually older than the
-             * Only send the Operation if it was newer than
-             * the last ChangeNumber known by the Replication Server.
+             * actually older than the last ChangeNumber known by
+             * the Replication Server.
+             * In such case don't send the operation.
              */
-            if (cn.newer(replServerMaxChangeNumber))
+            if (!cn.newer(replServerMaxChangeNumber))
             {
-              message =
-                DEBUG_SENDING_CHANGE.get(
-                    replayOp.getChangeNumber().toString());
-              logError(message);
-              session.publish(replayOp.generateMessage());
+              continue;
             }
+
+            /*
+             * Check if the DeleteOperation has been abandoned before
+             * being processed. This is necessary because the replayOperation
+             *
+             */
+            if (replayOp instanceof FakeDelOperation)
+            {
+              FakeDelOperation delOp = (FakeDelOperation) replayOp;
+              if (findEntryDN(delOp.getUUID()) != null)
+              {
+                continue;
+              }
+            }
+            message =
+              DEBUG_SENDING_CHANGE.get(
+                  replayOp.getChangeNumber().toString());
+            logError(message);
+            session.publish(replayOp.generateMessage());
           }
           message = DEBUG_CHANGES_SENT.get();
           logError(message);
@@ -2958,7 +2990,7 @@
       Historical.generateFakeOperations(searchEntry);
     for (FakeOperation op : updates)
     {
-      replayOperations.add(op);
+      replayOperations.put(op.getChangeNumber(), op);
     }
   }
 

--
Gitblit v1.10.0