From c5135432faf9bbbcd496ea160d59755fba31012c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 21 Nov 2013 16:17:00 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java | 88 ++++
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 4
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 376 ++++++++++++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 344 +++++++++++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 145 +++---
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 65 ++
opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java | 34 +
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 92 ++-
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 25 +
9 files changed, 1,045 insertions(+), 128 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 7e2cea2..537cc34 100644
--- a/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -69,8 +69,7 @@
* @param mdss The provided string representation of the state.
* @throws DirectoryException when the string has an invalid format
*/
- public MultiDomainServerState(String mdss)
- throws DirectoryException
+ public MultiDomainServerState(String mdss) throws DirectoryException
{
list = splitGenStateToServerStates(mdss);
}
@@ -105,18 +104,13 @@
synchronized(this)
{
- int serverId = csn.getServerId();
ServerState oldServerState = list.get(baseDN);
if (oldServerState == null)
- oldServerState = new ServerState();
-
- if (csn.isNewerThan(oldServerState.getCSN(serverId)))
{
- oldServerState.update(csn);
+ oldServerState = new ServerState();
list.put(baseDN, oldServerState);
- return true;
}
- return false;
+ return oldServerState.update(csn);
}
}
@@ -132,7 +126,24 @@
*/
public void update(DN baseDN, ServerState serverState)
{
- list.put(baseDN, serverState);
+ for (CSN csn : serverState)
+ {
+ update(baseDN, csn);
+ }
+ }
+
+ /**
+ * Update the current object with the provided multi domain server state.
+ *
+ * @param state
+ * The provided multi domain server state.
+ */
+ public void update(MultiDomainServerState state)
+ {
+ for (Entry<DN, ServerState> entry : state.list.entrySet())
+ {
+ update(entry.getKey(), entry.getValue());
+ }
}
/**
@@ -142,7 +153,7 @@
@Override
public String toString()
{
- StringBuilder res = new StringBuilder();
+ final StringBuilder res = new StringBuilder();
if (list != null && !list.isEmpty())
{
for (Entry<DN, ServerState> entry : list.entrySet())
@@ -163,7 +174,6 @@
buffer.append(this);
}
-
/**
* Tests if the state is empty.
*
@@ -182,6 +192,19 @@
}
/**
+ * Returns the ServerState associated to the provided replication domain's
+ * baseDN.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ * @return the associated ServerState
+ */
+ public ServerState get(DN baseDN)
+ {
+ return list.get(baseDN);
+ }
+
+ /**
* Test if this object equals the provided other object.
* @param other The other object with which we want to test equality.
* @return Returns True if this equals other, else return false.
@@ -211,6 +234,22 @@
}
/**
+ * Test if this object covers the provided CSN for the provided baseDN.
+ *
+ * @param baseDN
+ * The provided baseDN.
+ * @param csn
+ * The provided CSN.
+ * @return true when this object covers the provided CSN for the provided
+ * baseDN.
+ */
+ public boolean cover(DN baseDN, CSN csn)
+ {
+ final ServerState state = list.get(baseDN);
+ return state != null && state.cover(csn);
+ }
+
+ /**
* Splits the provided generalizedServerState being a String with the
* following syntax: "domain1:state1;domain2:state2;..." to a Map of (domain
* DN, domain ServerState).
@@ -224,7 +263,7 @@
public static Map<DN, ServerState> splitGenStateToServerStates(
String multiDomainServerState) throws DirectoryException
{
- Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
+ final Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
if (multiDomainServerState != null && multiDomainServerState.length() > 0)
{
try
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 39c637e..b92dc07 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -210,6 +210,31 @@
ServerState startAfterServerState) throws ChangelogException;
/**
+ * Generates a {@link DBCursor} for one replicaDB for the specified
+ * replication domain and serverId starting after the provided {@link CSN}.
+ * <p>
+ * The cursor is already advanced to the records after the CSN.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link DBCursor#close()} method to free the resources and locks used by the
+ * cursor.
+ *
+ * @param baseDN
+ * the replication domain baseDN of the replicaDB
+ * @param serverId
+ * the serverId of the replicaDB
+ * @param startAfterCSN
+ * Starting point for the ReplicaDB cursor. If the CSN is null, then
+ * start from the oldest CSN for this replicaDB
+ * @return a non null {@link DBCursor}
+ * @throws ChangelogException
+ * If a database problem happened
+ * @see #getCursorFrom(DN, CSN)
+ */
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
+ throws ChangelogException;
+
+ /**
* Publishes the provided change to the changelog DB for the specified
* serverId and replication domain. After a change has been successfully
* published, it becomes available to be returned by the External ChangeLog.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
new file mode 100644
index 0000000..c82bc4e
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -0,0 +1,376 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+/**
+ * Thread responsible for inserting replicated changes into the ChangeNumber
+ * Index DB (CNIndexDB for short). Only changes older than the medium
+ * consistency point are inserted in the CNIndexDB. As a consequence this class
+ * is also responsible for maintaining the medium consistency point.
+ */
+public class ChangeNumberIndexer extends DirectoryThread
+{
+ /** The tracer object for the debug logger. */
+ private static final DebugTracer TRACER = getTracer();
+
+ private final ChangelogDB changelogDB;
+ /** Only used for initialization, and then discarded. */
+ private ChangelogState changelogState;
+
+ /*
+ * previousCookie and mediumConsistencyPoint must be thread safe, because
+ * 1) initialization can happen while the replication server starts receiving
+ * updates 2) many updates can happen concurrently. This solution also avoids
+ * using a queue that could fill up before we have consumed all its content.
+ */
+ /**
+ * Stores the value of the cookie before the change currently processed is
+ * inserted in the DB. After insert, it is updated with the CSN of the change
+ * currently processed (thus becoming the "current" cookie just before the
+ * change is returned.
+ */
+ private final MultiDomainServerState previousCookie =
+ new MultiDomainServerState();
+
+ /**
+ * Holds the medium consistency point for the current replication server.
+ *
+ * @see <a href=
+ * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
+ * >OpenDJ Domain Names for a description of what the medium consistency point
+ * is</a>
+ */
+ private final MultiDomainServerState mediumConsistencyPoint =
+ new MultiDomainServerState();
+
+ /**
+ * Composite cursor across all the replicaDBs for all the replication domains.
+ * It is volatile to ensure it supports concurrent update. Each time it is
+ * used more than once in a method, the method must take a local copy to
+ * ensure the cursor does not get updated in the middle of the method.
+ */
+ private volatile CompositeDBCursor<DN> crossDomainDBCursor;
+
+ /**
+ * New cursors for this Map must be created from the {@link #run()} method,
+ * i.e. from the same thread that will make use of them. If this rule is not
+ * obeyed, then a JE exception will be thrown about
+ * "Non-transactional Cursors may not be used in multiple threads;".
+ */
+ private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
+ new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
+ /** This map can be updated by multiple threads. */
+ private ConcurrentMap<Integer, DN> newCursors =
+ new ConcurrentSkipListMap<Integer, DN>();
+
+ /**
+ * Builds a ChangeNumberIndexer object.
+ *
+ * @param changelogDB
+ * the changelogDB
+ * @param changelogState
+ * the changelog state used for initialization
+ */
+ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+ {
+ super("Change number indexer");
+ this.changelogDB = changelogDB;
+ this.changelogState = changelogState;
+ }
+
+ /**
+ * Ensures the medium consistency point is updated by heartbeats.
+ *
+ * @param baseDN
+ * the baseDN of the domain for which the heartbeat is published
+ * @param heartbeatCSN
+ * the CSN coming from the heartbeat
+ */
+ public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
+ {
+ mediumConsistencyPoint.update(baseDN, heartbeatCSN);
+ final CompositeDBCursor<DN> localCursor = crossDomainDBCursor;
+ final DN changeBaseDN = localCursor.getData();
+ final CSN changeCSN = localCursor.getRecord().getCSN();
+ tryNotify(changeBaseDN, changeCSN);
+ }
+
+ /**
+ * Ensures the medium consistency point is updated by UpdateMsg.
+ *
+ * @param baseDN
+ * the baseDN of the domain for which the heartbeat is published
+ * @param updateMsg
+ * the updateMsg that will update the medium consistency point
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
+ throws ChangelogException
+ {
+ final CSN csn = updateMsg.getCSN();
+ mediumConsistencyPoint.update(baseDN, csn);
+ newCursors.put(csn.getServerId(), baseDN);
+ tryNotify(baseDN, csn);
+ }
+
+ /**
+ * Notifies the Change number indexer thread if it will be able to do some
+ * work.
+ */
+ private void tryNotify(final DN baseDN, final CSN csn)
+ {
+ if (mediumConsistencyPoint.cover(baseDN, csn))
+ {
+ synchronized (this)
+ {
+ notify();
+ }
+ }
+ }
+
+ private void initialize() throws ChangelogException, DirectoryException
+ {
+ final ChangeNumberIndexRecord newestRecord =
+ changelogDB.getChangeNumberIndexDB().getNewestRecord();
+ if (newestRecord != null)
+ {
+ previousCookie.update(
+ new MultiDomainServerState(newestRecord.getPreviousCookie()));
+ }
+
+ // initialize the cross domain DB cursor
+ final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+ for (Entry<DN, List<Integer>> entry
+ : changelogState.getDomainToServerIds().entrySet())
+ {
+ final DN baseDN = entry.getKey();
+ for (Integer serverId : entry.getValue())
+ {
+ final ServerState previousSS = previousCookie.get(baseDN);
+ final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null;
+ ensureCursorExists(baseDN, serverId, csn);
+ }
+
+ ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+ mediumConsistencyPoint.update(baseDN, latestKnownState);
+ }
+
+ crossDomainDBCursor = newCompositeDBCursor();
+ if (newestRecord != null)
+ {
+ // restore the "previousCookie" state before shutdown
+ final UpdateMsg record = crossDomainDBCursor.getRecord();
+ if (!record.getCSN().equals(newestRecord.getCSN()))
+ {
+ // TODO JNR remove
+ throw new RuntimeException("They do not equal! recordCSN="
+ + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN());
+ }
+ // TODO JNR is it possible to use the following line instead?
+ // previousCookie.update(newestRecord.getBaseDN(), record.getCSN());
+ // TODO JNR would this mean updating the if above?
+ previousCookie.update(crossDomainDBCursor.getData(), record.getCSN());
+ crossDomainDBCursor.next();
+ }
+
+ // this will not be used any more. Discard for garbage collection.
+ this.changelogState = null;
+ }
+
+ private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException
+ {
+ final Map<DBCursor<UpdateMsg>, DN> cursors =
+ new HashMap<DBCursor<UpdateMsg>, DN>();
+ for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
+ : this.allCursors.entrySet())
+ {
+ for (Entry<Integer, DBCursor<UpdateMsg>> entry2
+ : entry.getValue().entrySet())
+ {
+ cursors.put(entry2.getValue(), entry.getKey());
+ }
+ }
+ final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
+ result.next();
+ return result;
+ }
+
+ private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn)
+ throws ChangelogException
+ {
+ Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
+ if (map == null)
+ {
+ map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
+ allCursors.put(baseDN, map);
+ }
+ DBCursor<UpdateMsg> cursor = map.get(serverId);
+ if (cursor == null)
+ {
+ final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+ cursor = domainDB.getCursorFrom(baseDN, serverId, csn);
+ map.put(serverId, cursor);
+ return false;
+ }
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run()
+ {
+ try
+ {
+ /*
+ * initialize here to allow fast application start up and avoid errors due
+ * cursors being created in a different thread to the one where they are
+ * used.
+ */
+ initialize();
+ }
+ catch (DirectoryException e)
+ {
+ // TODO JNR error message i18n
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ return;
+ }
+ catch (ChangelogException e)
+ {
+ // TODO Auto-generated catch block
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ return;
+ }
+
+ while (!isShutdownInitiated())
+ {
+ try
+ {
+ createNewCursors();
+
+ final UpdateMsg msg = crossDomainDBCursor.getRecord();
+ if (msg == null)
+ {
+ synchronized (this)
+ {
+ wait();
+ }
+ // advance cursor, success/failure will be checked later
+ crossDomainDBCursor.next();
+ // loop to check whether new changes have been added to the ReplicaDBs
+ continue;
+ }
+
+ final CSN csn = msg.getCSN();
+ final DN baseDN = crossDomainDBCursor.getData();
+ // FIXME problem: what if the serverId is not part of the ServerState?
+ // right now, thread will be blocked
+ if (!mediumConsistencyPoint.cover(baseDN, csn))
+ {
+ // the oldest record to insert is newer than the medium consistency
+ // point. Let's wait for a change that can be published.
+ synchronized (this)
+ {
+ // double check to protect against a missed call to notify()
+ if (!mediumConsistencyPoint.cover(baseDN, csn))
+ {
+ wait();
+ // loop to check if changes older than the medium consistency
+ // point have been added to the ReplicaDBs
+ continue;
+ }
+ }
+ }
+
+ // OK, the oldest change is older than the medium consistency point
+ // let's publish it to the CNIndexDB
+ final ChangeNumberIndexRecord record =
+ new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn);
+ changelogDB.getChangeNumberIndexDB().addRecord(record);
+ // update, so it becomes the previous cookie for the next change
+ previousCookie.update(baseDN, csn);
+
+ // advance cursor, success/failure will be checked later
+ crossDomainDBCursor.next();
+ }
+ catch (ChangelogException e)
+ {
+ if (debugEnabled())
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ // TODO JNR error message i18n
+ }
+ catch (InterruptedException ignored)
+ {
+ // was shutdown called?
+ }
+ }
+ }
+
+ private void createNewCursors() throws ChangelogException
+ {
+ if (!newCursors.isEmpty())
+ {
+ boolean newCursorAdded = false;
+ for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator();
+ iter.hasNext();)
+ {
+ final Entry<Integer, DN> entry = iter.next();
+ if (!ensureCursorExists(entry.getValue(), entry.getKey(), null))
+ {
+ newCursorAdded = true;
+ }
+ iter.remove();
+ }
+ if (newCursorAdded)
+ {
+ crossDomainDBCursor = newCompositeDBCursor();
+ }
+ }
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 7f8e3c3..2e0e7c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.util.*;
+import java.util.Map.Entry;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -38,28 +39,33 @@
* {@link DBCursor} implementation that iterates across a Collection of
* {@link DBCursor}s, advancing from the oldest to the newest change cross all
* cursors.
+ *
+ * @param <Data>
+ * The type of data associated with each cursor
*/
-final class CompositeDBCursor implements DBCursor<UpdateMsg>
+final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
- private UpdateMsg currentChange;
- private final List<DBCursor<UpdateMsg>> exhaustedCursors =
- new ArrayList<DBCursor<UpdateMsg>>();
+ private UpdateMsg currentRecord;
+ private Data currentData;
+ private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
+ new HashMap<DBCursor<UpdateMsg>, Data>();
/**
* The cursors are sorted based on the current change of each cursor to
* consider the next change across all available cursors.
*/
- private final NavigableSet<DBCursor<UpdateMsg>> cursors =
- new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
- {
- @Override
- public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
- {
- final CSN csn1 = o1.getRecord().getCSN();
- final CSN csn2 = o2.getRecord().getCSN();
- return CSN.compare(csn1, csn2);
- }
- });
+ private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+ new TreeMap<DBCursor<UpdateMsg>, Data>(
+ new Comparator<DBCursor<UpdateMsg>>()
+ {
+ @Override
+ public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
+ {
+ final CSN csn1 = o1.getRecord().getCSN();
+ final CSN csn2 = o2.getRecord().getCSN();
+ return CSN.compare(csn1, csn2);
+ }
+ });
/**
* Builds a CompositeDBCursor using the provided collection of cursors.
@@ -67,11 +73,11 @@
* @param cursors
* the cursors that will be iterated upon.
*/
- public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
+ public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
{
- for (DBCursor<UpdateMsg> cursor : cursors)
+ for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
{
- add(cursor);
+ put(entry);
}
}
@@ -83,41 +89,46 @@
{
// try to recycle empty cursors in case the underlying ReplicaDBs received
// new changes. Copy the List to avoid ConcurrentModificationExceptions.
- final DBCursor<UpdateMsg>[] copy =
- exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]);
+ final Map<DBCursor<UpdateMsg>, Data> copy =
+ new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
- for (DBCursor<UpdateMsg> cursor : copy)
+ for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
{
- cursor.next();
- add(cursor);
+ entry.getKey().next();
+ put(entry);
}
}
if (cursors.isEmpty())
{
// no cursors are left with changes.
- currentChange = null;
+ currentRecord = null;
+ currentData = null;
return false;
}
// To keep consistent the cursors' order in the SortedSet, it is necessary
// to remove and eventually add again a cursor (after moving it forward).
- final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
- currentChange = cursor.getRecord();
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
+ final DBCursor<UpdateMsg> cursor = entry.getKey();
+ currentRecord = cursor.getRecord();
+ currentData = entry.getValue();
cursor.next();
- add(cursor);
+ put(entry);
return true;
}
- private void add(DBCursor<UpdateMsg> cursor)
+ private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
{
+ final DBCursor<UpdateMsg> cursor = entry.getKey();
+ final Data data = entry.getValue();
if (cursor.getRecord() != null)
{
- this.cursors.add(cursor);
+ this.cursors.put(cursor, data);
}
else
{
- this.exhaustedCursors.add(cursor);
+ this.exhaustedCursors.put(cursor, data);
}
}
@@ -125,23 +136,34 @@
@Override
public UpdateMsg getRecord()
{
- return currentChange;
+ return currentRecord;
+ }
+
+ /**
+ * Returns the data associated to the cursor that returned the current record.
+ *
+ * @return the data associated to the cursor that returned the current record.
+ */
+ public Data getData()
+ {
+ return currentData;
}
/** {@inheritDoc} */
@Override
public void close()
{
- StaticUtils.close(cursors);
- StaticUtils.close(exhaustedCursors);
+ StaticUtils.close(cursors.keySet());
+ StaticUtils.close(exhaustedCursors.keySet());
}
/** {@inheritDoc} */
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentChange=" + currentChange
- + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
+ return getClass().getSimpleName() + " currentRecord=" + currentRecord
+ + " currentData=" + currentData + " openCursors=" + cursors
+ + " exhaustedCursors=" + exhaustedCursors;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 0b1c599..f502a7c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -154,7 +154,7 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentChange=" + currentChange + ""
- + replicaDB;
+ return getClass().getSimpleName() + " currentChange=" + currentChange
+ + " replicaDB=" + replicaDB;
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
new file mode 100644
index 0000000..3754ac4
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -0,0 +1,344 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.lang.Thread.State;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.mockito.ArgumentCaptor;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+import org.opends.server.util.Pair;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+@SuppressWarnings("javadoc")
+public class ChangeNumberIndexerTest extends DirectoryServerTestCase
+{
+
+ private static final class ReplicatedUpdateMsg extends UpdateMsg
+ {
+
+ private final DN baseDN;
+ private final boolean emptyCursor;
+
+ public ReplicatedUpdateMsg(DN baseDN, CSN csn)
+ {
+ this(baseDN, csn, false);
+ }
+
+ public ReplicatedUpdateMsg(DN baseDN, CSN csn, boolean emptyCursor)
+ {
+ super(csn, null);
+ this.baseDN = baseDN;
+ this.emptyCursor = emptyCursor;
+ }
+
+ public DN getBaseDN()
+ {
+ return baseDN;
+ }
+
+ public boolean isEmptyCursor()
+ {
+ return emptyCursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "csn=" + getCSN() + ", baseDN=" + baseDN;
+ }
+ }
+
+ private static DN BASE_DN;
+ private static final int serverId1 = 101;
+ private static final int serverId2 = 102;
+
+ private ChangelogDB changelogDB;
+ private ChangeNumberIndexDB cnIndexDB;
+ private ReplicationDomainDB domainDB;
+ private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
+ new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+ private ChangelogState initialState;
+ private ChangeNumberIndexer indexer;
+ private MultiDomainServerState previousCookie;
+
+ @BeforeClass
+ public static void classSetup() throws Exception
+ {
+ TestCaseUtils.startFakeServer();
+ BASE_DN = DN.decode("dc=example,dc=com");
+ }
+
+ @AfterClass
+ public static void classTearDown() throws Exception
+ {
+ TestCaseUtils.shutdownFakeServer();
+ }
+
+ @BeforeMethod
+ public void setup() throws Exception
+ {
+ changelogDB = mock(ChangelogDB.class);
+ cnIndexDB = mock(ChangeNumberIndexDB.class);
+ domainDB = mock(ReplicationDomainDB.class);
+ when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
+ when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+
+ initialState = new ChangelogState();
+ previousCookie = new MultiDomainServerState();
+ }
+
+
+ private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
+
+ @Test
+ public void emptyDBNoDS() throws Exception
+ {
+ startIndexer();
+ verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBOneInitialDS() throws Exception
+ {
+ addReplica(BASE_DN, serverId1);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ publishUpdateMsg(msg1);
+
+ assertAddedRecords(msg1);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void nonEmptyDBOneInitialDS() throws Exception
+ {
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ addReplica(BASE_DN, serverId1);
+ setDBInitialRecords(msg1);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
+ publishUpdateMsg(msg2);
+
+ assertAddedRecords(msg2);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoInitialDSs() throws Exception
+ {
+ addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN, serverId2);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ publishUpdateMsg(msg2, msg1);
+
+ assertAddedRecords(msg1, msg2);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void nonEmptyDBTwoInitialDSs() throws Exception
+ {
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN, serverId2);
+ setDBInitialRecords(msg1, msg2);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
+ publishUpdateMsg(msg3, msg4);
+
+ assertAddedRecords(msg3, msg4);
+ }
+
+ @Test(enabled = false, dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
+ {
+ // TODO JNR make this tests work
+ addReplica(BASE_DN, serverId1);
+ addReplica(BASE_DN, serverId2);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
+ final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
+ final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN, serverId1, 2);
+ final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
+ // simulate no messages received during some time for replica 2
+ publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
+
+ assertAddedRecords(msg1Sid2, msg2Sid1, msg3Sid2);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
+ {
+ addReplica(BASE_DN, serverId1);
+ startIndexer();
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+ publishUpdateMsg(msg1);
+
+ addReplica(BASE_DN, serverId2);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ publishUpdateMsg(msg2);
+
+ assertAddedRecords(msg1, msg2);
+ }
+
+ private void addReplica(DN baseDN, int serverId) throws Exception
+ {
+ final SequentialDBCursor cursor = new SequentialDBCursor();
+ cursors.put(Pair.of(baseDN, serverId), cursor);
+ when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+ .thenReturn(cursor);
+ when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState());
+ initialState.addServerIdToDomain(serverId, baseDN);
+ }
+
+ private void startIndexer()
+ {
+ indexer = new ChangeNumberIndexer(changelogDB, initialState);
+ indexer.start();
+ waitForWaitingState(indexer);
+ }
+
+ private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
+ {
+ return new ReplicatedUpdateMsg(baseDN, new CSN(time, 0, serverId));
+ }
+
+ private ReplicatedUpdateMsg emptyCursor(DN baseDN, int serverId)
+ {
+ return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
+ }
+
+ private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
+ {
+ // Initialize the previous cookie that will be used to compare the records
+ // added to the CNIndexDB at the end of this test
+ for (int i = 0; i < msgs.length; i++)
+ {
+ ReplicatedUpdateMsg msg = msgs[i];
+ if (i + 1 == msgs.length)
+ {
+ final ReplicatedUpdateMsg newestMsg = msg;
+ final DN baseDN = newestMsg.getBaseDN();
+ final CSN csn = newestMsg.getCSN();
+ when(cnIndexDB.getNewestRecord()).thenReturn(
+ new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn));
+ final SequentialDBCursor cursor =
+ cursors.get(Pair.of(baseDN, csn.getServerId()));
+ cursor.add(newestMsg);
+ cursor.next(); // simulate the cursor had been initialized with this change
+ }
+ previousCookie.update(msg.getBaseDN(), msg.getCSN());
+ }
+ }
+
+ private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception
+ {
+ for (ReplicatedUpdateMsg msg : msgs)
+ {
+ final SequentialDBCursor cursor =
+ cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
+ if (msg.isEmptyCursor())
+ {
+ cursor.add(null);
+ }
+ else
+ {
+ cursor.add(msg);
+ }
+ }
+ for (ReplicatedUpdateMsg msg : msgs)
+ {
+ if (!msg.isEmptyCursor())
+ {
+ indexer.publishUpdateMsg(msg.getBaseDN(), msg);
+ }
+ }
+ waitForWaitingState(indexer);
+ }
+
+ private void waitForWaitingState(final Thread t)
+ {
+ State state = t.getState();
+ while (!state.equals(State.WAITING) && !state.equals(State.TERMINATED))
+ {
+ Thread.yield();
+ state = t.getState();
+ }
+ assertThat(state).isEqualTo(State.WAITING);
+ }
+
+ private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
+ {
+ final ArgumentCaptor<ChangeNumberIndexRecord> arg =
+ ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
+ verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
+ final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
+
+ // recheck it was not called more than expected
+ assertThat(allValues).hasSameSizeAs(msgs);
+ for (int i = 0; i < msgs.length; i++)
+ {
+ final ReplicatedUpdateMsg msg = msgs[i];
+ final ChangeNumberIndexRecord record = allValues.get(i);
+ // check content in order
+ String description = "expected: <" + msg + ">, but got: <" + record + ">";
+ assertThat(record.getBaseDN()).as(description).isEqualTo(msg.getBaseDN());
+ assertThat(record.getCSN()).as(description).isEqualTo(msg.getCSN());
+ assertThat(record.getPreviousCookie()).as(description).isEqualTo(previousCookie.toString());
+ previousCookie.update(msg.getBaseDN(), msg.getCSN());
+ }
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 46354c9..2f0a47e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -26,69 +26,31 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.util.Pair;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.opends.server.util.Pair.*;
import static org.testng.Assert.*;
-@SuppressWarnings("javadoc")
+@SuppressWarnings({ "javadoc", "unchecked" })
public class CompositeDBCursorTest extends DirectoryServerTestCase
{
- private static class MyDBCursor implements DBCursor<UpdateMsg>
- {
-
- private final List<UpdateMsg> msgs;
- private UpdateMsg current;
-
- public MyDBCursor(UpdateMsg... msgs)
- {
- this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
- next();
- }
-
- /** {@inheritDoc} */
- @Override
- public UpdateMsg getRecord()
- {
- return this.current;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean next()
- {
- if (!this.msgs.isEmpty())
- {
- this.current = this.msgs.remove(0);
- return true;
- }
- this.current = null;
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close()
- {
- // nothing to do
- }
-
- }
-
private UpdateMsg msg1;
private UpdateMsg msg2;
private UpdateMsg msg3;
private UpdateMsg msg4;
+ private String baseDN1 = "dc=forgerock,dc=com";
+ private String baseDN2 = "dc=example,dc=com";
@BeforeClass
public void setupMsgs()
@@ -102,60 +64,89 @@
@Test
public void emptyCursor() throws Exception
{
- final CompositeDBCursor compCursor = newCompositeDBCursor(new MyDBCursor());
+ final CompositeDBCursor<String> compCursor =
+ newCompositeDBCursor(of(new SequentialDBCursor(), baseDN1));
assertInOrder(compCursor);
}
@Test
public void oneElementCursor() throws Exception
{
- final CompositeDBCursor compCursor =
- newCompositeDBCursor(new MyDBCursor(msg1));
- assertInOrder(compCursor, msg1);
+ final CompositeDBCursor<String> compCursor =
+ newCompositeDBCursor(of(new SequentialDBCursor(msg1), baseDN1));
+ assertInOrder(compCursor, of(msg1, baseDN1));
}
@Test
public void twoElementsCursor() throws Exception
{
- final CompositeDBCursor compCursor =
- newCompositeDBCursor(new MyDBCursor(msg1, msg2));
- assertInOrder(compCursor, msg1, msg2);
+ final CompositeDBCursor<String> compCursor =
+ newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2), baseDN1));
+ assertInOrder(compCursor,
+ of(msg1, baseDN1),
+ of(msg2, baseDN1));
}
@Test
public void twoEmptyCursors() throws Exception
{
- final CompositeDBCursor compCursor = newCompositeDBCursor(
- new MyDBCursor(),
- new MyDBCursor());
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(), baseDN1),
+ of(new SequentialDBCursor(), baseDN2));
assertInOrder(compCursor);
}
@Test
public void twoOneElementCursors() throws Exception
{
- final CompositeDBCursor compCursor = newCompositeDBCursor(
- new MyDBCursor(msg2),
- new MyDBCursor(msg1));
- assertInOrder(compCursor, msg1, msg2);
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg2), baseDN1),
+ of(new SequentialDBCursor(msg1), baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg2, baseDN1));
}
@Test
public void twoTwoElementCursors() throws Exception
{
- final CompositeDBCursor compCursor = newCompositeDBCursor(
- new MyDBCursor(msg2, msg3),
- new MyDBCursor(msg1, msg4));
- assertInOrder(compCursor, msg1, msg2, msg3, msg4);
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg2, msg3), baseDN1),
+ of(new SequentialDBCursor(msg1, msg4), baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg2, baseDN1),
+ of(msg3, baseDN1),
+ of(msg4, baseDN2));
}
@Test
public void recycleTwoElementCursors() throws Exception
{
- final CompositeDBCursor compCursor = newCompositeDBCursor(
- new MyDBCursor(msg2, null, msg3),
- new MyDBCursor(null, msg1, msg4));
- assertInOrder(compCursor, msg1, msg2, msg3, msg4);
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg2, null, msg3), baseDN1),
+ of(new SequentialDBCursor(null, msg1, msg4), baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg2, baseDN1),
+ of(msg3, baseDN1),
+ of(msg4, baseDN2));
+ }
+
+ @Test
+ public void recycleTwoElementCursorsTODOJNR() throws Exception
+ {
+ SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3);
+ SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4);
+ cursor1.next();
+ cursor2.next();
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(cursor1, baseDN1),
+ of(cursor2, baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg3, baseDN1),
+ of(msg4, baseDN2));
}
private UpdateMsg newUpdateMsg(int t)
@@ -163,18 +154,26 @@
return new UpdateMsg(new CSN(t, t, t), new byte[t]);
}
- private CompositeDBCursor newCompositeDBCursor(DBCursor<UpdateMsg>... cursors)
+ private CompositeDBCursor<String> newCompositeDBCursor(
+ Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
{
- return new CompositeDBCursor(Arrays.asList(cursors));
+ final Map<DBCursor<UpdateMsg>, String> cursorsMap =
+ new HashMap<DBCursor<UpdateMsg>, String>();
+ for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
+ {
+ cursorsMap.put(pair.getFirst(), pair.getSecond());
+ }
+ return new CompositeDBCursor<String>(cursorsMap);
}
- private void assertInOrder(final CompositeDBCursor compCursor,
- UpdateMsg... msgs) throws ChangelogException
+ private void assertInOrder(final CompositeDBCursor<String> compCursor,
+ Pair<UpdateMsg, String>... expecteds) throws ChangelogException
{
- for (UpdateMsg msg : msgs)
+ for (final Pair<UpdateMsg, String> expected : expecteds)
{
assertTrue(compCursor.next());
- assertSame(compCursor.getRecord(), msg);
+ assertSame(compCursor.getRecord(), expected.getFirst());
+ assertSame(compCursor.getData(), expected.getSecond());
}
assertFalse(compCursor.next());
compCursor.close();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
new file mode 100644
index 0000000..063fd27
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -0,0 +1,88 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+@SuppressWarnings("javadoc")
+class SequentialDBCursor implements DBCursor<UpdateMsg>
+{
+
+ private final List<UpdateMsg> msgs;
+ private UpdateMsg current;
+
+ public SequentialDBCursor(UpdateMsg... msgs)
+ {
+ this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
+ next();
+ }
+
+ public void add(UpdateMsg msg)
+ {
+ this.msgs.add(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return current;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next()
+ {
+ if (!msgs.isEmpty())
+ {
+ current = msgs.remove(0);
+ return current != null;
+ }
+ current = null;
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ // nothing to do
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "currentRecord=" + current + " nextMessages=" + msgs;
+ }
+
+}
\ No newline at end of file
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
index 6f55686..b536725 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
@@ -26,8 +26,9 @@
*/
package org.opends.server.types;
-import java.util.Collections;
+import java.util.Arrays;
+import org.opends.server.api.AttributeSyntax;
import org.opends.server.schema.OIDSyntax;
/**
@@ -38,13 +39,36 @@
public interface AttributeTypeConstants
{
+ AttributeSyntax<?> OID_SYNTAX = new OIDSyntax();
+
AttributeType OBJECT_CLASS = new AttributeType(
"( 2.5.4.0 NAME 'objectClass' EQUALITY objectIdentifierMatch "
+ "SYNTAX 1.3.6.1.4.1.1466.115.121.1.38 X-ORIGIN 'RFC 2256' )",
- "objectClass", Collections.singletonList("objectClass"), "2.5.4.0", null,
- null, new OIDSyntax(), AttributeUsage.USER_APPLICATIONS, false, false,
- false, false);
+ "objectClass", Arrays.asList("objectClass"), "2.5.4.0",
+ null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+ false, false, false, false);
- AttributeType[] ALL = { OBJECT_CLASS, };
+ AttributeType ORGANIZATION_NAME = new AttributeType(
+ "( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )",
+ "organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10",
+ null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+ false, false, false, false);
+
+ AttributeType ORGANIZATIONAL_UNIT_NAME = new AttributeType(
+ "( 2.5.4.11 NAME ( 'ou' 'organizationalUnitName' ) SUP name X-ORIGIN 'RFC 4519' )",
+ "organizationalUnitName", Arrays.asList("ou", "organizationalUnitName"), "2.5.4.11",
+ null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+ false, false, false, false);
+
+ AttributeType DOMAIN_COMPONENT = new AttributeType(
+ "( 0.9.2342.19200300.100.1.25 NAME ( 'dc' 'domainComponent' ) "
+ + "EQUALITY caseIgnoreIA5Match SUBSTR caseIgnoreIA5SubstringsMatch"
+ + "SYNTAX 1.3.6.1.4.1.1466.115.121.1.26 SINGLE-VALUE X-ORIGIN 'RFC 4519' )",
+ "domainComponent", Arrays.asList("dc", "domainComponent"), "0.9.2342.19200300.100.1.25",
+ null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+ false, false, false, false);
+
+ AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME,
+ ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, };
}
--
Gitblit v1.10.0