From a634c8d90fc2581a9486d91df07e874dda33b69e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 30 Aug 2013 10:07:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 153 +++++++++++++++++++++++++-------------------------
1 files changed, 77 insertions(+), 76 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 15017c9..b69eb3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
{
while (activeConsumer)
{
- UpdateMsg msg;
if (!following)
{
/* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
* restart as usual
* load this change on the delayList
*/
- NavigableSet<ReplicationDBCursor> sortedCursors = null;
+ NavigableSet<ReplicationDBCursor> sortedCursors = null;
try
{
sortedCursors = collectAllCursorsWithChanges();
- // fill the lateQueue
+ // fill the lateQueue
while (!sortedCursors.isEmpty()
&& lateQueue.count() < 100
&& lateQueue.bytesCount() < 50000)
{
- lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+ lateQueue.add(nextOldestUpdateMsg(sortedCursors));
}
}
finally
@@ -315,15 +314,15 @@
following = true;
}
}
- }
- else
+ }
+ else
{
/*
* if the first change in the lateQueue is also on the regular
* queue, we can resume the processing from the regular queue
* -> set following to true and empty the lateQueue.
*/
- msg = lateQueue.first();
+ UpdateMsg msg = lateQueue.first();
synchronized (msgQueue)
{
if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
}
}
}
- }
- else
+ }
+ else
{
- // get the next change from the lateQueue
+ // get the next change from the lateQueue
+ UpdateMsg msg;
synchronized (msgQueue)
{
msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
return msg;
}
}
+
+
synchronized (msgQueue)
{
if (following)
@@ -371,8 +373,7 @@
{
return null;
}
- msg = msgQueue.removeFirst();
-
+ UpdateMsg msg = msgQueue.removeFirst();
if (updateServerState(msg))
{
/*
@@ -393,38 +394,38 @@
return null;
}
- private UpdateMsg nextOldestUpdateMsg(
- NavigableSet<ReplicationDBCursor> sortedCursors)
- {
- /*
- * The cursors are sorted based on the currentChange of each cursor to
- * consider the next change across all servers.
- * To keep consistent the order of the cursors in the SortedSet,
- * it is necessary to remove and eventually add again a cursor (after moving
- * it forward).
- */
- final ReplicationDBCursor cursor = sortedCursors.pollFirst();
- final UpdateMsg result = cursor.getChange();
- cursor.next();
- addCursorIfNotEmpty(sortedCursors, cursor);
- return result;
- }
+ private UpdateMsg nextOldestUpdateMsg(
+ NavigableSet<ReplicationDBCursor> sortedCursors)
+ {
+ /*
+ * The cursors are sorted based on the currentChange of each cursor to
+ * consider the next change across all servers.
+ * To keep consistent the order of the cursors in the SortedSet,
+ * it is necessary to remove and eventually add again a cursor (after moving
+ * it forward).
+ */
+ final ReplicationDBCursor cursor = sortedCursors.pollFirst();
+ final UpdateMsg result = cursor.getChange();
+ cursor.next();
+ addCursorIfNotEmpty(sortedCursors, cursor);
+ return result;
+ }
- private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
- ReplicationDBCursor cursor)
- {
- if (cursor != null)
- {
- if (cursor.getChange() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- close(cursor);
- }
- }
- }
+ private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
+ ReplicationDBCursor cursor)
+ {
+ if (cursor != null)
+ {
+ if (cursor.getChange() != null)
+ {
+ cursors.add(cursor);
+ }
+ else
+ {
+ close(cursor);
+ }
+ }
+ }
/**
* Get the older Change Number for that server.
@@ -460,49 +461,49 @@
the lateQueue when it will send the next update but we are not yet
there. So let's take the last change not sent directly from the db.
*/
- result = findOldestChangeNumberFromReplicationDBs();
+ result = findOldestChangeNumberFromReplicationDBs();
}
}
}
return result;
}
- private ChangeNumber findOldestChangeNumberFromReplicationDBs()
- {
- SortedSet<ReplicationDBCursor> sortedCursors = null;
- try
- {
- sortedCursors = collectAllCursorsWithChanges();
- UpdateMsg msg = sortedCursors.first().getChange();
- return msg.getChangeNumber();
- }
- catch (Exception e)
- {
- return null;
- }
- finally
- {
- close(sortedCursors);
- }
- }
-
- /**
- * Collects all the replication DB cursors that have changes and sort them
- * with the oldest {@link ChangeNumber} first.
- *
- * @return a List of cursors with changes sorted by their {@link ChangeNumber}
- * (oldest first)
- */
- private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+ private ChangeNumber findOldestChangeNumberFromReplicationDBs()
{
- final NavigableSet<ReplicationDBCursor> results =
- new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
+ SortedSet<ReplicationDBCursor> sortedCursors = null;
+ try
+ {
+ sortedCursors = collectAllCursorsWithChanges();
+ UpdateMsg msg = sortedCursors.first().getChange();
+ return msg.getChangeNumber();
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ finally
+ {
+ close(sortedCursors);
+ }
+ }
+
+ /**
+ * Collects all the replication DB cursors that have changes and sort them
+ * with the oldest {@link ChangeNumber} first.
+ *
+ * @return a List of cursors with changes sorted by their {@link ChangeNumber}
+ * (oldest first)
+ */
+ private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+ {
+ final NavigableSet<ReplicationDBCursor> results =
+ new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
for (int serverId : replicationServerDomain.getServerIds())
{
// get the last already sent CN from that server to get a cursor
- final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
- addCursorIfNotEmpty(results,
- replicationServerDomain.getCursorFrom(serverId, lastCsn));
+ final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+ addCursorIfNotEmpty(results,
+ replicationServerDomain.getCursorFrom(serverId, lastCsn));
}
return results;
}
--
Gitblit v1.10.0