From ea9e4fc1b5d3c481a65c5929eb1b4fedfc04a0be Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Fri, 28 Aug 2015 15:07:11 +0000
Subject: [PATCH] OPENDJ-2182 CR-7977 Change number indexer prevent server from shutting down

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java             |    9 -
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java        |   37 ++++-----
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                |    8 +-
 opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java |   51 ++++++------
 opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java    |    9 +-
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java            |    8 +-
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java      |   41 ++++++++++
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java     |   23 +++++
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java     |   11 +-
 9 files changed, 127 insertions(+), 70 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java
new file mode 100644
index 0000000..74dc007
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java
@@ -0,0 +1,41 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import org.opends.server.replication.server.ChangelogState;
+
+/**
+ * Small interface for common Replication Environment operations.
+ */
+public interface ChangelogStateProvider
+{
+  /**
+   * Returns the current state of the replication changelog.
+   *
+   * @return current in-memory {@link ChangelogState}.
+   */
+  ChangelogState getChangelogState();
+}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
index 6569a04..f6011b4 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -37,13 +37,13 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.ReplicaOfflineMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
 import org.opends.server.types.DN;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -73,8 +73,7 @@
    */
   private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
   private final ChangelogDB changelogDB;
-  /** Only used for initialization, and then discarded. */
-  private ChangelogState changelogState;
+  private final ChangelogStateProvider changelogStateProvider;
   private final ECLEnabledDomainPredicate predicate;
 
   /*
@@ -111,33 +110,30 @@
 
   /**
    * Builds a ChangeNumberIndexer object.
-   *
-   * @param changelogDB
+   *  @param changelogDB
    *          the changelogDB
-   * @param changelogState
-   *          the changelog state used for initialization
+   * @param changelogStateProvider
+   *          the replication environment information for access to changelog state
    */
-  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider)
   {
-    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
+    this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate());
   }
 
   /**
    * Builds a ChangeNumberIndexer object.
-   *
    * @param changelogDB
    *          the changelogDB
-   * @param changelogState
+   * @param changelogStateProvider
    *          the changelog state used for initialization
    * @param predicate
-   *          tells whether a domain is enabled for the external changelog
    */
-  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
+  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider,
       ECLEnabledDomainPredicate predicate)
   {
     super("Change number indexer");
     this.changelogDB = changelogDB;
-    this.changelogState = changelogState;
+    this.changelogStateProvider = changelogStateProvider;
     this.predicate = predicate;
   }
 
@@ -310,9 +306,6 @@
     initializeLastAliveCSNs(domainDB);
     initializeNextChangeCursor(domainDB);
     initializeOfflineReplicas();
-
-    // this will not be used any more. Discard for garbage collection.
-    this.changelogState = null;
   }
 
   private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
@@ -331,7 +324,7 @@
 
   private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
   {
-    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+    for (Entry<DN, Set<Integer>> entry : changelogStateProvider.getChangelogState().getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
       if (predicate.isECLEnabledDomain(baseDN))
@@ -353,7 +346,7 @@
 
   private void initializeOfflineReplicas()
   {
-    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
+    final MultiDomainServerState offlineReplicas = changelogStateProvider.getChangelogState().getOfflineReplicas();
     for (DN baseDN : offlineReplicas)
     {
       for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
@@ -409,7 +402,11 @@
             // once this domain's state has been cleared.
             domainsToClear.remove(baseDNToClear);
           }
-
+          if (nextChangeForInsertDBCursor.shouldReInitialize())
+          {
+            nextChangeForInsertDBCursor.close();
+            initialize();
+          }
           // Do not call DBCursor.next() here
           // because we might not have consumed the last record,
           // for example if we could not move the MCP forward
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
index 5d05037..4c33d32 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -29,6 +29,9 @@
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.types.DN;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Multi domain DB cursor that only returns updates for the domains which have
  * been enabled for the external changelog.
@@ -37,6 +40,7 @@
 {
   private final ECLEnabledDomainPredicate predicate;
   private final MultiDomainDBCursor cursor;
+  private final List<DN> eclDisabledDomains = new ArrayList<>();
 
   /**
    * Builds an instance of this class filtering updates from the provided cursor.
@@ -82,6 +86,24 @@
     cursor.removeDomain(baseDN);
   }
 
+  /**
+   * Returns whether the cursor should be reinitialized because a domain became re-enabled.
+   *
+   * @return whether the cursor should be reinitialized
+   */
+  public boolean shouldReInitialize()
+  {
+    for (DN domainDN : eclDisabledDomains)
+    {
+      if (predicate.isECLEnabledDomain(domainDN))
+      {
+        eclDisabledDomains.clear();
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public boolean next() throws ChangelogException
   {
@@ -94,6 +116,7 @@
     while (domain != null && !predicate.isECLEnabledDomain(domain))
     {
       cursor.removeDomain(domain);
+      eclDisabledDomains.add(domain);
       domain = cursor.getData();
     }
     return domain != null;
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index d881e3e..db11877 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -297,7 +297,7 @@
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
-        startIndexer(changelogState);
+        startIndexer();
       }
       setPurgeDelay(replicationServer.getPurgeDelay());
     }
@@ -610,7 +610,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(replicationEnv.getChangelogState());
+      startIndexer();
     }
     else
     {
@@ -622,9 +622,9 @@
     }
   }
 
-  private void startIndexer(final ChangelogState changelogState)
+  private void startIndexer()
   {
-    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
     if (cnIndexer.compareAndSet(null, indexer))
     {
       indexer.start();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 5deba88..a21bf54 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -59,6 +59,7 @@
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
 import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
@@ -124,7 +125,7 @@
  * |           \---head.log [contains last records written]
  * </pre>
  */
-class ReplicationEnvironment
+class ReplicationEnvironment implements ChangelogStateProvider
 {
 
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -318,12 +319,8 @@
     return state;
   }
 
-  /**
-   * Returns the current state of the replication changelog.
-   *
-   * @return the current {@link ChangelogState}
-   */
-  ChangelogState getChangelogState()
+  @Override
+  public ChangelogState getChangelogState()
   {
     return changelogState;
   }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index b744bcc..864060c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -323,7 +323,7 @@
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
-        startIndexer(changelogState);
+        startIndexer();
       }
       setPurgeDelay(replicationServer.getPurgeDelay());
     }
@@ -652,7 +652,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(replicationEnv.getChangelogState());
+      startIndexer();
     }
     else
     {
@@ -664,9 +664,9 @@
     }
   }
 
-  private void startIndexer(final ChangelogState changelogState)
+  private void startIndexer()
   {
-    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
     if (cnIndexer.compareAndSet(null, indexer))
     {
       indexer.start();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index e33cbc0..d9f80bd 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -41,6 +41,7 @@
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 
@@ -57,7 +58,7 @@
  * This class represents a DB environment that acts as a factory for
  * ReplicationDBs.
  */
-public class ReplicationDbEnv
+public class ReplicationDbEnv implements ChangelogStateProvider
 {
   private Environment dbEnvironment;
   private Database changelogStateDb;
@@ -227,11 +228,7 @@
     return db;
   }
 
-  /**
-   * Return the current changelog state.
-   *
-   * @return the current {@link ChangelogState}
-   */
+  @Override
   public ChangelogState getChangelogState()
   {
     return changelogState;
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
index d9cd028..9c1db25 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
@@ -49,10 +49,7 @@
 import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
 import org.opends.server.replication.server.changelog.api.ReplicaId;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.file.DomainDBCursor;
-import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
-import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
 import org.opends.server.types.DN;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -593,7 +590,9 @@
         return eclEnabledDomains.contains(baseDN);
       }
     };
-    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate)
+    ChangelogStateProvider changeLogState = mock(ChangelogStateProvider.class);
+    when(changeLogState.getChangelogState()).thenReturn(initialState);
+    cnIndexer = new ChangeNumberIndexer(changelogDB, changeLogState, predicate)
     {
       /** {@inheritDoc} */
       @Override
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
index 56645f5..46d2263 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
@@ -125,43 +125,36 @@
     final DN baseDN1 = DN.valueOf("dc=example,dc=com");
     final DN baseDN2 = DN.valueOf("cn=admin data");
     eclEnabledDomains.add(baseDN1);
+    final UpdateMsg msgs[] = newUpdateMsgs(13);
 
     // At least two updates in an enabled domain
-    final UpdateMsg msg1 = new FakeUpdateMsg(1);
-    final UpdateMsg msg2 = new FakeUpdateMsg(2);
-    final UpdateMsg msg3 = new FakeUpdateMsg(3);
-    final UpdateMsg msg4 = new FakeUpdateMsg(4);
-    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg1, msg4));
-    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg2, msg3));
+    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[0], msgs[3]));
+    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[1], msgs[2]));
 
-    assertMessagesInOrder(baseDN1, msg1, msg4);
+    assertMessagesInOrder(baseDN1, msgs[0], msgs[3]);
     assertEmpty();
 
     //Only one update in an enabled domain
-    final UpdateMsg msg5 = new FakeUpdateMsg(5);
-    final UpdateMsg msg6 = new FakeUpdateMsg(6);
-    final UpdateMsg msg7 = new FakeUpdateMsg(7);
-    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg5));
-    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg6, msg7));
+    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[4]));
+    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[5], msgs[6]));
 
-    assertMessagesInOrder(baseDN1, msg5, null);
+    assertMessagesInOrder(baseDN1, msgs[4], null);
     assertEmpty();
 
     // Two disabled domains
-    final DN baseDN3 = DN.valueOf("cn=schema");
-    final UpdateMsg msg8 = new FakeUpdateMsg(8);
-    final UpdateMsg msg9 = new FakeUpdateMsg(9);
-    final UpdateMsg msg10 = new FakeUpdateMsg(10);
-    final UpdateMsg msg11 = new FakeUpdateMsg(11);
-    final UpdateMsg msg12 = new FakeUpdateMsg(12);
-    final UpdateMsg msg13 = new FakeUpdateMsg(13);
+    final DN baseDN3 = DN.valueOf("dc=example,dc=net");
 
-    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg8, msg10));
-    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg9, msg11));
-    addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msg12, msg13));
+    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[7], msgs[9]));
+    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[8], msgs[10]));
+    addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msgs[11], msgs[12]));
 
-    assertMessagesInOrder(baseDN1, msg8, msg10);
+    assertMessagesInOrder(baseDN1, msgs[7], msgs[9]);
     assertEmpty();
+
+    // Test disable/enable domain tracking
+    eclEnabledDomains.add(baseDN3);
+    assertThat(eclCursor.shouldReInitialize()).isTrue();
+    assertThat(eclCursor.shouldReInitialize()).isFalse();
   }
 
   private void assertEmpty() throws Exception
@@ -174,6 +167,16 @@
     assertMessagesInOrder(baseDN, msg1, null);
   }
 
+  private UpdateMsg[] newUpdateMsgs(int num)
+  {
+    UpdateMsg[] results = new UpdateMsg[num];
+    for (int i = 0; i < num; i++)
+    {
+      results[i] = new FakeUpdateMsg(i + 1);
+    }
+    return results;
+  }
+
   private void assertMessagesInOrder(DN baseDN, UpdateMsg msg1, UpdateMsg msg2) throws Exception
   {
     assertThat(eclCursor.getRecord()).isNull();

--
Gitblit v1.10.0