mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.06.2015 dc02a21390ac3b24e2eaa2505c823a33fd3eee07
opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
@@ -1077,7 +1077,7 @@
      continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
      if (continueSearch)
      {
        final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
        final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN());
        if (updateMsg != null)
        {
          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
@@ -1092,20 +1092,48 @@
  private void initializeCookieForChangeNumberMode(
      MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
  {
    ECLMultiDomainDBCursor eclCursor = null;
    try
    // Initialize the multi domain cursor only from the change number index record.
    // The cookie is always empty at this stage.
    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN());
    MultiDomainServerState unused = new MultiDomainServerState();
    MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options);
    try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
    {
      cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
      CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
      MultiDomainDBCursor cursor =
          getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, options);
      eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      eclCursor.next();
      cookie.update(eclCursor.toCookie());
      updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord);
    }
    finally
  }
  /**
   * Rebuilds the changelogcookie starting at the newest change number index record.
   * <p>
   * It updates the provided cookie with the changes from the provided ECL cursor,
   * up to (and including) the provided change number index record.
   * <p>
   * Therefore, after calling this method, the cursor is positioned
   * to the change immediately following the provided change number index record.
   *
   * @param cookie the cookie to update
   * @param cursor the cursor where to read changes from
   * @param cnIndexRecord the change number index record to go right after
   * @throws ChangelogException if any problem occurs
   */
  public static void updateCookieToMediumConsistencyPoint(
      MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord)
          throws ChangelogException
  {
    if (cnIndexRecord == null)
    {
      close(eclCursor);
      return;
    }
    while (cursor.next())
    {
      UpdateMsg updateMsg = cursor.getRecord();
      if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0)
      {
        break;
      }
      cookie.update(cursor.getData(), updateMsg.getCSN());
    }
  }
@@ -1135,15 +1163,13 @@
   *           If inconsistency is detected between the available update
   *           messages and the provided cnIndexRecord
   */
  private UpdateMsg findReplicaUpdateMessage(
      final ChangeNumberIndexRecord cnIndexRecord,
      final MultiDomainDBCursor replicaUpdatesCursor)
          throws DirectoryException, ChangelogException
  private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn)
      throws ChangelogException, DirectoryException
  {
    while (true)
    {
      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
      final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
      final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN());
      if (compareIndexWithUpdateMsg < 0) {
        // Either update message has been purged or baseDN has been removed from changelogDB,
        // ignore current index record and go to the next one
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -71,8 +71,7 @@
   * for the supplied domain baseDNs. If a supplied domain is
   * {@link DN#NULL_DN}, then all domains will be cleared.
   */
  private final ConcurrentSkipListSet<DN> domainsToClear =
      new ConcurrentSkipListSet<DN>();
  private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
@@ -108,6 +107,7 @@
   * @NonNull
   */
  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
  private MultiDomainServerState cookie = new MultiDomainServerState();
  /**
   * Builds a ChangeNumberIndexer object.
@@ -317,30 +317,16 @@
  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
  {
    final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
        domainDB.getCursorFrom(cookieWithNewestCSN, options);
    // Initialize the multi domain cursor only from the change number index record.
    // The cookie is always empty at this stage.
    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
    final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
    final MultiDomainServerState unused = new MultiDomainServerState();
    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
    nextChangeForInsertDBCursor.next();
  }
  /** Returns a cookie initialised with the newest CSN for each replica. */
  private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
  {
    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
    if (newestRecord != null)
    {
      final CSN newestCsn = newestRecord.getCSN();
      for (DN baseDN : changelogState.getDomainToServerIds().keySet())
      {
        cookieWithNewestCSN.update(baseDN, newestCsn);
      }
    }
    return cookieWithNewestCSN;
    ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
  }
  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
@@ -477,7 +463,11 @@
          // let's publish it to the CNIndexDB.
          final long changeNumber = changelogDB.getChangeNumberIndexDB()
              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
          MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
          if (!cookie.update(baseDN, csn))
          {
            throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn
                + ") would have updated the cookie=" + cookie + ", but it did not");
          }
          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
          moveForwardMediumConsistencyPoint(csn, baseDN);
        }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
@@ -25,10 +25,13 @@
 */
package org.opends.server.replication.server.changelog.file;
import java.util.*;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.forgerock.util.Pair;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -46,15 +49,11 @@
 */
abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg>
{
  private static final byte UNINITIALIZED = 0;
  private static final byte READY = 1;
  private static final byte CLOSED = 2;
  /**
   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
   * {@link #CLOSED}
   */
  /** The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED} */
  private byte state = UNINITIALIZED;
  /**
@@ -62,8 +61,7 @@
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
   * might be recycled at some point when they start returning changes again.
   */
  private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, T>();
  private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = new HashMap<>();
  /**
   * The cursors are sorted based on the current change of each cursor to
   * consider the next change across all available cursors.
@@ -73,8 +71,7 @@
   * thrown about
   * "Non-transactional Cursors may not be used in multiple threads;".
   */
  private final TreeMap<DBCursor<UpdateMsg>, T> cursors =
      new TreeMap<DBCursor<UpdateMsg>, T>(
  private final TreeMap<DBCursor<UpdateMsg>, T> cursors = new TreeMap<>(
          new Comparator<DBCursor<UpdateMsg>>()
          {
            @Override
@@ -82,7 +79,20 @@
            {
              final CSN csn1 = o1.getRecord().getCSN();
              final CSN csn2 = o2.getRecord().getCSN();
              return CSN.compare(csn1, csn2);
              int cmpCsn = CSN.compare(csn1, csn2);
              if (cmpCsn == 0
                  && o1 instanceof CompositeDBCursor
                  && o2 instanceof CompositeDBCursor)
              {
                // Ensures a consistent order when the CSNs are equal (rare in practice)
                T data1 = ((CompositeDBCursor<T>) o1).getData();
                T data2 = ((CompositeDBCursor<T>) o1).getData();
                if (data1 instanceof Comparable && data2 instanceof Comparable)
                {
                  return ((Comparable<T>) data1).compareTo(data2);
                }
              }
              return cmpCsn;
            }
          });
@@ -215,36 +225,6 @@
    return null;
  }
  /**
   * Returns a snapshot of this cursor.
   *
   * @return a list of (Data, UpdateMsg) pairs representing the state of the
   *         cursor. In each pair, the data or the update message may be
   *         {@code null}, but at least one of them is non-null.
   */
  public List<Pair<T, UpdateMsg>> getSnapshot()
  {
    final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>();
    for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet())
    {
      final UpdateMsg updateMsg = entry.getKey().getRecord();
      final T data = entry.getValue();
      if (updateMsg != null || data != null)
      {
        snapshot.add(Pair.of(data, updateMsg));
      }
    }
    for (T data : exhaustedCursors.values())
    {
      if (data != null)
      {
        snapshot.add(Pair.of(data, (UpdateMsg) null));
      }
    }
    return snapshot;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
@@ -255,12 +235,10 @@
    exhaustedCursors.clear();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " openCursors=" + cursors
        + " exhaustedCursors=" + exhaustedCursors;
  }
}
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -24,11 +24,6 @@
 */
package org.opends.server.replication.server.changelog.file;
import java.util.ArrayList;
import java.util.List;
import org.forgerock.util.Pair;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -40,7 +35,6 @@
 */
public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
{
  private final ECLEnabledDomainPredicate predicate;
  private final MultiDomainDBCursor cursor;
@@ -88,7 +82,6 @@
    cursor.removeDomain(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
@@ -102,59 +95,15 @@
    return hasNext;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " cursor=[" + cursor + ']';
  }
  /**
   * Returns a snapshot of this cursor.
   *
   * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled
   *         for the external changelog. The update message may be {@code null}.
   */
  List<Pair<DN, UpdateMsg>> getSnapshot()
  {
    final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot();
    final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>();
    for (Pair<DN, UpdateMsg> pair : snapshot)
    {
      DN baseDN = pair.getFirst();
      if (predicate.isECLEnabledDomain(baseDN))
      {
        eclSnapshot.add(pair);
      }
    }
    return eclSnapshot;
  }
  /**
   * Returns the cookie corresponding to the state of this cursor.
   *
   * @return a valid cookie taking into account only the base DNs enabled for
   *         the external changelog
   */
  public MultiDomainServerState toCookie()
  {
    List<Pair<DN, UpdateMsg>> snapshot = getSnapshot();
    MultiDomainServerState cookie = new MultiDomainServerState();
    for (Pair<DN, UpdateMsg> pair : snapshot)
    {
      // only put base DNs where a CSN is available in the cookie
      if (pair.getSecond() != null)
      {
        cookie.update(pair.getFirst(), pair.getSecond().getCSN());
      }
    }
    return cookie;
  }
}
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -729,9 +729,10 @@
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
          startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -221,12 +221,11 @@
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy);
    return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
    CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
    return new FileReplicaDBCursor(cursor, actualStartCSN, positionStrategy);
  }
  /**
   * Shutdown this ReplicaDB.
   */
  /** Shutdown this ReplicaDB. */
  void shutdown()
  {
    if (shutdown.compareAndSet(false, true))
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -764,7 +764,6 @@
    return null;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      CursorOptions options) throws ChangelogException
@@ -772,9 +771,10 @@
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
          startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -195,12 +195,11 @@
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this);
    CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
    return new JEReplicaDBCursor(db, actualStartCSN, matchingStrategy, positionStrategy, this);
  }
  /**
   * Shutdown this ReplicaDB.
   */
  /** Shutdown this ReplicaDB. */
  void shutdown()
  {
    if (shutdown.compareAndSet(false, true))