From d24edab25bfe93a3e3ea4eb38fb9860cef7aa034 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 13 Nov 2013 14:06:28 +0000
Subject: [PATCH] OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 128 ++++---------------------
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 144 ++++++++++++++++++++++++++++
2 files changed, 167 insertions(+), 105 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
new file mode 100644
index 0000000..75eb518
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -0,0 +1,144 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.*;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.util.StaticUtils;
+
+/**
+ * {@link DBCursor} implementation that iterates across a Collection of
+ * {@link DBCursor}s, advancing from the oldest to the newest change cross all
+ * cursors.
+ */
+final class CompositeDBCursor implements DBCursor<UpdateMsg>
+{
+
+ private UpdateMsg currentChange;
+ private final List<DBCursor<UpdateMsg>> exhaustedCursors =
+ new ArrayList<DBCursor<UpdateMsg>>();
+ /**
+ * The cursors are sorted based on the current change of each cursor to
+ * consider the next change across all available cursors.
+ */
+ private final NavigableSet<DBCursor<UpdateMsg>> cursors =
+ new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
+ {
+ @Override
+ public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
+ {
+ final CSN csn1 = o1.getRecord().getCSN();
+ final CSN csn2 = o2.getRecord().getCSN();
+ return CSN.compare(csn1, csn2);
+ }
+ });
+
+ /**
+ * Builds a CompositeDBCursor using the provided collection of cursors.
+ *
+ * @param cursors
+ * the cursors that will be iterated upon.
+ */
+ public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
+ {
+ for (DBCursor<UpdateMsg> cursor : cursors)
+ {
+ add(cursor);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ // try to recycle empty cursors in case the underlying ReplicaDBs received
+ // new changes
+ for (Iterator<DBCursor<UpdateMsg>> iter = exhaustedCursors.iterator(); iter
+ .hasNext();)
+ {
+ DBCursor<UpdateMsg> cursor = iter.next();
+ iter.remove();
+ cursor.next();
+ add(cursor);
+ }
+
+ if (cursors.isEmpty())
+ {
+ // no cursors are left with changes.
+ currentChange = null;
+ return false;
+ }
+
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove and eventually add again a cursor (after moving it forward).
+ final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
+ currentChange = cursor.getRecord();
+ cursor.next();
+ add(cursor);
+ return true;
+ }
+
+ private void add(DBCursor<UpdateMsg> cursor)
+ {
+ if (cursor.getRecord() != null)
+ {
+ this.cursors.add(cursor);
+ }
+ else
+ {
+ this.exhaustedCursors.add(cursor);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return currentChange;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ StaticUtils.close(cursors);
+ StaticUtils.close(exhaustedCursors);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " currentChange=" + currentChange
+ + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 30bdeff..fdc8f3e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -61,110 +61,6 @@
protected static final DebugTracer TRACER = getTracer();
/**
- * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
- * a replication domain, advancing from the oldest to the newest change cross
- * all replicaDBs.
- */
- private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
- {
-
- private final DN baseDN;
- private UpdateMsg currentChange;
- /**
- * The cursors are sorted based on the current change of each cursor to
- * consider the next change across all replicaDBs.
- */
- private final NavigableSet<DBCursor<UpdateMsg>> cursors =
- new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
- {
-
- @Override
- public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
- {
- final CSN csn1 = o1.getRecord().getCSN();
- final CSN csn2 = o2.getRecord().getCSN();
- return CSN.compare(csn1, csn2);
- }
- });
-
- public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
- throws ChangelogException
- {
- this.baseDN = baseDN;
- for (int serverId : getDomainMap(baseDN).keySet())
- {
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterServerState.getCSN(serverId);
- addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN));
- }
- }
-
- private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
- CSN startAfterCSN) throws ChangelogException
- {
- JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
- if (replicaDB != null)
- {
- DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startAfterCSN);
- cursor.next();
- return cursor;
- }
- return EMPTY_CURSOR;
- }
-
- @Override
- public boolean next() throws ChangelogException
- {
- if (cursors.isEmpty())
- {
- currentChange = null;
- return false;
- }
-
- // To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and eventually add again a cursor (after moving it forward).
- final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
- currentChange = cursor.getRecord();
- cursor.next();
- addCursorIfNotEmpty(cursor);
- return true;
- }
-
- void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
- {
- if (cursor.getRecord() != null)
- {
- cursors.add(cursor);
- }
- else
- {
- StaticUtils.close(cursor);
- }
- }
-
- @Override
- public UpdateMsg getRecord()
- {
- return currentChange;
- }
-
- @Override
- public void close()
- {
- StaticUtils.close(cursors);
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + " baseDN=" + baseDN
- + " currentChange=" + currentChange + " open cursors=" + cursors;
- }
- }
-
- /**
* This map contains the List of updates received from each LDAP server.
* <p>
* When removing a domainMap, code:
@@ -790,7 +686,29 @@
public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
ServerState startAfterServerState) throws ChangelogException
{
- return new CrossReplicaDBCursor(baseDN, startAfterServerState);
+ final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+ final List<DBCursor<UpdateMsg>> cursors =
+ new ArrayList<DBCursor<UpdateMsg>>(serverIds.size());
+ for (int serverId : serverIds)
+ {
+ // get the last already sent CSN from that server to get a cursor
+ final CSN lastCSN = startAfterServerState.getCSN(serverId);
+ cursors.add(getCursorFrom(baseDN, serverId, lastCSN));
+ }
+ return new CompositeDBCursor(cursors);
+ }
+
+ private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
+ CSN startAfterCSN) throws ChangelogException
+ {
+ JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ if (replicaDB != null)
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
+ cursor.next();
+ return cursor;
+ }
+ return EMPTY_CURSOR;
}
private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
--
Gitblit v1.10.0