From 881c7d3d5c53debdd4a1bb5f63146f0f73d56957 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 25 Jul 2014 08:26:13 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4082) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |   54 +++++++++++++++++++++++++++---------------------------
 1 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index eae5638..eee6fe4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -27,6 +27,7 @@
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 import static org.opends.server.util.StaticUtils.*;
 
 import java.util.Map.Entry;
@@ -39,7 +40,6 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.ReplicaOfflineMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -47,7 +47,6 @@
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 
@@ -73,6 +72,7 @@
   private final ChangelogDB changelogDB;
   /** Only used for initialization, and then discarded. */
   private ChangelogState changelogState;
+  private final ECLEnabledDomainPredicate predicate;
 
   /*
    * The following MultiDomainServerState fields must be thread safe, because
@@ -121,7 +121,7 @@
    *
    * @NonNull
    */
-  private MultiDomainDBCursor nextChangeForInsertDBCursor;
+  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -133,9 +133,26 @@
    */
   public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
   {
+    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
+  }
+
+  /**
+   * Builds a ChangeNumberIndexer object.
+   *
+   * @param changelogDB
+   *          the changelogDB
+   * @param changelogState
+   *          the changelog state used for initialization
+   * @param predicate
+   *          tells whether a domain is enabled for the external changelog
+   */
+  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
+      ECLEnabledDomainPredicate predicate)
+  {
     super("Change number indexer");
     this.changelogDB = changelogDB;
     this.changelogState = changelogState;
+    this.predicate = predicate;
   }
 
   /**
@@ -148,7 +165,7 @@
    */
   public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
   {
-    if (!isECLEnabledDomain(baseDN))
+    if (!predicate.isECLEnabledDomain(baseDN))
     {
       return;
     }
@@ -186,7 +203,7 @@
   public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
       throws ChangelogException
   {
-    if (!isECLEnabledDomain(baseDN))
+    if (!predicate.isECLEnabledDomain(baseDN))
     {
       return;
     }
@@ -197,24 +214,6 @@
   }
 
   /**
-   * Returns whether the provided baseDN represents a replication domain enabled
-   * for the external changelog.
-   * <p>
-   * This method is a test seam that break the dependency on a static method.
-   *
-   * @param baseDN
-   *          the replication domain to check
-   * @return true if the provided baseDN is enabled for the external changelog,
-   *         false if the provided baseDN is disabled for the external changelog
-   *         or unknown to multimaster replication.
-   * @see MultimasterReplication#isECLEnabledDomain(DN)
-   */
-  protected boolean isECLEnabledDomain(DN baseDN)
-  {
-    return MultimasterReplication.isECLEnabledDomain(baseDN);
-  }
-
-  /**
    * Signals a replica went offline.
    *
    * @param baseDN
@@ -224,7 +223,7 @@
    */
   public void replicaOffline(DN baseDN, CSN offlineCSN)
   {
-    if (!isECLEnabledDomain(baseDN))
+    if (!predicate.isECLEnabledDomain(baseDN))
     {
       return;
     }
@@ -337,7 +336,7 @@
     for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
-      if (isECLEnabledDomain(baseDN))
+      if (predicate.isECLEnabledDomain(baseDN))
       {
         for (Integer serverId : entry.getValue())
         {
@@ -353,7 +352,8 @@
       }
     }
 
-    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
+    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
+        domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
     nextChangeForInsertDBCursor.next();
 
     if (newestRecord != null)
@@ -384,7 +384,7 @@
     {
       for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
       {
-        if (isECLEnabledDomain(baseDN))
+        if (predicate.isECLEnabledDomain(baseDN))
         {
           replicasOffline.update(baseDN, offlineCSN);
           // a replica offline message could also be the very last time

--
Gitblit v1.10.0