From ea9e4fc1b5d3c481a65c5929eb1b4fedfc04a0be Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Fri, 28 Aug 2015 15:07:11 +0000
Subject: [PATCH] OPENDJ-2182 CR-7977 Change number indexer prevent server from shutting down
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 9 -
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java | 37 ++++-----
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 8 +-
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java | 51 ++++++------
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java | 9 +-
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 8 +-
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java | 41 ++++++++++
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java | 23 +++++
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 11 +-
9 files changed, 127 insertions(+), 70 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java
new file mode 100644
index 0000000..74dc007
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java
@@ -0,0 +1,41 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import org.opends.server.replication.server.ChangelogState;
+
+/**
+ * Small interface for common Replication Environment operations.
+ */
+public interface ChangelogStateProvider
+{
+ /**
+ * Returns the current state of the replication changelog.
+ *
+ * @return current in-memory {@link ChangelogState}.
+ */
+ ChangelogState getChangelogState();
+}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
index 6569a04..f6011b4 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -37,13 +37,13 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.types.DN;
import static org.opends.messages.ReplicationMessages.*;
@@ -73,8 +73,7 @@
*/
private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
private final ChangelogDB changelogDB;
- /** Only used for initialization, and then discarded. */
- private ChangelogState changelogState;
+ private final ChangelogStateProvider changelogStateProvider;
private final ECLEnabledDomainPredicate predicate;
/*
@@ -111,33 +110,30 @@
/**
* Builds a ChangeNumberIndexer object.
- *
- * @param changelogDB
+ * @param changelogDB
* the changelogDB
- * @param changelogState
- * the changelog state used for initialization
+ * @param changelogStateProvider
+ * the replication environment information for access to changelog state
*/
- public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+ public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider)
{
- this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
+ this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate());
}
/**
* Builds a ChangeNumberIndexer object.
- *
* @param changelogDB
* the changelogDB
- * @param changelogState
+ * @param changelogStateProvider
* the changelog state used for initialization
* @param predicate
- * tells whether a domain is enabled for the external changelog
*/
- ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
+ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider,
ECLEnabledDomainPredicate predicate)
{
super("Change number indexer");
this.changelogDB = changelogDB;
- this.changelogState = changelogState;
+ this.changelogStateProvider = changelogStateProvider;
this.predicate = predicate;
}
@@ -310,9 +306,6 @@
initializeLastAliveCSNs(domainDB);
initializeNextChangeCursor(domainDB);
initializeOfflineReplicas();
-
- // this will not be used any more. Discard for garbage collection.
- this.changelogState = null;
}
private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
@@ -331,7 +324,7 @@
private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
{
- for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+ for (Entry<DN, Set<Integer>> entry : changelogStateProvider.getChangelogState().getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
if (predicate.isECLEnabledDomain(baseDN))
@@ -353,7 +346,7 @@
private void initializeOfflineReplicas()
{
- final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
+ final MultiDomainServerState offlineReplicas = changelogStateProvider.getChangelogState().getOfflineReplicas();
for (DN baseDN : offlineReplicas)
{
for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
@@ -409,7 +402,11 @@
// once this domain's state has been cleared.
domainsToClear.remove(baseDNToClear);
}
-
+ if (nextChangeForInsertDBCursor.shouldReInitialize())
+ {
+ nextChangeForInsertDBCursor.close();
+ initialize();
+ }
// Do not call DBCursor.next() here
// because we might not have consumed the last record,
// for example if we could not move the MCP forward
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
index 5d05037..4c33d32 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -29,6 +29,9 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Multi domain DB cursor that only returns updates for the domains which have
* been enabled for the external changelog.
@@ -37,6 +40,7 @@
{
private final ECLEnabledDomainPredicate predicate;
private final MultiDomainDBCursor cursor;
+ private final List<DN> eclDisabledDomains = new ArrayList<>();
/**
* Builds an instance of this class filtering updates from the provided cursor.
@@ -82,6 +86,24 @@
cursor.removeDomain(baseDN);
}
+ /**
+ * Returns whether the cursor should be reinitialized because a domain became re-enabled.
+ *
+ * @return whether the cursor should be reinitialized
+ */
+ public boolean shouldReInitialize()
+ {
+ for (DN domainDN : eclDisabledDomains)
+ {
+ if (predicate.isECLEnabledDomain(domainDN))
+ {
+ eclDisabledDomains.clear();
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public boolean next() throws ChangelogException
{
@@ -94,6 +116,7 @@
while (domain != null && !predicate.isECLEnabledDomain(domain))
{
cursor.removeDomain(domain);
+ eclDisabledDomains.add(domain);
domain = cursor.getData();
}
return domain != null;
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index d881e3e..db11877 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -297,7 +297,7 @@
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
- startIndexer(changelogState);
+ startIndexer();
}
setPurgeDelay(replicationServer.getPurgeDelay());
}
@@ -610,7 +610,7 @@
{
if (computeChangeNumber)
{
- startIndexer(replicationEnv.getChangelogState());
+ startIndexer();
}
else
{
@@ -622,9 +622,9 @@
}
}
- private void startIndexer(final ChangelogState changelogState)
+ private void startIndexer()
{
- final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+ final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
if (cnIndexer.compareAndSet(null, indexer))
{
indexer.start();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 5deba88..a21bf54 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -59,6 +59,7 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -124,7 +125,7 @@
* | \---head.log [contains last records written]
* </pre>
*/
-class ReplicationEnvironment
+class ReplicationEnvironment implements ChangelogStateProvider
{
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -318,12 +319,8 @@
return state;
}
- /**
- * Returns the current state of the replication changelog.
- *
- * @return the current {@link ChangelogState}
- */
- ChangelogState getChangelogState()
+ @Override
+ public ChangelogState getChangelogState()
{
return changelogState;
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index b744bcc..864060c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -323,7 +323,7 @@
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
- startIndexer(changelogState);
+ startIndexer();
}
setPurgeDelay(replicationServer.getPurgeDelay());
}
@@ -652,7 +652,7 @@
{
if (computeChangeNumber)
{
- startIndexer(replicationEnv.getChangelogState());
+ startIndexer();
}
else
{
@@ -664,9 +664,9 @@
}
}
- private void startIndexer(final ChangelogState changelogState)
+ private void startIndexer()
{
- final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+ final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
if (cnIndexer.compareAndSet(null, indexer))
{
indexer.start();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index e33cbc0..d9f80bd 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -57,7 +58,7 @@
* This class represents a DB environment that acts as a factory for
* ReplicationDBs.
*/
-public class ReplicationDbEnv
+public class ReplicationDbEnv implements ChangelogStateProvider
{
private Environment dbEnvironment;
private Database changelogStateDb;
@@ -227,11 +228,7 @@
return db;
}
- /**
- * Return the current changelog state.
- *
- * @return the current {@link ChangelogState}
- */
+ @Override
public ChangelogState getChangelogState()
{
return changelogState;
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
index d9cd028..9c1db25 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
@@ -49,10 +49,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.file.DomainDBCursor;
-import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
-import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
+import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.types.DN;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -593,7 +590,9 @@
return eclEnabledDomains.contains(baseDN);
}
};
- cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate)
+ ChangelogStateProvider changeLogState = mock(ChangelogStateProvider.class);
+ when(changeLogState.getChangelogState()).thenReturn(initialState);
+ cnIndexer = new ChangeNumberIndexer(changelogDB, changeLogState, predicate)
{
/** {@inheritDoc} */
@Override
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
index 56645f5..46d2263 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
@@ -125,43 +125,36 @@
final DN baseDN1 = DN.valueOf("dc=example,dc=com");
final DN baseDN2 = DN.valueOf("cn=admin data");
eclEnabledDomains.add(baseDN1);
+ final UpdateMsg msgs[] = newUpdateMsgs(13);
// At least two updates in an enabled domain
- final UpdateMsg msg1 = new FakeUpdateMsg(1);
- final UpdateMsg msg2 = new FakeUpdateMsg(2);
- final UpdateMsg msg3 = new FakeUpdateMsg(3);
- final UpdateMsg msg4 = new FakeUpdateMsg(4);
- addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg1, msg4));
- addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg2, msg3));
+ addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[0], msgs[3]));
+ addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[1], msgs[2]));
- assertMessagesInOrder(baseDN1, msg1, msg4);
+ assertMessagesInOrder(baseDN1, msgs[0], msgs[3]);
assertEmpty();
//Only one update in an enabled domain
- final UpdateMsg msg5 = new FakeUpdateMsg(5);
- final UpdateMsg msg6 = new FakeUpdateMsg(6);
- final UpdateMsg msg7 = new FakeUpdateMsg(7);
- addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg5));
- addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg6, msg7));
+ addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[4]));
+ addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[5], msgs[6]));
- assertMessagesInOrder(baseDN1, msg5, null);
+ assertMessagesInOrder(baseDN1, msgs[4], null);
assertEmpty();
// Two disabled domains
- final DN baseDN3 = DN.valueOf("cn=schema");
- final UpdateMsg msg8 = new FakeUpdateMsg(8);
- final UpdateMsg msg9 = new FakeUpdateMsg(9);
- final UpdateMsg msg10 = new FakeUpdateMsg(10);
- final UpdateMsg msg11 = new FakeUpdateMsg(11);
- final UpdateMsg msg12 = new FakeUpdateMsg(12);
- final UpdateMsg msg13 = new FakeUpdateMsg(13);
+ final DN baseDN3 = DN.valueOf("dc=example,dc=net");
- addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg8, msg10));
- addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg9, msg11));
- addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msg12, msg13));
+ addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[7], msgs[9]));
+ addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[8], msgs[10]));
+ addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msgs[11], msgs[12]));
- assertMessagesInOrder(baseDN1, msg8, msg10);
+ assertMessagesInOrder(baseDN1, msgs[7], msgs[9]);
assertEmpty();
+
+ // Test disable/enable domain tracking
+ eclEnabledDomains.add(baseDN3);
+ assertThat(eclCursor.shouldReInitialize()).isTrue();
+ assertThat(eclCursor.shouldReInitialize()).isFalse();
}
private void assertEmpty() throws Exception
@@ -174,6 +167,16 @@
assertMessagesInOrder(baseDN, msg1, null);
}
+ private UpdateMsg[] newUpdateMsgs(int num)
+ {
+ UpdateMsg[] results = new UpdateMsg[num];
+ for (int i = 0; i < num; i++)
+ {
+ results[i] = new FakeUpdateMsg(i + 1);
+ }
+ return results;
+ }
+
private void assertMessagesInOrder(DN baseDN, UpdateMsg msg1, UpdateMsg msg2) throws Exception
{
assertThat(eclCursor.getRecord()).isNull();
--
Gitblit v1.10.0