opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,7 +41,6 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; @@ -230,8 +229,7 @@ } // unlucky, the domainMap does not exist: take the hit and create the // newValue, even though the same could be done concurrently by another // thread // newValue, even though the same could be done concurrently by another thread final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>(); final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); if (previousValue != null) @@ -240,15 +238,10 @@ return previousValue; } // When called at replication startup, the isECLEnabledDomain() method blocks on STARTING state. // Checking cursors list ensure that it is never called in the startup case. if (!registeredMultiDomainCursors.isEmpty() && MultimasterReplication.isECLEnabledDomain(baseDN)) // we just created a new domain => update all cursors for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) { // we just created a new domain => update all cursors for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) { cursor.addDomain(baseDN, null); } cursor.addDomain(baseDN, null); } return newValue; } opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -27,6 +27,7 @@ import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.opends.server.util.StaticUtils.*; import java.util.Map.Entry; @@ -39,7 +40,6 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; @@ -47,7 +47,6 @@ import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; @@ -73,6 +72,7 @@ private final ChangelogDB changelogDB; /** Only used for initialization, and then discarded. */ private ChangelogState changelogState; private final ECLEnabledDomainPredicate predicate; /* * The following MultiDomainServerState fields must be thread safe, because @@ -121,7 +121,7 @@ * * @NonNull */ private MultiDomainDBCursor nextChangeForInsertDBCursor; private ECLMultiDomainDBCursor nextChangeForInsertDBCursor; /** * Builds a ChangeNumberIndexer object. @@ -133,9 +133,26 @@ */ public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) { this(changelogDB, changelogState, new ECLEnabledDomainPredicate()); } /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * the changelogDB * @param changelogState * the changelog state used for initialization * @param predicate * tells whether a domain is enabled for the external changelog */ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState, ECLEnabledDomainPredicate predicate) { super("Change number indexer"); this.changelogDB = changelogDB; this.changelogState = changelogState; this.predicate = predicate; } /** @@ -148,7 +165,7 @@ */ public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) { if (!isECLEnabledDomain(baseDN)) if (!predicate.isECLEnabledDomain(baseDN)) { return; } @@ -186,7 +203,7 @@ public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) throws ChangelogException { if (!isECLEnabledDomain(baseDN)) if (!predicate.isECLEnabledDomain(baseDN)) { return; } @@ -197,24 +214,6 @@ } /** * Returns whether the provided baseDN represents a replication domain enabled * for the external changelog. * <p> * This method is a test seam that break the dependency on a static method. * * @param baseDN * the replication domain to check * @return true if the provided baseDN is enabled for the external changelog, * false if the provided baseDN is disabled for the external changelog * or unknown to multimaster replication. * @see MultimasterReplication#isECLEnabledDomain(DN) */ protected boolean isECLEnabledDomain(DN baseDN) { return MultimasterReplication.isECLEnabledDomain(baseDN); } /** * Signals a replica went offline. * * @param baseDN @@ -224,7 +223,7 @@ */ public void replicaOffline(DN baseDN, CSN offlineCSN) { if (!isECLEnabledDomain(baseDN)) if (!predicate.isECLEnabledDomain(baseDN)) { return; } @@ -337,7 +336,7 @@ for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) { final DN baseDN = entry.getKey(); if (isECLEnabledDomain(baseDN)) if (predicate.isECLEnabledDomain(baseDN)) { for (Integer serverId : entry.getValue()) { @@ -353,7 +352,8 @@ } } nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY); nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY)); nextChangeForInsertDBCursor.next(); if (newestRecord != null) @@ -384,7 +384,7 @@ { for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) { if (isECLEnabledDomain(baseDN)) if (predicate.isECLEnabledDomain(baseDN)) { replicasOffline.update(baseDN, offlineCSN); // a replica offline message could also be the very last time opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
New file @@ -0,0 +1,55 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.types.DN; /** * Returns whether a domain is enabled for the external changelog. * * @FunctionalInterface */ class ECLEnabledDomainPredicate { /** * Returns whether the provided baseDN represents a replication domain enabled * for the external changelog. * <p> * This method is a test seam that break the dependency on a static method. * * @param baseDN * the replication domain to check * @return true if the provided baseDN is enabled for the external changelog, * false if the provided baseDN is disabled for the external changelog * or unknown to multimaster replication. * @see MultimasterReplication#isECLEnabledDomain(DN) */ public boolean isECLEnabledDomain(DN baseDN) { return MultimasterReplication.isECLEnabledDomain(baseDN); } } opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
New file @@ -0,0 +1,113 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.DN; /** * Multi domain DB cursor that only returns updates for the domains which have * been enabled for the external changelog. */ class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> { private final ECLEnabledDomainPredicate predicate; private final MultiDomainDBCursor cursor; /** * Builds an instance of this class filtering updates from the provided cursor. * * @param predicate * tells whether a domain is enabled for the external changelog * @param cursor * the cursor whose updates will be filtered */ public ECLMultiDomainDBCursor(ECLEnabledDomainPredicate predicate, MultiDomainDBCursor cursor) { this.predicate = predicate; this.cursor = cursor; } /** {@inheritDoc} */ @Override public UpdateMsg getRecord() { return cursor.getRecord(); } /** * Returns the data associated to the cursor that returned the current record. * * @return the data associated to the cursor that returned the current record. */ public DN getData() { return cursor.getData(); } /** * Removes a replication domain from this cursor and stops iterating over it. * Removed cursors will be effectively removed on the next call to * {@link #next()}. * * @param baseDN * the replication domain's baseDN */ public void removeDomain(DN baseDN) { cursor.removeDomain(baseDN); } /** {@inheritDoc} */ @Override public boolean next() throws ChangelogException { // discard updates from non ECL enabled domains boolean hasNext; do { hasNext = cursor.next(); } while (hasNext && !predicate.isECLEnabledDomain(cursor.getData())); return hasNext; } /** {@inheritDoc} */ @Override public void close() { cursor.close(); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " cursor=[" + cursor + ']'; } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -42,7 +42,6 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; @@ -58,7 +57,6 @@ import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.opends.server.util.StaticUtils.*; /** @@ -256,8 +254,7 @@ } // unlucky, the domainMap does not exist: take the hit and create the // newValue, even though the same could be done concurrently by another // thread // newValue, even though the same could be done concurrently by another thread final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>(); final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue); if (previousValue != null) @@ -266,15 +263,10 @@ return previousValue; } // When called at replication startup, the isECLEnabledDomain() method blocks on STARTING state. // Checking cursors list ensure that it is never called in the startup case. if (!registeredMultiDomainCursors.isEmpty() && MultimasterReplication.isECLEnabledDomain(baseDN)) // we just created a new domain => update all cursors for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) { // we just created a new domain => update all cursors for (MultiDomainDBCursor cursor : registeredMultiDomainCursors) { cursor.addDomain(baseDN, null); } cursor.addDomain(baseDN, null); } return newValue; } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -44,7 +44,6 @@ import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.types.DN; import org.testng.annotations.*; @@ -136,6 +135,7 @@ private Map<DN, DomainDBCursor> domainDBCursors; private ChangelogState initialState; private Map<DN, ServerState> domainNewestCSNs; private ECLEnabledDomainPredicate predicate; private ChangeNumberIndexer cnIndexer; private MultiDomainServerState initialCookie; @@ -592,7 +592,7 @@ final SequentialDBCursor replicaDBCursor = new SequentialDBCursor(); replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor); if (isECLEnabledDomain2(baseDN)) if (predicate.isECLEnabledDomain(baseDN)) { DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); if (domainDBCursor == null) @@ -627,24 +627,19 @@ private void startCNIndexer() { cnIndexer = new ChangeNumberIndexer(changelogDB, initialState) predicate = new ECLEnabledDomainPredicate() { @Override protected boolean isECLEnabledDomain(DN baseDN) public boolean isECLEnabledDomain(DN baseDN) { return isECLEnabledDomain2(baseDN); return eclEnabledDomains.contains(baseDN); } }; cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate); cnIndexer.start(); waitForWaitingState(cnIndexer); } private boolean isECLEnabledDomain2(DN baseDN) { return eclEnabledDomains.contains(baseDN); } private void stopCNIndexer() throws Exception { if (cnIndexer != null) opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java
New file @@ -0,0 +1,170 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.util.HashSet; import java.util.Set; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.types.DN; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; @SuppressWarnings("javadoc") public class ECLMultiDomainDBCursorTest extends DirectoryServerTestCase { @Mock private ReplicationDomainDB domainDB; private MultiDomainDBCursor multiDomainCursor; private ECLMultiDomainDBCursor eclCursor; private final Set<DN> eclEnabledDomains = new HashSet<DN>(); private ECLEnabledDomainPredicate predicate = new ECLEnabledDomainPredicate() { @Override public boolean isECLEnabledDomain(DN baseDN) { return eclEnabledDomains.contains(baseDN); } }; @BeforeMethod public void setup() throws Exception { TestCaseUtils.startFakeServer(); MockitoAnnotations.initMocks(this); multiDomainCursor = new MultiDomainDBCursor(domainDB, ON_MATCHING_KEY); eclCursor = new ECLMultiDomainDBCursor(predicate, multiDomainCursor); } @AfterMethod public void teardown() throws Exception { TestCaseUtils.shutdownFakeServer(); domainDB = null; multiDomainCursor = null; eclCursor.close(); eclCursor = null; eclEnabledDomains.clear(); } @Test public void testEmptyCursor() throws Exception { assertEmpty(); } @Test public void testECLDisabledDomainWithCursor() throws Exception { final DN baseDN = DN.decode("dc=example,dc=com"); final UpdateMsg msg1 = new FakeUpdateMsg(1); addDomainCursorToCursor(baseDN, new SequentialDBCursor(msg1)); assertEmpty(); } @Test public void testECLEnabledDomainWithCursor() throws Exception { final DN baseDN = DN.decode("dc=example,dc=com"); eclEnabledDomains.add(baseDN); final UpdateMsg msg1 = new FakeUpdateMsg(1); addDomainCursorToCursor(baseDN, new SequentialDBCursor(msg1)); assertSingleMessage(baseDN, msg1); } @Test(dependsOnMethods = { "testECLEnabledDomainWithCursor", "testECLDisabledDomainWithCursor" }) public void testECLEnabledAndDisabledDomainCursors() throws Exception { final DN baseDN1 = DN.decode("dc=example,dc=com"); final DN baseDN2 = DN.decode("cn=admin data"); eclEnabledDomains.add(baseDN1); final UpdateMsg msg1 = new FakeUpdateMsg(1); final UpdateMsg msg2 = new FakeUpdateMsg(2); final UpdateMsg msg3 = new FakeUpdateMsg(3); final UpdateMsg msg4 = new FakeUpdateMsg(4); addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg1, msg4)); addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg2, msg3)); assertMessagesInOrder(baseDN1, msg1, msg4); } private void assertEmpty() throws Exception { assertMessagesInOrder(null, null, null); } private void assertSingleMessage(DN baseDN, UpdateMsg msg1) throws Exception { assertMessagesInOrder(baseDN, msg1, null); } private void assertMessagesInOrder(DN baseDN, UpdateMsg msg1, UpdateMsg msg2) throws Exception { assertThat(eclCursor.getRecord()).isNull(); assertThat(eclCursor.getData()).isNull(); if (msg1 != null) { assertThat(eclCursor.next()).isTrue(); assertThat(eclCursor.getRecord()).isEqualTo(msg1); assertThat(eclCursor.getData()).isEqualTo(baseDN); } if (msg2 != null) { assertThat(eclCursor.next()).isTrue(); assertThat(eclCursor.getRecord()).isEqualTo(msg2); assertThat(eclCursor.getData()).isEqualTo(baseDN); } assertThat(eclCursor.next()).isFalse(); assertThat(eclCursor.getRecord()).isNull(); assertThat(eclCursor.getData()).isNull(); } private void addDomainCursorToCursor(DN baseDN, SequentialDBCursor cursor) throws ChangelogException { final ServerState state = new ServerState(); when(domainDB.getCursorFrom(baseDN, state, ON_MATCHING_KEY)).thenReturn(cursor); multiDomainCursor.addDomain(baseDN, state); } }