From 2cf4412179a4ca8610d7fbb2108040377290bf82 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Jun 2014 13:12:34 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3697) Change time heart beat change numbers should be synced with updates

---
 opends/src/server/org/opends/server/replication/plugin/PendingChanges.java |   59 ++++++++++++++++++++++++-----------------------------------
 1 files changed, 24 insertions(+), 35 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index b66f7b6..42ee696 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,8 +26,8 @@
  */
 package org.opends.server.replication.plugin;
 
+import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.SortedMap;
 import java.util.TreeMap;
 
 import org.opends.server.replication.common.CSN;
@@ -51,19 +51,19 @@
   /**
    * A map used to store the pending changes.
    */
-  private SortedMap<CSN, PendingChange> pendingChanges =
-    new TreeMap<CSN, PendingChange>();
+  private final TreeMap<CSN, PendingChange> pendingChanges =
+      new TreeMap<CSN, PendingChange>();
 
   /**
    * The {@link CSNGenerator} to use to create new unique CSNs
    * for each operation done on the replication domain.
    */
-  private CSNGenerator csnGenerator;
+  private final CSNGenerator csnGenerator;
 
   /**
    * The ReplicationDomain that will be used to send UpdateMsg.
    */
-  private ReplicationDomain domain;
+  private final ReplicationDomain domain;
 
   private boolean recoveringOldChanges = false;
 
@@ -128,34 +128,32 @@
   synchronized CSN putLocalOperation(PluginOperation operation)
   {
     final CSN csn = csnGenerator.newCSN();
-    final PendingChange change = new PendingChange(csn, operation, null);
-    pendingChanges.put(csn, change);
+    if (!operation.isSynchronizationOperation())
+    {
+      pendingChanges.put(csn, new PendingChange(csn, operation, null));
+    }
     return csn;
   }
 
   /**
    * Push all committed local changes to the replicationServer service.
-   *
-   * @return The number of pushed updates.
    */
-  synchronized int pushCommittedChanges()
+  synchronized void pushCommittedChanges()
   {
-    int numSentUpdates = 0;
-    if (pendingChanges.isEmpty())
+    // peek the oldest change
+    Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry();
+    if (firstEntry == null)
     {
-      return numSentUpdates;
+      return;
     }
 
-    // peek the oldest CSN
-    CSN firstCSN = pendingChanges.firstKey();
-    PendingChange firstChange = pendingChanges.get(firstCSN);
+    PendingChange firstChange = firstEntry.getValue();
 
     while (firstChange != null && firstChange.isCommitted())
     {
       final PluginOperation op = firstChange.getOp();
       if (op != null && !op.isSynchronizationOperation())
       {
-        numSentUpdates++;
         final LDAPUpdateMsg updateMsg = firstChange.getMsg();
         if (!recoveringOldChanges)
         {
@@ -168,20 +166,14 @@
           domain.getServerState().update(updateMsg.getCSN());
         }
       }
-      pendingChanges.remove(firstCSN);
 
-      if (pendingChanges.isEmpty())
-      {
-        firstChange = null;
-      }
-      else
-      {
-        // peek the oldest CSN
-        firstCSN = pendingChanges.firstKey();
-        firstChange = pendingChanges.get(firstCSN);
-      }
+      // false warning: firstEntry will not be null if firstChange is not null
+      pendingChanges.remove(firstEntry.getKey());
+
+      // peek the oldest change
+      firstEntry = pendingChanges.firstEntry();
+      firstChange = firstEntry != null ? firstEntry.getValue() : null;
     }
-    return numSentUpdates;
   }
 
   /**
@@ -189,16 +181,13 @@
    * push all committed local changes to the replicationServer service
    * in a single atomic operation.
    *
-   *
    * @param csn The CSN of the update message that must be set as committed.
-   * @param msg          The message associated to the update.
-   *
-   * @return The number of pushed updates.
+   * @param msg The message associated to the update.
    */
-  synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
+  synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
   {
     commit(csn, msg);
-    return pushCommittedChanges();
+    pushCommittedChanges();
   }
 
   /**

--
Gitblit v1.10.0