From d6a26e0aea658bf89f950e2255484fdffe343e58 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 15:43:59 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |   19 +++++++++++--------
 1 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 90091c9..5b3cca6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -34,7 +34,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.forgerock.util.Reject;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
@@ -183,7 +182,9 @@
     catch (Exception e)
     {
       if (debugEnabled())
+      {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
 
       final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
           .append(String.valueOf(dbDirectory));
@@ -585,7 +586,9 @@
             firstException = e;
           }
           else if (debugEnabled())
+          {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
         }
       }
     }
@@ -687,7 +690,9 @@
         catch (Exception e)
         {
           if (debugEnabled())
+          {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
           logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
         }
       }
@@ -765,12 +770,10 @@
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
       final PositionStrategy positionStrategy) throws ChangelogException
   {
-    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
-        + " is not supported for the JE implementation fo changelog");
     final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
+      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
       final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
       final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
@@ -860,7 +863,7 @@
     {
       replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
-    updateCursorsWithOfflineCSN(baseDN, null);
+    updateCursorsWithOfflineCSN(baseDN, serverId, null);
   }
 
   /** {@inheritDoc} */
@@ -873,12 +876,12 @@
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
-    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
   }
 
-  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+  private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
   {
-    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
     if (cursors != null && !cursors.isEmpty())
     {
       for (ReplicaCursor cursor : cursors)

--
Gitblit v1.10.0