From 2cf4412179a4ca8610d7fbb2108040377290bf82 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Jun 2014 13:12:34 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3697) Change time heart beat change numbers should be synced with updates
---
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 59 ++++++++++++++++++++++++-----------------------------------
1 files changed, 24 insertions(+), 35 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index b66f7b6..42ee696 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,8 +26,8 @@
*/
package org.opends.server.replication.plugin;
+import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.server.replication.common.CSN;
@@ -51,19 +51,19 @@
/**
* A map used to store the pending changes.
*/
- private SortedMap<CSN, PendingChange> pendingChanges =
- new TreeMap<CSN, PendingChange>();
+ private final TreeMap<CSN, PendingChange> pendingChanges =
+ new TreeMap<CSN, PendingChange>();
/**
* The {@link CSNGenerator} to use to create new unique CSNs
* for each operation done on the replication domain.
*/
- private CSNGenerator csnGenerator;
+ private final CSNGenerator csnGenerator;
/**
* The ReplicationDomain that will be used to send UpdateMsg.
*/
- private ReplicationDomain domain;
+ private final ReplicationDomain domain;
private boolean recoveringOldChanges = false;
@@ -128,34 +128,32 @@
synchronized CSN putLocalOperation(PluginOperation operation)
{
final CSN csn = csnGenerator.newCSN();
- final PendingChange change = new PendingChange(csn, operation, null);
- pendingChanges.put(csn, change);
+ if (!operation.isSynchronizationOperation())
+ {
+ pendingChanges.put(csn, new PendingChange(csn, operation, null));
+ }
return csn;
}
/**
* Push all committed local changes to the replicationServer service.
- *
- * @return The number of pushed updates.
*/
- synchronized int pushCommittedChanges()
+ synchronized void pushCommittedChanges()
{
- int numSentUpdates = 0;
- if (pendingChanges.isEmpty())
+ // peek the oldest change
+ Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry();
+ if (firstEntry == null)
{
- return numSentUpdates;
+ return;
}
- // peek the oldest CSN
- CSN firstCSN = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstCSN);
+ PendingChange firstChange = firstEntry.getValue();
while (firstChange != null && firstChange.isCommitted())
{
final PluginOperation op = firstChange.getOp();
if (op != null && !op.isSynchronizationOperation())
{
- numSentUpdates++;
final LDAPUpdateMsg updateMsg = firstChange.getMsg();
if (!recoveringOldChanges)
{
@@ -168,20 +166,14 @@
domain.getServerState().update(updateMsg.getCSN());
}
}
- pendingChanges.remove(firstCSN);
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
- else
- {
- // peek the oldest CSN
- firstCSN = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstCSN);
- }
+ // false warning: firstEntry will not be null if firstChange is not null
+ pendingChanges.remove(firstEntry.getKey());
+
+ // peek the oldest change
+ firstEntry = pendingChanges.firstEntry();
+ firstChange = firstEntry != null ? firstEntry.getValue() : null;
}
- return numSentUpdates;
}
/**
@@ -189,16 +181,13 @@
* push all committed local changes to the replicationServer service
* in a single atomic operation.
*
- *
* @param csn The CSN of the update message that must be set as committed.
- * @param msg The message associated to the update.
- *
- * @return The number of pushed updates.
+ * @param msg The message associated to the update.
*/
- synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
+ synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
{
commit(csn, msg);
- return pushCommittedChanges();
+ pushCommittedChanges();
}
/**
--
Gitblit v1.10.0