opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +50,7 @@ import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; import org.opends.server.replication.server.changelog.je.DomainDBCursor; import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor; import org.opends.server.replication.server.changelog.je.ReplicaCursor; import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.StaticUtils; @@ -94,6 +95,8 @@ new HashMap<DN, List<DomainDBCursor>>(); private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<MultiDomainDBCursor>(); private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); private ReplicationEnvironment replicationEnv; private final ReplicationServerCfg config; private final File dbDirectory; @@ -714,16 +717,28 @@ /** {@inheritDoc} */ @Override public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException final PositionStrategy positionStrategy) throws ChangelogException { final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); // TODO JNR if (offlineCSN != null) ?? // What about replicas that suddenly become offline? 
return new ReplicaOfflineCursor(cursor, offlineCSN); final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); synchronized (replicaCursors) { List<ReplicaCursor> cursors = replicaCursors.get(replicaID); if (cursors == null) { cursors = new ArrayList<ReplicaCursor>(); replicaCursors.put(replicaID, cursors); } cursors.add(replicaCursor); } return replicaCursor; } return EMPTY_CURSOR_REPLICA_DB; } @@ -748,6 +763,15 @@ } } } else if (cursor instanceof ReplicaCursor) { final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; /* The per-replica lists are plain ArrayLists: take the same monitor that getCursorFrom() holds when adding to them. */ synchronized (replicaCursors) { final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); if (cursors != null) { cursors.remove(cursor); } } } } /** {@inheritDoc} */ @@ -788,6 +812,7 @@ { replicationEnv.notifyReplicaOnline(baseDN, serverId); } updateCursorsWithOfflineCSN(baseDN, serverId, null); } /** {@inheritDoc} */ @@ -800,6 +825,19 @@ { indexer.replicaOffline(baseDN, offlineCSN); } updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN); } /** Pushes the provided offline CSN to every cursor currently registered for the replica identified by the baseDN and serverId pair. A null offlineCSN is forwarded unchanged to each cursor. The lookup and iteration run under the replicaCursors monitor, the same one getCursorFrom() holds while adding cursors to these lists. @param baseDN the replication domain baseDN of the replica @param serverId the serverId of the replica @param offlineCSN the offline CSN to set on the cursors, may be null */ private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) { synchronized (replicaCursors) { final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); if (cursors != null) { for (ReplicaCursor cursor : cursors) { cursor.setOfflineCSN(offlineCSN); } } } } /** opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -558,11 +558,10 @@ /* * replica is not back online, Medium consistency point has gone past * its last offline time, and there are no more changes after the * offline CSN in the cursor: remove everything known about it: * cursor, offlineCSN from lastAliveCSN and remove all knowledge of * this replica from the medium consistency RUV. * offline CSN in the cursor: remove everything known about it * (offlineCSN from lastAliveCSN and remove all knowledge of this replica * from the medium consistency RUV). */ // TODO JNR how to close cursor for offline replica? lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -90,6 +91,8 @@ new HashMap<DN, List<DomainDBCursor>>(); private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<MultiDomainDBCursor>(); private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors = new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR); private ReplicationDbEnv replicationEnv; private final ReplicationServerCfg config; private final File dbDirectory; @@ -728,7 +731,7 @@ return cursor; } private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy) private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy) { synchronized (registeredDomainCursors) { @@ -759,21 +762,31 @@ /** {@inheritDoc} */ @Override public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, final PositionStrategy positionStrategy) throws ChangelogException { Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY" + " is not supported for the JE implementation fo changelog"); final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN); final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN); final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); // TODO JNR if (offlineCSN != null) ?? 
// What about replicas that suddenly become offline? return new ReplicaOfflineCursor(cursor, offlineCSN); final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); synchronized (replicaCursors) { List<ReplicaCursor> cursors = replicaCursors.get(replicaID); if (cursors == null) { cursors = new ArrayList<ReplicaCursor>(); replicaCursors.put(replicaID, cursors); } cursors.add(replicaCursor); } return replicaCursor; } return EMPTY_CURSOR_REPLICA_DB; } @@ -798,6 +811,15 @@ } } } else if (cursor instanceof ReplicaCursor) { final ReplicaCursor replicaCursor = (ReplicaCursor) cursor; /* The per-replica lists are plain ArrayLists: take the same monitor that getCursorFrom() holds when adding to them. */ synchronized (replicaCursors) { final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID()); if (cursors != null) { cursors.remove(cursor); } } } } /** {@inheritDoc} */ @@ -838,6 +860,7 @@ { replicationEnv.notifyReplicaOnline(baseDN, serverId); } updateCursorsWithOfflineCSN(baseDN, serverId, null); } /** {@inheritDoc} */ @@ -850,6 +873,19 @@ { indexer.replicaOffline(baseDN, offlineCSN); } updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN); } /** Pushes the provided offline CSN to every cursor currently registered for the replica identified by the baseDN and serverId pair. The map is keyed by (baseDN, serverId), so the lookup must use the serverId and not the CSN, which is also null when the replica comes back online. A null offlineCSN is forwarded unchanged to each cursor. The lookup and iteration run under the replicaCursors monitor, the same one getCursorFrom() holds while adding cursors to these lists. @param baseDN the replication domain baseDN of the replica @param serverId the serverId of the replica @param offlineCSN the offline CSN to set on the cursors, may be null */ private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN) { synchronized (replicaCursors) { final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId)); if (cursors != null) { for (ReplicaCursor cursor : cursors) { cursor.setOfflineCSN(offlineCSN); } } } } /** opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
New file @@ -0,0 +1,171 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;

import java.util.concurrent.atomic.AtomicReference;

import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;

import com.forgerock.opendj.util.Pair;

/**
 * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
 * <p>
 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
 * return replica offline messages when the decorated DBCursor is exhausted and
 * the offline CSN is newer than the last returned update CSN.
 */
public class ReplicaCursor implements DBCursor<UpdateMsg>
{

  /** @NonNull */
  private final DBCursor<UpdateMsg> cursor;
  /**
   * The pending replica offline message, or null when there is none to return.
   * Held in an atomic reference because {@link #setOfflineCSN(CSN)} may replace
   * it while {@link #next()} conditionally clears it.
   */
  private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg =
      new AtomicReference<ReplicaOfflineMsg>();
  /** The record last positioned on by {@link #next()}, null before the first call. */
  private UpdateMsg currentRecord;

  /** The baseDN => serverId pair identifying the replica this cursor reads. */
  private final Pair<DN, Integer> replicaID;
  /** The DB this cursor unregisters itself from on close, may be null. */
  private final ReplicationDomainDB domainDB;

  /**
   * Creates a ReplicaCursor object with a cursor to decorate
   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
   *
   * @param cursor
   *          the non-null underlying cursor that needs to be exhausted before
   *          we return a ReplicaOfflineMsg
   * @param offlineCSN
   *          the offline CSN from which to build the
   *          {@link ReplicaOfflineMsg} to return
   * @param replicaID
   *          the baseDN => serverId pair to uniquely identify the replica
   * @param domainDB
   *          the DB for the provided replication domain
   */
  public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN,
      Pair<DN, Integer> replicaID, ReplicationDomainDB domainDB)
  {
    this.cursor = cursor;
    this.replicaID = replicaID;
    this.domainDB = domainDB;
    setOfflineCSN(offlineCSN);
  }

  /**
   * Sets the offline CSN to be returned by this cursor.
   *
   * @param offlineCSN
   *          The offline CSN to be returned by this cursor.
   *          If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
   */
  public void setOfflineCSN(CSN offlineCSN)
  {
    this.replicaOfflineMsg.set(
        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
  }

  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return currentRecord;
  }

  /**
   * Returns the replica identifier that this cursor is associated to.
   *
   * @return the replica identifier that this cursor is associated to
   */
  public Pair<DN, Integer> getReplicaID()
  {
    return replicaID;
  }

  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    // Drop the offline message if the record we are currently on is already
    // at or past it. compareAndSet leaves a concurrently set message intact.
    final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
    if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
    {
      replicaOfflineMsg.compareAndSet(offlineMsg1, null);
    }

    // now verify if new changes have been added to the DB
    // (cursors are automatically restarted)
    final UpdateMsg lastUpdate = cursor.getRecord();
    final boolean hasNext = cursor.next();
    if (hasNext)
    {
      currentRecord = cursor.getRecord();
      return true;
    }

    // replicaDB just happened to be exhausted now
    final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
    if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
    {
      replicaOfflineMsg.compareAndSet(offlineMsg2, null);
      currentRecord = null;
      return false;
    }
    // Either there is a pending offline message to hand out, or nothing at all.
    currentRecord = offlineMsg2;
    return currentRecord != null;
  }

  /** It could also mean that the replica offline message has already been consumed. */
  private boolean isReplicaOfflineMsgOutdated(
      final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
  {
    return offlineMsg != null
        && updateMsg != null
        && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
  }

  /** {@inheritDoc} */
  @Override
  public void close()
  {
    try
    {
      cursor.close();
    }
    finally
    {
      // Always unregister, even if closing the decorated cursor failed,
      // so that the owning DB does not keep a reference to a dead cursor.
      // domainDB is null for cursors built without an owning DB.
      if (domainDB != null)
      {
        domainDB.unregisterCursor(this);
      }
    }
  }

  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
    // Append the decorated cursor as-is: the previous split("", 2)[1] trick
    // gave different results depending on the Java version and could throw
    // on an empty string.
    return getClass().getSimpleName()
        + " currentRecord=" + currentRecord
        + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
        + " cursor=" + cursor;
  }

}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
File was deleted opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java @@ -35,10 +35,10 @@ import static org.assertj.core.api.Assertions.*; /** * Test the ReplicaOfflineCursor class. * Test the {@link ReplicaCursor} class. */ @SuppressWarnings("javadoc") public class ReplicaOfflineCursorTest extends ReplicationTestCase public class ReplicaCursorTest extends ReplicationTestCase { private int timestamp; @@ -55,7 +55,7 @@ { delegateCursor = new SequentialDBCursor(); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null); final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isFalse(); assertThat(cursor.getRecord()).isNull(); @@ -67,7 +67,7 @@ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); delegateCursor = new SequentialDBCursor(updateMsg); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null); final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); assertThat(cursor.getRecord()).isSameAs(updateMsg); @@ -81,7 +81,7 @@ delegateCursor = new SequentialDBCursor(); final CSN offlineCSN = new CSN(timestamp++, 1, 1); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN); final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); final UpdateMsg record = cursor.getRecord(); @@ -98,7 +98,7 @@ delegateCursor = new SequentialDBCursor(updateMsg); final CSN offlineCSN = new CSN(timestamp++, 1, 1); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN); final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); 
assertThat(cursor.getRecord()).isSameAs(updateMsg); @@ -118,7 +118,7 @@ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); delegateCursor = new SequentialDBCursor(updateMsg); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN); final ReplicaCursor cursor = newReplicaCursor(delegateCursor, outdatedOfflineCSN); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); assertThat(cursor.getRecord()).isSameAs(updateMsg); @@ -126,4 +126,9 @@ assertThat(cursor.getRecord()).isNull(); } private ReplicaCursor newReplicaCursor(DBCursor<UpdateMsg> delegateCursor, CSN offlineCSN) { return new ReplicaCursor(delegateCursor, offlineCSN, null, null); } }