mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.04.2014 938dda347b7223b73a1c5d5c47c8674ecdd90102
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -541,11 +541,10 @@
        /*
         * replica is not back online, Medium consistency point has gone past
         * its last offline time, and there are no more changes after the
         * offline CSN in the cursor: remove everything known about it:
         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
         * this replica from the medium consistency RUV.
         * offline CSN in the cursor: remove everything known about it
         * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV).
         */
        // TODO JNR how to close cursor for offline replica?
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -87,6 +88,8 @@
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
  private ReplicationDbEnv replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -720,7 +723,7 @@
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
@@ -751,21 +754,31 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
        + " is not supported for the JE implementation fo changelog");
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startCSN);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaID, cursors);
        }
        cursors.add(replicaCursor);
      }
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
@@ -790,6 +803,15 @@
        }
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      final List<ReplicaCursor> cursors =  replicaCursors.get(replicaCursor.getReplicaID());
      if (cursors != null)
      {
        cursors.remove(cursor);
      }
    }
  }
  /** {@inheritDoc} */
@@ -831,6 +853,19 @@
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
    if (cursors != null && !cursors.isEmpty())
    {
      for (ReplicaCursor cursor : cursors)
      {
        cursor.setOfflineCSN(offlineCSN);
      }
    }
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
New file
@@ -0,0 +1,171 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import com.forgerock.opendj.util.Pair;
/**
 * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
 * <p>
 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
 * return replica offline messages when the decorated DBCursor is exhausted and
 * the offline CSN is newer than the last returned update CSN.
 */
public class ReplicaCursor implements DBCursor<UpdateMsg>
{
  /** @NonNull */
  private final DBCursor<UpdateMsg> cursor;
  private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg =
      new AtomicReference<ReplicaOfflineMsg>();
  private UpdateMsg currentRecord;
  private final Pair<DN, Integer> replicaID;
  private final ReplicationDomainDB domainDB;
  /**
   * Creates a ReplicaCursor object with a cursor to decorate
   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
   *
   * @param cursor
   *          the non-null underlying cursor that needs to be exhausted before
   *          we return a ReplicaOfflineMsg
   * @param offlineCSN
   *          the offline CSN from which to builder the
   *          {@link ReplicaOfflineMsg} to return
   * @param replicaID
   *          the baseDN => serverId pair to uniquely identify the replica
   * @param domainDB
   *          the DB for the provided replication domain
   */
  public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN,
      Pair<DN, Integer> replicaID, ReplicationDomainDB domainDB)
  {
    this.cursor = cursor;
    this.replicaID = replicaID;
    this.domainDB = domainDB;
    setOfflineCSN(offlineCSN);
  }
  /**
   * Sets the offline CSN to be returned by this cursor.
   *
   * @param offlineCSN
   *          The offline CSN to be returned by this cursor.
   *          If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
   */
  public void setOfflineCSN(CSN offlineCSN)
  {
    this.replicaOfflineMsg.set(
        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
  }
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return currentRecord;
  }
  /**
   * Returns the replica identifier that this cursor is associated to.
   *
   * @return the replica identifier that this cursor is associated to
   */
  public Pair<DN, Integer> getReplicaID()
  {
    return replicaID;
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
    if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
    {
      replicaOfflineMsg.compareAndSet(offlineMsg1, null);
    }
    // now verify if new changes have been added to the DB
    // (cursors are automatically restarted)
    final UpdateMsg lastUpdate = cursor.getRecord();
    final boolean hasNext = cursor.next();
    if (hasNext)
    {
      currentRecord = cursor.getRecord();
      return true;
    }
    // replicaDB just happened to be exhausted now
    final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
    if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
    {
      replicaOfflineMsg.compareAndSet(offlineMsg2, null);
      currentRecord = null;
      return false;
    }
    currentRecord = offlineMsg2;
    return currentRecord != null;
  }
  /** It could also mean that the replica offline message has already been consumed. */
  private boolean isReplicaOfflineMsgOutdated(
      final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
  {
    return offlineMsg != null
        && updateMsg != null
        && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
    domainDB.unregisterCursor(this);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
    return getClass().getSimpleName()
        + " currentRecord=" + currentRecord
        + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
        + " cursor=" + cursor.toString().split("", 2)[1];
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
File was deleted
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
File was renamed from opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -35,10 +35,10 @@
import static org.assertj.core.api.Assertions.*;
/**
 * Test the ReplicaOfflineCursor class.
 * Test the {@link ReplicaCursor} class.
 */
@SuppressWarnings("javadoc")
public class ReplicaOfflineCursorTest extends ReplicationTestCase
public class ReplicaCursorTest extends ReplicationTestCase
{
  private int timestamp;
@@ -55,7 +55,7 @@
  {
    delegateCursor = new SequentialDBCursor();
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
@@ -67,7 +67,7 @@
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -81,7 +81,7 @@
    delegateCursor = new SequentialDBCursor();
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    final UpdateMsg record = cursor.getRecord();
@@ -98,7 +98,7 @@
    delegateCursor = new SequentialDBCursor(updateMsg);
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -118,7 +118,7 @@
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, outdatedOfflineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -126,4 +126,9 @@
    assertThat(cursor.getRecord()).isNull();
  }
  private ReplicaCursor newReplicaCursor(DBCursor<UpdateMsg> delegateCursor, CSN offlineCSN)
  {
    return new ReplicaCursor(delegateCursor, offlineCSN, null, null);
  }
}