From 4936231f6b43a59233dc4ee909d9a2eeb3ced31a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 13:52:43 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 76 ++--------
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 169 +++++++++++++++++++++---
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 8 +
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 3
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 59 ++++----
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 45 +++--
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 47 ++++--
7 files changed, 254 insertions(+), 153 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed243c4..07f2371 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@
*/
package org.opends.server.replication.server;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
@@ -294,22 +295,19 @@
* restart as usual
* load this change on the delayList
*/
- NavigableSet<ReplicaDBCursor> sortedCursors = null;
+ ReplicaDBCursor cursor = null;
try
{
- sortedCursors = collectAllCursorsWithChanges();
-
// fill the lateQueue
- while (!sortedCursors.isEmpty()
- && lateQueue.count() < 100
- && lateQueue.bytesCount() < 50000)
+ cursor = replicationServerDomain.getCursorFrom(serverState);
+ while (cursor.next() && isLateQueueBelowThreshold())
{
- lateQueue.add(nextOldestUpdateMsg(sortedCursors));
+ lateQueue.add(cursor.getChange());
}
}
finally
{
- close(sortedCursors);
+ close(cursor);
}
/*
@@ -403,34 +401,9 @@
return null;
}
- private UpdateMsg nextOldestUpdateMsg(
- NavigableSet<ReplicaDBCursor> sortedCursors)
+ private boolean isLateQueueBelowThreshold()
{
- /*
- * The cursors are sorted based on the currentChange of each cursor to
- * consider the next change across all servers.
- * To keep consistent the order of the cursors in the SortedSet,
- * it is necessary to remove and eventually add again a cursor (after moving
- * it forward).
- */
- final ReplicaDBCursor cursor = sortedCursors.pollFirst();
- final UpdateMsg result = cursor.getChange();
- cursor.next();
- addCursorIfNotEmpty(sortedCursors, cursor);
- return result;
- }
-
- private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors,
- ReplicaDBCursor cursor)
- {
- if (cursor.getChange() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- close(cursor);
- }
+ return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000;
}
/**
@@ -476,12 +449,12 @@
private CSN findOldestCSNFromReplicaDBs()
{
- SortedSet<ReplicaDBCursor> sortedCursors = null;
+ ReplicaDBCursor cursor = null;
try
{
- sortedCursors = collectAllCursorsWithChanges();
- UpdateMsg msg = sortedCursors.first().getChange();
- return msg.getCSN();
+ cursor = replicationServerDomain.getCursorFrom(serverState);
+ cursor.next();
+ return cursor.getChange().getCSN();
}
catch (Exception e)
{
@@ -489,32 +462,11 @@
}
finally
{
- close(sortedCursors);
+ close(cursor);
}
}
/**
- * Collects all the {@link ReplicaDBCursor}s that have changes and sort them
- * with the oldest {@link CSN} first.
- *
- * @return a List of cursors with changes sorted by their {@link CSN}
- * (oldest first)
- */
- private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
- {
- final NavigableSet<ReplicaDBCursor> results =
- new TreeSet<ReplicaDBCursor>();
- for (int serverId : replicationServerDomain.getServerIds())
- {
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCsn = serverState.getCSN(serverId);
- addCursorIfNotEmpty(results,
- replicationServerDomain.getCursorFrom(serverId, lastCsn));
- }
- return results;
- }
-
- /**
* Get the count of updates sent to this server.
* @return The count of update sent to this server.
*/
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 9c8c0c2..f8a9cac 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -79,8 +79,7 @@
* <p>
* Currently are only implemented the create and restore backup features.
*/
-public class ReplicationBackend
- extends Backend
+public class ReplicationBackend extends Backend
{
private static final String CHANGE_NUMBER = "replicationChangeNumber";
@@ -620,43 +619,41 @@
* Exports or returns all the changes from a ReplicationServerDomain coming
* after the CSN specified in the searchOperation.
*/
- private void writeChangesAfterCSN(ReplicationServerDomain rsd,
+ private void writeChangesAfterCSN(ReplicationServerDomain rsDomain,
final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
SearchOperation searchOperation, final CSN previousCSN)
{
- for (int serverId : rsd.getServerIds())
+ if (exportConfig != null && exportConfig.isCancelled())
+ { // Abort if cancelled
+ return;
+ }
+
+ ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN);
+ try
{
- if (exportConfig != null && exportConfig.isCancelled())
- { // Abort if cancelled
- return;
- }
+ int lookthroughCount = 0;
- ReplicaDBCursor cursor = rsd.getCursorFrom(serverId, previousCSN);
- try
+ // Walk through the changes
+ cursor.next(); // first try to advance the cursor
+ while (cursor.getChange() != null)
{
- int lookthroughCount = 0;
-
- // Walk through the changes
- while (cursor.getChange() != null)
- {
- if (exportConfig != null && exportConfig.isCancelled())
- { // abort if cancelled
- return;
- }
- if (!canContinue(searchOperation, lookthroughCount))
- {
- break;
- }
- lookthroughCount++;
- writeChange(cursor.getChange(), ldifWriter, searchOperation,
- rsd.getBaseDN(), exportConfig != null);
- cursor.next();
+ if (exportConfig != null && exportConfig.isCancelled())
+ { // abort if cancelled
+ return;
}
+ if (!canContinue(searchOperation, lookthroughCount))
+ {
+ break;
+ }
+ lookthroughCount++;
+ writeChange(cursor.getChange(), ldifWriter, searchOperation,
+ rsDomain.getBaseDN(), exportConfig != null);
+ cursor.next();
}
- finally
- {
- close(cursor);
- }
+ }
+ finally
+ {
+ close(cursor);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index fd57424..a3a9043 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1269,19 +1269,7 @@
}
/**
- * Returns a set containing the serverIds that produced updates and known by
- * this replicationServer from all over the topology, whether directly
- * connected or connected to another RS.
- *
- * @return a set containing the serverIds known by this replicationServer.
- */
- public Set<Integer> getServerIds()
- {
- return domainDB.getDomainServerIds(baseDN);
- }
-
- /**
- * Creates and returns a cursor.
+ * Creates and returns a cursor across this replication domain.
* <p>
* Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
* to the next available record.
@@ -1290,16 +1278,35 @@
* {@link ReplicaDBCursor#close()} method to free the resources and locks used
* by the cursor.
*
- * @param serverId
- * Identifier of the server for which the cursor is created
* @param startAfterCSN
* Starting point for the cursor. If null, start from the oldest CSN
* @return a non null {@link ReplicaDBCursor}
- * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN)
+ * @see ReplicationDomainDB#getCursorFrom(DN, CSN)
*/
- public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
+ public ReplicaDBCursor getCursorFrom(CSN startAfterCSN)
{
- return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ return domainDB.getCursorFrom(baseDN, startAfterCSN);
+ }
+
+ /**
+ * Creates and returns a cursor across this replication domain.
+ * <p>
+ * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
+ * to the next available record.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+ * by the cursor.
+ *
+ * @param startAfterServerState
+ * Starting point for the replicaDB cursors. If null, start from the
+ * oldest CSN
+ * @return a non null {@link ReplicaDBCursor} going from oldest to newest CSN
+ * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
+ */
+ public ReplicaDBCursor getCursorFrom(ServerState startAfterServerState)
+ {
+ return domainDB.getCursorFrom(baseDN, startAfterServerState);
}
/**
@@ -2720,7 +2727,7 @@
*/
public void storeReceivedCTHeartbeat(CSN csn)
{
- // TODO:May be we can spare processing by only storing CSN (timestamp)
+ // TODO:Maybe we can spare processing by only storing CSN (timestamp)
// instead of a server state.
getChangeTimeHeartbeatState().update(csn);
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 8e5b22f..bc57fbe 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,6 @@
*/
package org.opends.server.replication.server.changelog.api;
-import java.util.Set;
-
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -41,16 +39,6 @@
{
/**
- * Returns the serverIds for the servers that are or have been part of the
- * provided replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return an unmodifiable set of integers holding the serverIds
- */
- Set<Integer> getDomainServerIds(DN baseDN);
-
- /**
* Get the number of changes for the specified replication domain.
*
* @param baseDN
@@ -171,8 +159,9 @@
long getCount(DN baseDN, int serverId, CSN from, CSN to);
/**
- * Generates a {@link ReplicaDBCursor} for the specified serverId and
- * replication domain starting after the provided CSN.
+ * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
+ * specified replication domain, with all cursors starting after the provided
+ * CSN.
* <p>
* The cursor is already advanced to the record after startAfterCSN.
* <p>
@@ -182,13 +171,35 @@
*
* @param baseDN
* the replication domain baseDN
- * @param serverId
- * Identifier of the server for which the cursor is created
* @param startAfterCSN
- * Starting point for the cursor. If null, start from the oldest CSN
+ * Starting point for each ReplicaDB cursor. If null, start from the
+ * oldest CSN for each ReplicaDB cursor.
* @return a non null {@link ReplicaDBCursor}
+ * @see #getCursorFrom(DN, ServerState)
*/
- ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
+ ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN);
+
+ /**
+ * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
+ * specified replication domain starting after the provided
+ * {@link ServerState} for each replicaDBs.
+ * <p>
+ * The cursor is already advanced to the records after the serverState.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+ * by the cursor.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @param startAfterServerState
+ * Starting point for each ReplicaDB cursor. If any CSN for a
+ * replicaDB is null, then start from the oldest CSN for this
+ * replicaDB
+ * @return a non null {@link ReplicaDBCursor}
+ * @see #getCursorFrom(DN, CSN)
+ */
+ ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState);
/**
* for the specified serverId and replication domain.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 21d4426..de85252 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.messages.Message;
@@ -57,6 +54,114 @@
{
/**
+ * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
+ * replication domain, advancing from the oldest to the newest change cross
+ * all replicaDBs.
+ */
+ private final class CrossReplicaDBCursor implements ReplicaDBCursor
+ {
+
+ private UpdateMsg currentChange;
+ /**
+ * The cursors are sorted based on the current change of each cursor to
+ * consider the next change across all replicaDBs.
+ */
+ private final NavigableSet<ReplicaDBCursor> cursors =
+ new TreeSet<ReplicaDBCursor>();
+ private final DN baseDN;
+
+ public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
+ {
+ this.baseDN = baseDN;
+ for (int serverId : getDomainMap(baseDN).keySet())
+ {
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterServerState.getCSN(serverId);
+ addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
+ }
+ }
+
+ private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
+ CSN startAfterCSN)
+ {
+ JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ if (replicaDB != null)
+ {
+ try
+ {
+ ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
+ cursor.next();
+ return cursor;
+ }
+ catch (ChangelogException e)
+ {
+ // ignored
+ }
+ }
+ return EMPTY_CURSOR;
+ }
+
+ @Override
+ public boolean next()
+ {
+ if (cursors.isEmpty())
+ {
+ currentChange = null;
+ return false;
+ }
+
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove and eventually add again a cursor (after moving it forward).
+ final ReplicaDBCursor cursor = cursors.pollFirst();
+ currentChange = cursor.getChange();
+ cursor.next();
+ addCursorIfNotEmpty(cursor);
+ return true;
+ }
+
+ void addCursorIfNotEmpty(ReplicaDBCursor cursor)
+ {
+ if (cursor.getChange() != null)
+ {
+ cursors.add(cursor);
+ }
+ else
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ @Override
+ public UpdateMsg getChange()
+ {
+ return currentChange;
+ }
+
+ @Override
+ public void close()
+ {
+ StaticUtils.close(cursors);
+ }
+
+ @Override
+ public int compareTo(ReplicaDBCursor o)
+ {
+ final CSN csn1 = getChange().getCSN();
+ final CSN csn2 = o.getChange().getCSN();
+
+ return CSN.compare(csn1, csn2);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " baseDN=" + baseDN
+ + " currentChange=" + currentChange + " open cursors=" + cursors;
+ }
+ }
+
+ /**
* This map contains the List of updates received from each LDAP server.
*/
private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
@@ -109,6 +214,12 @@
{
// empty
}
+
+ @Override
+ public String toString()
+ {
+ return "EmptyReplicaDBCursor";
+ }
};
/**
@@ -368,13 +479,6 @@
/** {@inheritDoc} */
@Override
- public Set<Integer> getDomainServerIds(DN baseDN)
- {
- return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
- }
-
- /** {@inheritDoc} */
- @Override
public long getCount(DN baseDN, int serverId, CSN from, CSN to)
{
JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -565,24 +669,45 @@
/** {@inheritDoc} */
@Override
- public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
- CSN startAfterCSN)
+ public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
{
- JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
- if (replicaDB != null)
+ // Builds a new serverState for all the serverIds in the replication domain
+ // to ensure we get cursors starting after the provided CSN.
+ return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReplicaDBCursor getCursorFrom(DN baseDN,
+ ServerState startAfterServerState)
+ {
+ return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+ }
+
+ private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
+ {
+ final ServerState result = new ServerState();
+ if (startAfterCSN == null)
{
- try
+ return result;
+ }
+
+ for (int serverId : getDomainMap(baseDN).keySet())
+ {
+ if (serverId == startAfterCSN.getServerId())
{
- ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
- cursor.next();
- return cursor;
+ // reuse the provided CSN one as it is the most accurate
+ result.update(startAfterCSN);
}
- catch (ChangelogException e)
+ else
{
- // ignored
+ // build a new CSN, ignoring the seqNum since it is irrelevant for
+ // a different serverId
+ final CSN csn = startAfterCSN; // only used for increased readability
+ result.update(new CSN(csn.getTime(), 0, serverId));
}
}
- return EMPTY_CURSOR;
+ return result;
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 4cf8caa..2424239 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -584,7 +584,8 @@
@Override
public String toString()
{
- return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN;
+ return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
+ + oldestCSN + " " + newestCSN;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index f2ecd4d..03355df 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -166,4 +166,12 @@
return CSN.compare(csn1, csn2);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " currentChange=" + currentChange + ""
+ + replicaDB;
+ }
}
--
Gitblit v1.10.0