From a634c8d90fc2581a9486d91df07e874dda33b69e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 30 Aug 2013 10:07:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java | 26 ++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 153 +++++++++++++++---------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java | 24 ++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java | 34 +++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 23 ++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java | 17 +-
6 files changed, 138 insertions(+), 139 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 15017c9..b69eb3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@
{
while (activeConsumer)
{
- UpdateMsg msg;
if (!following)
{
/* this server is late with regard to some other masters
@@ -282,17 +281,17 @@
* restart as usual
* load this change on the delayList
*/
- NavigableSet<ReplicationDBCursor> sortedCursors = null;
+ NavigableSet<ReplicationDBCursor> sortedCursors = null;
try
{
sortedCursors = collectAllCursorsWithChanges();
- // fill the lateQueue
+ // fill the lateQueue
while (!sortedCursors.isEmpty()
&& lateQueue.count() < 100
&& lateQueue.bytesCount() < 50000)
{
- lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+ lateQueue.add(nextOldestUpdateMsg(sortedCursors));
}
}
finally
@@ -315,15 +314,15 @@
following = true;
}
}
- }
- else
+ }
+ else
{
/*
* if the first change in the lateQueue is also on the regular
* queue, we can resume the processing from the regular queue
* -> set following to true and empty the lateQueue.
*/
- msg = lateQueue.first();
+ UpdateMsg msg = lateQueue.first();
synchronized (msgQueue)
{
if (msgQueue.contains(msg))
@@ -341,10 +340,11 @@
}
}
}
- }
- else
+ }
+ else
{
- // get the next change from the lateQueue
+ // get the next change from the lateQueue
+ UpdateMsg msg;
synchronized (msgQueue)
{
msg = lateQueue.removeFirst();
@@ -353,6 +353,8 @@
return msg;
}
}
+
+
synchronized (msgQueue)
{
if (following)
@@ -371,8 +373,7 @@
{
return null;
}
- msg = msgQueue.removeFirst();
-
+ UpdateMsg msg = msgQueue.removeFirst();
if (updateServerState(msg))
{
/*
@@ -393,38 +394,38 @@
return null;
}
- private UpdateMsg nextOldestUpdateMsg(
- NavigableSet<ReplicationDBCursor> sortedCursors)
- {
- /*
- * The cursors are sorted based on the currentChange of each cursor to
- * consider the next change across all servers.
- * To keep consistent the order of the cursors in the SortedSet,
- * it is necessary to remove and eventually add again a cursor (after moving
- * it forward).
- */
- final ReplicationDBCursor cursor = sortedCursors.pollFirst();
- final UpdateMsg result = cursor.getChange();
- cursor.next();
- addCursorIfNotEmpty(sortedCursors, cursor);
- return result;
- }
+ private UpdateMsg nextOldestUpdateMsg(
+ NavigableSet<ReplicationDBCursor> sortedCursors)
+ {
+ /*
+ * The cursors are sorted based on the currentChange of each cursor to
+ * consider the next change across all servers.
+ * To keep consistent the order of the cursors in the SortedSet,
+ * it is necessary to remove and eventually add again a cursor (after moving
+ * it forward).
+ */
+ final ReplicationDBCursor cursor = sortedCursors.pollFirst();
+ final UpdateMsg result = cursor.getChange();
+ cursor.next();
+ addCursorIfNotEmpty(sortedCursors, cursor);
+ return result;
+ }
- private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
- ReplicationDBCursor cursor)
- {
- if (cursor != null)
- {
- if (cursor.getChange() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- close(cursor);
- }
- }
- }
+ private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors,
+ ReplicationDBCursor cursor)
+ {
+ if (cursor != null)
+ {
+ if (cursor.getChange() != null)
+ {
+ cursors.add(cursor);
+ }
+ else
+ {
+ close(cursor);
+ }
+ }
+ }
/**
* Get the older Change Number for that server.
@@ -460,49 +461,49 @@
the lateQueue when it will send the next update but we are not yet
there. So let's take the last change not sent directly from the db.
*/
- result = findOldestChangeNumberFromReplicationDBs();
+ result = findOldestChangeNumberFromReplicationDBs();
}
}
}
return result;
}
- private ChangeNumber findOldestChangeNumberFromReplicationDBs()
- {
- SortedSet<ReplicationDBCursor> sortedCursors = null;
- try
- {
- sortedCursors = collectAllCursorsWithChanges();
- UpdateMsg msg = sortedCursors.first().getChange();
- return msg.getChangeNumber();
- }
- catch (Exception e)
- {
- return null;
- }
- finally
- {
- close(sortedCursors);
- }
- }
-
- /**
- * Collects all the replication DB cursors that have changes and sort them
- * with the oldest {@link ChangeNumber} first.
- *
- * @return a List of cursors with changes sorted by their {@link ChangeNumber}
- * (oldest first)
- */
- private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+ private ChangeNumber findOldestChangeNumberFromReplicationDBs()
{
- final NavigableSet<ReplicationDBCursor> results =
- new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
+ SortedSet<ReplicationDBCursor> sortedCursors = null;
+ try
+ {
+ sortedCursors = collectAllCursorsWithChanges();
+ UpdateMsg msg = sortedCursors.first().getChange();
+ return msg.getChangeNumber();
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+ finally
+ {
+ close(sortedCursors);
+ }
+ }
+
+ /**
+ * Collects all the replication DB cursors that have changes and sort them
+ * with the oldest {@link ChangeNumber} first.
+ *
+ * @return a List of cursors with changes sorted by their {@link ChangeNumber}
+ * (oldest first)
+ */
+ private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges()
+ {
+ final NavigableSet<ReplicationDBCursor> results =
+ new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator());
for (int serverId : replicationServerDomain.getServerIds())
{
// get the last already sent CN from that server to get a cursor
- final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
- addCursorIfNotEmpty(results,
- replicationServerDomain.getCursorFrom(serverId, lastCsn));
+ final ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+ addCursorIfNotEmpty(results,
+ replicationServerDomain.getCursorFrom(serverId, lastCsn));
}
return results;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index c88e37c..3b0b472 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -49,7 +49,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
import org.opends.server.replication.server.changelog.je.DbHandler;
-import org.opends.server.replication.server.changelog.je.ReplicationDB;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -1263,17 +1262,17 @@
}
/**
- * Creates and returns a cursor. When the cursor is not used anymore, the
- * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
- * resources and locks used by the cursor.
- *
- * @param serverId
- * Identifier of the server for which the cursor is created.
- * @param startAfterCN
- * Starting point for the cursor.
- * @return the created {@link ReplicationDB}. Null when no DB is available or
- * the DB is empty for the provided serverId .
- */
+ * Creates and returns a cursor. When the cursor is not used anymore, the
+ * caller MUST call the {@link ReplicationDBCursor#close()} method to free the
+ * resources and locks used by the cursor.
+ *
+ * @param serverId
+ * Identifier of the server for which the cursor is created.
+ * @param startAfterCN
+ * Starting point for the cursor.
+ * @return the created {@link ReplicationDBCursor}. Null when no DB is
+ * available or the DB is empty for the provided serverId .
+ */
public ReplicationDBCursor getCursorFrom(int serverId,
ChangeNumber startAfterCN)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
index 74e69d4..2e126da 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
@@ -38,24 +38,24 @@
{
/**
- * Get the UpdateMsg where the cursor is currently set.
- *
- * @return The UpdateMsg where the cursor is currently set.
- */
+ * Get the UpdateMsg where the cursor is currently set.
+ *
+ * @return The UpdateMsg where the cursor is currently set.
+ */
UpdateMsg getChange();
/**
- * Go to the next change in the ReplicationDB or in the server Queue.
- *
- * @return false if the cursor is already on the last change before this call.
- */
+ * Go to the next change in the ReplicationDB or in the server Queue.
+ *
+ * @return false if the cursor is already on the last change before this call.
+ */
boolean next();
/**
- * Release the resources and locks used by this cursor. This method must be
- * called when the cursor is no longer used. Failure to do it could cause DB
- * deadlock.
- */
+ * Release the resources and locks used by this cursor. This method must be
+ * called when the cursor is no longer used. Failure to do it could cause DB
+ * deadlock.
+ */
@Override
void close();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
index a0d9069..a716210 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
@@ -30,7 +30,6 @@
import java.util.Comparator;
import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.protocol.UpdateMsg;
/**
* This class defines a {@link Comparator} that allows to know which
@@ -41,14 +40,14 @@
implements Comparator<ReplicationDBCursor>
{
/**
- * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
- *
- * @param o1
- * first cursor.
- * @param o2
- * second cursor.
- * @return result of the comparison.
- */
+ * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}.
+ *
+ * @param o1
+ * first cursor.
+ * @param o2
+ * second cursor.
+ * @return result of the comparison.
+ */
@Override
public int compare(ReplicationDBCursor o1, ReplicationDBCursor o2)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index e4f4dd8..ea2991b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,7 +44,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -260,18 +260,18 @@
}
/**
- * Generate a new {@link ReplicationDBCursor} that allows to browse the db
- * managed by this dbHandler and starting at the position defined by a given
- * changeNumber.
- *
- * @param startAfterCN
- * The position where the cursor must start.
- * @return a new {@link ReplicationDBCursor} that allows to browse the db
- * managed by this dbHandler and starting at the position defined by a
- * given changeNumber.
- * @throws ChangelogException
- * if a database problem happened.
- */
+ * Generate a new {@link ReplicationDBCursor} that allows to browse the db
+ * managed by this dbHandler and starting at the position defined by a given
+ * changeNumber.
+ *
+ * @param startAfterCN
+ * The position where the cursor must start.
+ * @return a new {@link ReplicationDBCursor} that allows to browse the db
+ * managed by this dbHandler and starting at the position defined by a
+ * given changeNumber.
+ * @throws ChangelogException
+ * if a database problem happened.
+ */
public ReplicationDBCursor generateCursorFrom(ChangeNumber startAfterCN)
throws ChangelogException
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
index a096631..7f2c7af 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
@@ -32,7 +32,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
/**
* Berkeley DB JE implementation of {@link ReplicationDBCursor}.
@@ -46,18 +46,18 @@
private ChangeNumber lastNonNullCurrentCN;
/**
- * Creates a new JEReplicationDBCursor. All created cursor must be released by
- * the caller using the {@link #close()} method.
- *
- * @param db
- * The db where the cursor must be created.
- * @param startAfterCN
- * The ChangeNumber after which the cursor must start.
- * @param dbHandler
- * The associated DbHandler.
- * @throws ChangelogException
- * if a database problem happened.
- */
+ * Creates a new JEReplicationDBCursor. All created cursor must be released by
+ * the caller using the {@link #close()} method.
+ *
+ * @param db
+ * The db where the cursor must be created.
+ * @param startAfterCN
+ * The ChangeNumber after which the cursor must start.
+ * @param dbHandler
+ * The associated DbHandler.
+ * @throws ChangelogException
+ * if a database problem happened.
+ */
public JEReplicationDBCursor(ReplicationDB db, ChangeNumber startAfterCN,
DbHandler dbHandler) throws ChangelogException
{
@@ -151,10 +151,10 @@
}
/**
- * Called by the Gc when the object is garbage collected Release the internal
- * cursor in case the cursor was badly used and {@link #close()} was never
- * called.
- */
+ * Called by the Gc when the object is garbage collected Release the internal
+ * cursor in case the cursor was badly used and {@link #close()} was never
+ * called.
+ */
@Override
protected void finalize()
{
--
Gitblit v1.10.0