From 4936231f6b43a59233dc4ee909d9a2eeb3ced31a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 13:52:43 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 76 +++++++-------------------------------
1 files changed, 14 insertions(+), 62 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed243c4..07f2371 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@
*/
package org.opends.server.replication.server;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
@@ -294,22 +295,19 @@
* restart as usual
* load this change on the delayList
*/
- NavigableSet<ReplicaDBCursor> sortedCursors = null;
+ ReplicaDBCursor cursor = null;
try
{
- sortedCursors = collectAllCursorsWithChanges();
-
// fill the lateQueue
- while (!sortedCursors.isEmpty()
- && lateQueue.count() < 100
- && lateQueue.bytesCount() < 50000)
+ cursor = replicationServerDomain.getCursorFrom(serverState);
+ while (cursor.next() && isLateQueueBelowThreshold())
{
- lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+ lateQueue.add(cursor.getChange());
}
}
finally
{
- close(sortedCursors);
+ close(cursor);
}
/*
@@ -403,34 +401,9 @@
return null;
}
- private UpdateMsg nextOldestUpdateMsg(
- NavigableSet<ReplicaDBCursor> sortedCursors)
+ private boolean isLateQueueBelowThreshold()
{
- /*
- * The cursors are sorted based on the currentChange of each cursor to
- * consider the next change across all servers.
- * To keep consistent the order of the cursors in the SortedSet,
- * it is necessary to remove and eventually add again a cursor (after moving
- * it forward).
- */
- final ReplicaDBCursor cursor = sortedCursors.pollFirst();
- final UpdateMsg result = cursor.getChange();
- cursor.next();
- addCursorIfNotEmpty(sortedCursors, cursor);
- return result;
- }
-
- private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
- ReplicaDBCursor cursor)
- {
- if (cursor.getChange() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- close(cursor);
- }
+ return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
}
/**
@@ -476,12 +449,12 @@
private CSN findOldestCSNFromReplicaDBs()
{
- SortedSet<ReplicaDBCursor> sortedCursors = null;
+ ReplicaDBCursor cursor = null;
try
{
- sortedCursors = collectAllCursorsWithChanges();
- UpdateMsg msg = sortedCursors.first().getChange();
- return msg.getCSN();
+ cursor = replicationServerDomain.getCursorFrom(serverState);
+ cursor.next();
+ return cursor.getChange().getCSN();
}
catch (Exception e)
{
@@ -489,32 +462,11 @@
}
finally
{
- close(sortedCursors);
+ close(cursor);
}
}
/**
- * Collects all the {@link ReplicaDBCursor}s that have changes and sort them
- * with the oldest {@link CSN} first.
- *
- * @return a List of cursors with changes sorted by their {@link CSN}
- * (oldest first)
- */
- private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
- {
- final NavigableSet<ReplicaDBCursor> results =
- new TreeSet<ReplicaDBCursor>();
- for (int serverId : replicationServerDomain.getServerIds())
- {
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCsn = serverState.getCSN(serverId);
- addCursorIfNotEmpty(results,
- replicationServerDomain.getCursorFrom(serverId, lastCsn));
- }
- return results;
- }
-
- /**
* Get the count of updates sent to this server.
* @return The count of update sent to this server.
*/
--
Gitblit v1.10.0