From d454b5f9a2b7dc4ef2a70cd983a26436568cbe04 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 13:52:43 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 169 +++++++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 147 insertions(+), 22 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 21d4426..de85252 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.messages.Message;
@@ -57,6 +54,114 @@
{
/**
+ * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
+ * replication domain, advancing from the oldest to the newest change cross
+ * all replicaDBs.
+ */
+ private final class CrossReplicaDBCursor implements ReplicaDBCursor
+ {
+
+ private UpdateMsg currentChange;
+ /**
+ * The cursors are sorted based on the current change of each cursor to
+ * consider the next change across all replicaDBs.
+ */
+ private final NavigableSet<ReplicaDBCursor> cursors =
+ new TreeSet<ReplicaDBCursor>();
+ private final DN baseDN;
+
+ public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
+ {
+ this.baseDN = baseDN;
+ for (int serverId : getDomainMap(baseDN).keySet())
+ {
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterServerState.getCSN(serverId);
+ addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
+ }
+ }
+
+ private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
+ CSN startAfterCSN)
+ {
+ JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ if (replicaDB != null)
+ {
+ try
+ {
+ ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
+ cursor.next();
+ return cursor;
+ }
+ catch (ChangelogException e)
+ {
+ // ignored
+ }
+ }
+ return EMPTY_CURSOR;
+ }
+
+ @Override
+ public boolean next()
+ {
+ if (cursors.isEmpty())
+ {
+ currentChange = null;
+ return false;
+ }
+
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove and eventually add again a cursor (after moving it forward).
+ final ReplicaDBCursor cursor = cursors.pollFirst();
+ currentChange = cursor.getChange();
+ cursor.next();
+ addCursorIfNotEmpty(cursor);
+ return true;
+ }
+
+ void addCursorIfNotEmpty(ReplicaDBCursor cursor)
+ {
+ if (cursor.getChange() != null)
+ {
+ cursors.add(cursor);
+ }
+ else
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
+ @Override
+ public UpdateMsg getChange()
+ {
+ return currentChange;
+ }
+
+ @Override
+ public void close()
+ {
+ StaticUtils.close(cursors);
+ }
+
+ @Override
+ public int compareTo(ReplicaDBCursor o)
+ {
+ final CSN csn1 = getChange().getCSN();
+ final CSN csn2 = o.getChange().getCSN();
+
+ return CSN.compare(csn1, csn2);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " baseDN=" + baseDN
+ + " currentChange=" + currentChange + " open cursors=" + cursors;
+ }
+ }
+
+ /**
* This map contains the List of updates received from each LDAP server.
*/
private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
@@ -109,6 +214,12 @@
{
// empty
}
+
+ @Override
+ public String toString()
+ {
+ return "EmptyReplicaDBCursor";
+ }
};
/**
@@ -368,13 +479,6 @@
/** {@inheritDoc} */
@Override
- public Set<Integer> getDomainServerIds(DN baseDN)
- {
- return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
- }
-
- /** {@inheritDoc} */
- @Override
public long getCount(DN baseDN, int serverId, CSN from, CSN to)
{
JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -565,24 +669,45 @@
/** {@inheritDoc} */
@Override
- public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
- CSN startAfterCSN)
+ public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
{
- JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
- if (replicaDB != null)
+ // Builds a new serverState for all the serverIds in the replication domain
+ // to ensure we get cursors starting after the provided CSN.
+ return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReplicaDBCursor getCursorFrom(DN baseDN,
+ ServerState startAfterServerState)
+ {
+ return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+ }
+
+ private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
+ {
+ final ServerState result = new ServerState();
+ if (startAfterCSN == null)
{
- try
+ return result;
+ }
+
+ for (int serverId : getDomainMap(baseDN).keySet())
+ {
+ if (serverId == startAfterCSN.getServerId())
{
- ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
- cursor.next();
- return cursor;
+ // reuse the provided CSN one as it is the most accurate
+ result.update(startAfterCSN);
}
- catch (ChangelogException e)
+ else
{
- // ignored
+ // build a new CSN, ignoring the seqNum since it is irrelevant for
+ // a different serverId
+ final CSN csn = startAfterCSN; // only used for increased readability
+ result.update(new CSN(csn.getTime(), 0, serverId));
}
}
- return EMPTY_CURSOR;
+ return result;
}
/** {@inheritDoc} */
--
Gitblit v1.10.0