From ab3cac04319c920ba14be59ea874e6e35f730655 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 21 Jul 2014 17:06:28 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend   to support cn=changelog CR-4053

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |   33 ++++++++++++++++++++-------------
 1 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index ce8b030..469347f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.forgerock.util.Reject;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
@@ -46,6 +47,7 @@
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
@@ -56,6 +58,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
@@ -704,37 +707,38 @@
 
   /** {@inheritDoc} */
   @Override
-  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
-    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
     registeredMultiDomainCursors.add(cursor);
     for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
+      cursor.addDomain(baseDN, startState.getServerState(baseDN));
     }
     return cursor;
   }
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
-      throws ChangelogException
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
-    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
     for (int serverId : getDomainMap(baseDN).keySet())
     {
       // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
       cursor.addReplicaDB(serverId, lastCSN);
     }
     return cursor;
   }
 
-  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
   {
     synchronized (registeredDomainCursors)
     {
-      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
       List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
       if (cursors == null)
       {
@@ -761,15 +765,18 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
-      throws ChangelogException
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
+      PositionStrategy positionStrategy) throws ChangelogException
+
   {
+    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
+        + " is not supported for the JE implementation fo changelog");
     final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
       final DBCursor<UpdateMsg> cursor =
-          replicaDB.generateCursorFrom(startAfterCSN);
-      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+          replicaDB.generateCursorFrom(startCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
       // TODO JNR if (offlineCSN != null) ??
       // What about replicas that suddenly become offline?
       return new ReplicaOfflineCursor(cursor, offlineCSN);

--
Gitblit v1.10.0