mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.41.2014 dc20d17584703e0736ef3982bc0dc18b4d11bb37
OPENDJ-1441 (CR-4244) Persistent searches on external changelog do not return changes for new replicas and new domains

Follow up of r10944 (and r10912 originally).
Update ReplicaCursors with new replica offline information (For example, when replica came online or when there is more up to date replica offline information).
Renamed ReplicaOfflineCursor(Test) to ReplicaCursor(Test)



FileChangelogDB.java, JEChangelogDB.java:
Added replicaCursors Map field.
In getCursorFrom(), registered replicaCursors.
In unregisterCursor(), handled ReplicaCursors.
In notifyReplicaOnline() and notifyReplicaOffline(), called new method updateCursorsWithOfflineCSN().

ReplicaOfflineCursor.java: RENAMED to ReplicaCursor
Removed returnReplicaOfflineMsg field.
Changed replicaOfflineMsg field from ReplicaOfflineMsg to AtomicReference<ReplicaOfflineMsg>.
In next(), rewrote the code to support resetting the replica offline CSN + added isReplicaOfflineMsgOutdated() and setOfflineCSN()
Added replicaID and domainDB fields + called ReplicationDomainDB.unregisterCursor() in close() + added getReplicaID().

ReplicaOfflineCursorTest.java: RENAMED to ReplicaOfflineCursorTest
Consequence of the change to ReplicaCursor.

ChangeNumberIndexer.java:
Removed outdated TODO
1 files deleted
1 files added
1 files renamed
3 files modified
425 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 48 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 54 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java 171 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java 126 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +50,7 @@
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
import org.opends.server.replication.server.changelog.je.ReplicaCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -94,6 +95,8 @@
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
  private ReplicationEnvironment replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -714,16 +717,28 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaID, cursors);
        }
        cursors.add(replicaCursor);
      }
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
@@ -748,6 +763,15 @@
        }
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
      if (cursors != null)
      {
        cursors.remove(cursor);
      }
    }
  }
  /** {@inheritDoc} */
@@ -788,6 +812,7 @@
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
    updateCursorsWithOfflineCSN(baseDN, serverId, null);
  }
  /** {@inheritDoc} */
@@ -800,6 +825,19 @@
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
    if (cursors != null)
    {
      for (ReplicaCursor cursor : cursors)
      {
        cursor.setOfflineCSN(offlineCSN);
      }
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -558,11 +558,10 @@
        /*
         * replica is not back online, Medium consistency point has gone past
         * its last offline time, and there are no more changes after the
         * offline CSN in the cursor: remove everything known about it:
         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
         * this replica from the medium consistency RUV.
         * offline CSN in the cursor: remove everything known about it
         * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV).
         */
        // TODO JNR how to close cursor for offline replica?
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -90,6 +91,8 @@
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
  private ReplicationDbEnv replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -728,7 +731,7 @@
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
@@ -759,21 +762,31 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
        + " is not supported for the JE implementation fo changelog");
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startCSN);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaID, cursors);
        }
        cursors.add(replicaCursor);
      }
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
@@ -798,6 +811,15 @@
        }
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      final List<ReplicaCursor> cursors =  replicaCursors.get(replicaCursor.getReplicaID());
      if (cursors != null)
      {
        cursors.remove(cursor);
      }
    }
  }
  /** {@inheritDoc} */
@@ -838,6 +860,7 @@
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
    updateCursorsWithOfflineCSN(baseDN, null);
  }
  /** {@inheritDoc} */
@@ -850,6 +873,19 @@
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
    if (cursors != null && !cursors.isEmpty())
    {
      for (ReplicaCursor cursor : cursors)
      {
        cursor.setOfflineCSN(offlineCSN);
      }
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
New file
@@ -0,0 +1,171 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import com.forgerock.opendj.util.Pair;
/**
 * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
 * <p>
 * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
 * return replica offline messages when the decorated DBCursor is exhausted and
 * the offline CSN is newer than the last returned update CSN.
 */
public class ReplicaCursor implements DBCursor<UpdateMsg>
{
  /** @NonNull */
  private final DBCursor<UpdateMsg> cursor;
  private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg =
      new AtomicReference<ReplicaOfflineMsg>();
  private UpdateMsg currentRecord;
  private final Pair<DN, Integer> replicaID;
  private final ReplicationDomainDB domainDB;
  /**
   * Creates a ReplicaCursor object with a cursor to decorate
   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
   *
   * @param cursor
   *          the non-null underlying cursor that needs to be exhausted before
   *          we return a ReplicaOfflineMsg
   * @param offlineCSN
   *          the offline CSN from which to builder the
   *          {@link ReplicaOfflineMsg} to return
   * @param replicaID
   *          the baseDN => serverId pair to uniquely identify the replica
   * @param domainDB
   *          the DB for the provided replication domain
   */
  public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN,
      Pair<DN, Integer> replicaID, ReplicationDomainDB domainDB)
  {
    this.cursor = cursor;
    this.replicaID = replicaID;
    this.domainDB = domainDB;
    setOfflineCSN(offlineCSN);
  }
  /**
   * Sets the offline CSN to be returned by this cursor.
   *
   * @param offlineCSN
   *          The offline CSN to be returned by this cursor.
   *          If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
   */
  public void setOfflineCSN(CSN offlineCSN)
  {
    this.replicaOfflineMsg.set(
        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
  }
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return currentRecord;
  }
  /**
   * Returns the replica identifier that this cursor is associated to.
   *
   * @return the replica identifier that this cursor is associated to
   */
  public Pair<DN, Integer> getReplicaID()
  {
    return replicaID;
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
    if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
    {
      replicaOfflineMsg.compareAndSet(offlineMsg1, null);
    }
    // now verify if new changes have been added to the DB
    // (cursors are automatically restarted)
    final UpdateMsg lastUpdate = cursor.getRecord();
    final boolean hasNext = cursor.next();
    if (hasNext)
    {
      currentRecord = cursor.getRecord();
      return true;
    }
    // replicaDB just happened to be exhausted now
    final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
    if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
    {
      replicaOfflineMsg.compareAndSet(offlineMsg2, null);
      currentRecord = null;
      return false;
    }
    currentRecord = offlineMsg2;
    return currentRecord != null;
  }
  /** It could also mean that the replica offline message has already been consumed. */
  private boolean isReplicaOfflineMsgOutdated(
      final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
  {
    return offlineMsg != null
        && updateMsg != null
        && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
    domainDB.unregisterCursor(this);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
    return getClass().getSimpleName()
        + " currentRecord=" + currentRecord
        + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
        + " cursor=" + cursor.toString().split("", 2)[1];
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
File was deleted
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -35,10 +35,10 @@
import static org.assertj.core.api.Assertions.*;
/**
 * Test the ReplicaOfflineCursor class.
 * Test the {@link ReplicaCursor} class.
 */
@SuppressWarnings("javadoc")
public class ReplicaOfflineCursorTest extends ReplicationTestCase
public class ReplicaCursorTest extends ReplicationTestCase
{
  private int timestamp;
@@ -55,7 +55,7 @@
  {
    delegateCursor = new SequentialDBCursor();
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isFalse();
    assertThat(cursor.getRecord()).isNull();
@@ -67,7 +67,7 @@
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -81,7 +81,7 @@
    delegateCursor = new SequentialDBCursor();
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    final UpdateMsg record = cursor.getRecord();
@@ -98,7 +98,7 @@
    delegateCursor = new SequentialDBCursor(updateMsg);
    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -118,7 +118,7 @@
    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
    delegateCursor = new SequentialDBCursor(updateMsg);
    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, outdatedOfflineCSN);
    assertThat(cursor.getRecord()).isNull();
    assertThat(cursor.next()).isTrue();
    assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -126,4 +126,9 @@
    assertThat(cursor.getRecord()).isNull();
  }
  private ReplicaCursor newReplicaCursor(DBCursor<UpdateMsg> delegateCursor, CSN offlineCSN)
  {
    return new ReplicaCursor(delegateCursor, offlineCSN, null, null);
  }
}