From d24edab25bfe93a3e3ea4eb38fb9860cef7aa034 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 13 Nov 2013 14:06:28 +0000
Subject: [PATCH] OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 128 +++++++-----------------------------------
1 files changed, 23 insertions(+), 105 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 30bdeff..fdc8f3e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@
protected static final DebugTracer TRACER = getTracer();
/**
- * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
- * a replication domain, advancing from the oldest to the newest change cross
- * all replicaDBs.
- */
- private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
- {
-
- private final DN baseDN;
- private UpdateMsg currentChange;
- /**
- * The cursors are sorted based on the current change of each cursor to
- * consider the next change across all replicaDBs.
- */
- private final NavigableSet<DBCursor<UpdateMsg>> cursors =
- new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
- {
-
- @Override
- public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
- {
- final CSN csn1 = o1.getRecord().getCSN();
- final CSN csn2 = o2.getRecord().getCSN();
- return CSN.compare(csn1, csn2);
- }
- });
-
- public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
- throws ChangelogException
- {
- this.baseDN = baseDN;
- for (int serverId : getDomainMap(baseDN).keySet())
- {
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState.getCSN(serverId);
- addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
- }
- }
-
- private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
- CSN startAfterCSN) throws ChangelogException
- {
- JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
- if (replicaDB != null)
- {
- DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startAfterCSN);
- cursor.next();
- return cursor;
- }
- return EMPTY_CURSOR;
- }
-
- @Override
- public boolean next() throws ChangelogException
- {
- if (cursors.isEmpty())
- {
- currentChange = null;
- return false;
- }
-
- // To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and eventually add again a cursor (after moving it forward).
- final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
- currentChange = cursor.getRecord();
- cursor.next();
- addCursorIfNotEmpty(cursor);
- return true;
- }
-
- void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
- {
- if (cursor.getRecord() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- StaticUtils.close(cursor);
- }
- }
-
- @Override
- public UpdateMsg getRecord()
- {
- return currentChange;
- }
-
- @Override
- public void close()
- {
- StaticUtils.close(cursors);
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + " baseDN=" + baseDN
- + " currentChange=" + currentChange + " open cursors=" + cursors;
- }
- }
-
- /**
* This map contains the List of updates received from each LDAP server.
* <p>
* When removing a domainMap, code:
@@ -790,7 +686,29 @@
public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
ServerState startAfterServerState) throws ChangelogException
{
- return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+ final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+ final List<DBCursor<UpdateMsg>> cursors =
+ new ArrayList<DBCursor<UpdateMsg>>(serverIds.size());
+ for (int serverId : serverIds)
+ {
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterServerState.getCSN(serverId);
+ cursors.add(getCursorFrom(baseDN, serverId, lastCSN));
+ }
+ return new CompositeDBCursor(cursors);
+ }
+
+ private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
+ CSN startAfterCSN) throws ChangelogException
+ {
+ JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ if (replicaDB != null)
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
+ cursor.next();
+ return cursor;
+ }
+ return EMPTY_CURSOR;
}
private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
--
Gitblit v1.10.0