mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
16.05.2014 f8c7aa009c68e43c46ee23bc27a211b1687d8502
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -119,6 +119,7 @@
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.plugin.MultimasterReplication.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.LDIFWriter.*;
import static org.opends.server.util.ServerConstants.*;
@@ -393,7 +394,7 @@
      {
        final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
        final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
            new MultiDomainServerState(), ON_MATCHING_KEY, getExcludedBaseDNs());
            new MultiDomainServerState(), GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, getExcludedBaseDNs());
        try
        {
          baseEntryHasSubordinates = cursor.next();
@@ -1007,7 +1008,7 @@
    {
      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
          cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
@@ -1144,12 +1145,13 @@
    try
    {
      cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
      MultiDomainServerState cookie = new MultiDomainServerState();
      final boolean continueSearch =
          sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
          sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie);
      if (continueSearch)
      {
        entrySender.transitioningToPersistentSearchPhase();
        sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
        sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie);
      }
    }
    finally
@@ -1161,7 +1163,8 @@
  private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
      final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor,
      AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException
      AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, MultiDomainServerState cookie)
          throws ChangelogException, DirectoryException
  {
    boolean continueSearch = true;
    while (continueSearch && cnIndexDBCursor.next())
@@ -1171,6 +1174,11 @@
      if (replicaUpdatesCursor.get() == null)
      {
        replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
        initializeCookieForChangeNumberMode(cookie, cnIndexRecord);
      }
      else
      {
        cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
      }
      continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
      if (continueSearch)
@@ -1178,7 +1186,7 @@
        final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
        if (updateMsg != null)
        {
          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg);
          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
          replicaUpdatesCursor.get().next();
        }
      }
@@ -1186,6 +1194,27 @@
    return continueSearch;
  }
  /** Initialize the provided cookie from the provided change number index record. */
  private void initializeCookieForChangeNumberMode(
      MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
  {
    ECLMultiDomainDBCursor eclCursor = null;
    try
    {
      cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
      MultiDomainDBCursor cursor =
          getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie,
              LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
      eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      eclCursor.next();
      cookie.update(eclCursor.toCookie());
    }
    finally
    {
      close(eclCursor);
    }
  }
  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
      final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
  {
@@ -1195,7 +1224,7 @@
    // No need for ECLMultiDomainDBCursor in this case
    // as updateMsg will be matched with cnIndexRecord
    final MultiDomainDBCursor replicaUpdatesCursor =
        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
    replicaUpdatesCursor.next();
    return replicaUpdatesCursor;
  }
@@ -1735,16 +1764,12 @@
    /**
     * @return {@code true} if search should continue, {@code false} otherwise
     */
    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg)
        throws DirectoryException
    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg,
        MultiDomainServerState cookie) throws DirectoryException
    {
      final DN baseDN = cnIndexRecord.getBaseDN();
      final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
      cookie.update(baseDN, cnIndexRecord.getCSN());
      final String cookieString = cookie.toString();
      sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
      return sendEntryIfMatches(searchOp, entry, null);
    }