opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java
New file @@ -0,0 +1,41 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2015 ForgeRock AS */ package org.opends.server.replication.server.changelog.api; import org.opends.server.replication.server.ChangelogState; /** * Small interface for common Replication Environment operations. */ public interface ChangelogStateProvider { /** * Returns the current state of the replication changelog. * * @return current in-memory {@link ChangelogState}. */ ChangelogState getChangelogState(); } opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -37,13 +37,13 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; import org.opends.server.types.DN; import static org.opends.messages.ReplicationMessages.*; @@ -73,8 +73,7 @@ */ private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>(); private final ChangelogDB changelogDB; /** Only used for initialization, and then discarded. */ private ChangelogState changelogState; private final ChangelogStateProvider changelogStateProvider; private final ECLEnabledDomainPredicate predicate; /* @@ -111,33 +110,30 @@ /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * @param changelogDB * the changelogDB * @param changelogState * the changelog state used for initialization * @param changelogStateProvider * the replication environment information for access to changelog state */ public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider) { this(changelogDB, changelogState, new ECLEnabledDomainPredicate()); this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate()); } /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * the changelogDB * @param changelogState * @param changelogStateProvider * the changelog state used for initialization * @param predicate * tells whether a domain is enabled for the external changelog */ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState, ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider, ECLEnabledDomainPredicate predicate) { super("Change number indexer"); this.changelogDB = changelogDB; this.changelogState = changelogState; this.changelogStateProvider = changelogStateProvider; this.predicate = predicate; } @@ -310,9 +306,6 @@ initializeLastAliveCSNs(domainDB); initializeNextChangeCursor(domainDB); initializeOfflineReplicas(); // this will not be used any more. Discard for garbage collection. this.changelogState = null; } private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException @@ -331,7 +324,7 @@ private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB) { for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) for (Entry<DN, Set<Integer>> entry : changelogStateProvider.getChangelogState().getDomainToServerIds().entrySet()) { final DN baseDN = entry.getKey(); if (predicate.isECLEnabledDomain(baseDN)) @@ -353,7 +346,7 @@ private void initializeOfflineReplicas() { final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas(); final MultiDomainServerState offlineReplicas = changelogStateProvider.getChangelogState().getOfflineReplicas(); for (DN baseDN : offlineReplicas) { for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) @@ -409,7 +402,11 @@ // once this domain's state has been cleared. domainsToClear.remove(baseDNToClear); } if (nextChangeForInsertDBCursor.shouldReInitialize()) { nextChangeForInsertDBCursor.close(); initialize(); } // Do not call DBCursor.next() here // because we might not have consumed the last record, // for example if we could not move the MCP forward opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -29,6 +29,9 @@ import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.DN; import java.util.ArrayList; import java.util.List; /** * Multi domain DB cursor that only returns updates for the domains which have * been enabled for the external changelog. @@ -37,6 +40,7 @@ { private final ECLEnabledDomainPredicate predicate; private final MultiDomainDBCursor cursor; private final List<DN> eclDisabledDomains = new ArrayList<>(); /** * Builds an instance of this class filtering updates from the provided cursor. @@ -82,6 +86,24 @@ cursor.removeDomain(baseDN); } /** * Returns whether the cursor should be reinitialized because a domain became re-enabled. * * @return whether the cursor should be reinitialized */ public boolean shouldReInitialize() { for (DN domainDN : eclDisabledDomains) { if (predicate.isECLEnabledDomain(domainDN)) { eclDisabledDomains.clear(); return true; } } return false; } @Override public boolean next() throws ChangelogException { @@ -94,6 +116,7 @@ while (domain != null && !predicate.isECLEnabledDomain(domain)) { cursor.removeDomain(domain); eclDisabledDomains.add(domain); domain = cursor.getData(); } return domain != null; opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -297,7 +297,7 @@ initializeToChangelogState(changelogState); if (config.isComputeChangeNumber()) { startIndexer(changelogState); startIndexer(); } setPurgeDelay(replicationServer.getPurgeDelay()); } @@ -610,7 +610,7 @@ { if (computeChangeNumber) { startIndexer(replicationEnv.getChangelogState()); startIndexer(); } else { @@ -622,9 +622,9 @@ } } private void startIndexer(final ChangelogState changelogState) private void startIndexer() { final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -59,6 +59,7 @@ import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; @@ -124,7 +125,7 @@ * | \---head.log [contains last records written] * </pre> */ class ReplicationEnvironment class ReplicationEnvironment implements ChangelogStateProvider { private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); @@ -318,12 +319,8 @@ return state; } /** * Returns the current state of the replication changelog. * * @return the current {@link ChangelogState} */ ChangelogState getChangelogState() @Override public ChangelogState getChangelogState() { return changelogState; } opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -323,7 +323,7 @@ initializeToChangelogState(changelogState); if (config.isComputeChangeNumber()) { startIndexer(changelogState); startIndexer(); } setPurgeDelay(replicationServer.getPurgeDelay()); } @@ -652,7 +652,7 @@ { if (computeChangeNumber) { startIndexer(replicationEnv.getChangelogState()); startIndexer(); } else { @@ -664,9 +664,9 @@ } } private void startIndexer(final ChangelogState changelogState) private void startIndexer() { final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -41,6 +41,7 @@ import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; @@ -57,7 +58,7 @@ * This class represents a DB environment that acts as a factory for * ReplicationDBs. */ public class ReplicationDbEnv public class ReplicationDbEnv implements ChangelogStateProvider { private Environment dbEnvironment; private Database changelogStateDb; @@ -227,11 +228,7 @@ return db; } /** * Return the current changelog state. * * @return the current {@link ChangelogState} */ @Override public ChangelogState getChangelogState() { return changelogState; opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
@@ -49,10 +49,7 @@ import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; import org.opends.server.replication.server.changelog.api.ReplicaId; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer; import org.opends.server.replication.server.changelog.file.DomainDBCursor; import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; import org.opends.server.types.DN; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -593,7 +590,9 @@ return eclEnabledDomains.contains(baseDN); } }; cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate) ChangelogStateProvider changeLogState = mock(ChangelogStateProvider.class); when(changeLogState.getChangelogState()).thenReturn(initialState); cnIndexer = new ChangeNumberIndexer(changelogDB, changeLogState, predicate) { /** {@inheritDoc} */ @Override opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
@@ -125,43 +125,36 @@ final DN baseDN1 = DN.valueOf("dc=example,dc=com"); final DN baseDN2 = DN.valueOf("cn=admin data"); eclEnabledDomains.add(baseDN1); final UpdateMsg msgs[] = newUpdateMsgs(13); // At least two updates in an enabled domain final UpdateMsg msg1 = new FakeUpdateMsg(1); final UpdateMsg msg2 = new FakeUpdateMsg(2); final UpdateMsg msg3 = new FakeUpdateMsg(3); final UpdateMsg msg4 = new FakeUpdateMsg(4); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg1, msg4)); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg2, msg3)); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[0], msgs[3])); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[1], msgs[2])); assertMessagesInOrder(baseDN1, msg1, msg4); assertMessagesInOrder(baseDN1, msgs[0], msgs[3]); assertEmpty(); //Only one update in an enabled domain final UpdateMsg msg5 = new FakeUpdateMsg(5); final UpdateMsg msg6 = new FakeUpdateMsg(6); final UpdateMsg msg7 = new FakeUpdateMsg(7); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg5)); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg6, msg7)); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[4])); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[5], msgs[6])); assertMessagesInOrder(baseDN1, msg5, null); assertMessagesInOrder(baseDN1, msgs[4], null); assertEmpty(); // Two disabled domains final DN baseDN3 = DN.valueOf("cn=schema"); final UpdateMsg msg8 = new FakeUpdateMsg(8); final UpdateMsg msg9 = new FakeUpdateMsg(9); final UpdateMsg msg10 = new FakeUpdateMsg(10); final UpdateMsg msg11 = new FakeUpdateMsg(11); final UpdateMsg msg12 = new FakeUpdateMsg(12); final UpdateMsg msg13 = new FakeUpdateMsg(13); final DN baseDN3 = DN.valueOf("dc=example,dc=net"); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg8, msg10)); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg9, msg11)); addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msg12, msg13)); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[7], msgs[9])); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[8], msgs[10])); addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msgs[11], msgs[12])); assertMessagesInOrder(baseDN1, msg8, msg10); assertMessagesInOrder(baseDN1, msgs[7], msgs[9]); assertEmpty(); // Test disable/enable domain tracking eclEnabledDomains.add(baseDN3); assertThat(eclCursor.shouldReInitialize()).isTrue(); assertThat(eclCursor.shouldReInitialize()).isFalse(); } private void assertEmpty() throws Exception @@ -174,6 +167,16 @@ assertMessagesInOrder(baseDN, msg1, null); } private UpdateMsg[] newUpdateMsgs(int num) { UpdateMsg[] results = new UpdateMsg[num]; for (int i = 0; i < num; i++) { results[i] = new FakeUpdateMsg(i + 1); } return results; } private void assertMessagesInOrder(DN baseDN, UpdateMsg msg1, UpdateMsg msg2) throws Exception { assertThat(eclCursor.getRecord()).isNull();