From 0a51f5fbeb5e99c52f1be8973ae656de34fab75f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 09:30:53 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 143 +++++++++++++++++++++++------------------------
1 files changed, 71 insertions(+), 72 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed652c7..f55298c 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@
* unlock memory tree
* restart as usual
* load this change on the delayList
- *
*/
- SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(
- new ReplicationIteratorComparator());
+ SortedSet<ReplicationIterator> iteratorSortedSet = null;
try
{
- /* fill the lateQueue */
- for (int serverId : replicationServerDomain.getServers())
- {
- ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
- ReplicationIterator iterator = replicationServerDomain
- .getChangelogIterator(serverId, lastCsn);
- if (iterator != null)
- {
- if (iterator.getChange() != null)
- {
- iteratorSortedSet.add(iterator);
- }
- else
- {
- iterator.releaseCursor();
- }
- }
- }
+ iteratorSortedSet = collectAllIteratorsWithChanges();
+ /* fill the lateQueue */
// The loop below relies on the fact that it is sorted based
// on the currentChange of each iterator to consider the next
// change across all servers.
@@ -320,22 +301,12 @@
ReplicationIterator iterator = iteratorSortedSet.first();
iteratorSortedSet.remove(iterator);
lateQueue.add(iterator.getChange());
- if (iterator.next())
- {
- iteratorSortedSet.add(iterator);
- }
- else
- {
- iterator.releaseCursor();
- }
+ addIteratorIfNotEmpty(iteratorSortedSet, iterator);
}
}
finally
{
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
+ releaseAllIterators(iteratorSortedSet);
}
/*
@@ -343,7 +314,6 @@
* messages in the replication log so the remote serevr is not
* late anymore.
*/
-
if (lateQueue.isEmpty())
{
synchronized (msgQueue)
@@ -430,6 +400,19 @@
return null;
}
+ private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
+ ReplicationIterator iter)
+ {
+ if (iter.next())
+ {
+ iterators.add(iter);
+ }
+ else
+ {
+ iter.releaseCursor();
+ }
+ }
+
/**
* Get the older Change Number for that server.
* Returns null when the queue is empty.
@@ -450,7 +433,12 @@
}
else
{
- if (lateQueue.isEmpty())
+ if (!lateQueue.isEmpty())
+ {
+ UpdateMsg msg = lateQueue.first();
+ result = msg.getChangeNumber();
+ }
+ else
{
/*
following is false AND lateQueue is empty
@@ -460,36 +448,10 @@
there. So let's take the last change not sent directly from
the db.
*/
- SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(
- new ReplicationIteratorComparator());
+ SortedSet<ReplicationIterator> iteratorSortedSet = null;
try
{
- // Build a list of candidates iterator (i.e. db i.e. server)
- for (int serverId : replicationServerDomain.getServers())
- {
- // get the last already sent CN from that server
- ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
- // get an iterator in this server db from that last change
- ReplicationIterator iterator =
- replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- /*
- if that iterator has changes, then it is a candidate
- it is added in the sorted list at a position given by its
- current change (see ReplicationIteratorComparator).
- */
- if (iterator != null)
- {
- if (iterator.getChange() != null)
- {
- iteratorSortedSet.add(iterator);
- }
- else
- {
- iterator.releaseCursor();
- }
- }
- }
+ iteratorSortedSet = collectAllIteratorsWithChanges();
UpdateMsg msg = iteratorSortedSet.first().getChange();
result = msg.getChangeNumber();
} catch (Exception e)
@@ -497,21 +459,58 @@
result = null;
} finally
{
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
+ releaseAllIterators(iteratorSortedSet);
}
- } else
- {
- UpdateMsg msg = lateQueue.first();
- result = msg.getChangeNumber();
}
}
}
return result;
}
+ private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
+ {
+ SortedSet<ReplicationIterator> results =
+ new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
+
+ // Build a list of candidates iterator (i.e. db i.e. server)
+ for (int serverId : replicationServerDomain.getServers())
+ {
+ // get the last already sent CN from that server
+ ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+ // get an iterator in this server db from that last change
+ ReplicationIterator iter =
+ replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+ /*
+ if that iterator has changes, then it is a candidate
+ it is added in the sorted list at a position given by its
+ current change (see ReplicationIteratorComparator).
+ */
+ if (iter != null)
+ {
+ if (iter.getChange() != null)
+ {
+ results.add(iter);
+ }
+ else
+ {
+ iter.releaseCursor();
+ }
+ }
+ }
+ return results;
+ }
+
+ private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
+ {
+ if (iterators != null)
+ {
+ for (ReplicationIterator iter : iterators)
+ {
+ iter.releaseCursor();
+ }
+ }
+ }
+
/**
* Get the count of updates sent to this server.
* @return The count of update sent to this server.
--
Gitblit v1.10.0