mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
01.51.2014 c11b3a5611e1a431e62c6cfa23b881a5fdbb62b9
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -29,8 +29,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.LDIFWriter.*;
@@ -39,6 +38,7 @@
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -53,6 +53,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
@@ -62,6 +63,7 @@
import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
@@ -72,10 +74,11 @@
import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.StaticUtils;
/**
 * A backend that provides access to the changelog, ie the "cn=changelog"
 * A backend that provides access to the changelog, i.e. the "cn=changelog"
 * suffix. It is a read-only backend that is created by a
 * {@code ReplicationServer} and is not configurable.
 * <p>
@@ -85,8 +88,8 @@
 * request. The cookie provided in the control is used to retrieve entries from
 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
 * the entries.</li>
 * <li>Draft compat mode: when no "ECL Cookie Exchange Control" is provided with
 * the request. The entries are retrieved using the ChangeNumberIndexDB (or
 * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided
 * with the request. The entries are retrieved using the ChangeNumberIndexDB (or
 * DraftDB, hence the name) and their attributes are set with the information
 * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
 * from the content of ChangeNumberIndexDB.</li>
@@ -134,8 +137,20 @@
  private static final AttributeType MODIFIERS_NAME_TYPE =
      DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
  /** The DN for the base changelog entry. */
  private DN baseChangelogDN;
  /** The base DN for the external change log. */
  public static final DN CHANGELOG_BASE_DN;
  static
  {
    try
    {
      CHANGELOG_BASE_DN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
    }
    catch (DirectoryException e)
    {
      throw new RuntimeException(e);
    }
  }
  /** The set of base DNs for this backend. */
  private DN[] baseDNs;
@@ -149,7 +164,7 @@
  private final ECLEnabledDomainPredicate domainPredicate;
  /**
   * Creates a new backend with the provided repication server.
   * Creates a new backend with the provided replication server.
   *
   * @param replicationServer
   *          The replication server on which the changes are read.
@@ -165,6 +180,23 @@
    setPrivateBackend(true);
  }
  private ChangelogDB getChangelogDB()
  {
    return replicationServer.getChangelogDB();
  }
  /**
   * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
   *
   * @return the ChangelogBackend configured for "cn=changelog" in this directory server
   * @deprecated instead inject the required object where needed
   */
  @Deprecated
  public static ChangelogBackend getInstance()
  {
    return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
  }
  /** {@inheritDoc} */
  @Override
  public void configureBackend(final Configuration config) throws ConfigException
@@ -176,29 +208,16 @@
  @Override
  public void initializeBackend() throws InitializationException
  {
    try
    {
      baseChangelogDN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
      baseDNs = new DN[] { baseChangelogDN };
    }
    catch (final DirectoryException e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      throw new InitializationException(
          ERR_BACKEND_CANNOT_DECODE_BACKEND_ROOT_DN.get(getBackendID(), getExceptionMessage(e)), e);
    }
    baseDNs = new DN[] { CHANGELOG_BASE_DN };
    try
    {
      DirectoryServer.registerBaseDN(baseChangelogDN, this, true);
      DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
    }
    catch (final DirectoryException e)
    {
      throw new InitializationException(
          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(baseChangelogDN.toString(), getExceptionMessage(e)), e);
          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(CHANGELOG_BASE_DN.toString(), getExceptionMessage(e)), e);
    }
  }
@@ -206,9 +225,11 @@
  @Override
  public void finalizeBackend()
  {
    super.finalizeBackend();
    try
    {
      DirectoryServer.deregisterBaseDN(baseChangelogDN);
      DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
    }
    catch (final DirectoryException e)
    {
@@ -299,7 +320,7 @@
    @Override
    public DN getBaseDN()
    {
      return baseChangelogDN;
      return CHANGELOG_BASE_DN;
    }
    @Override
@@ -313,6 +334,13 @@
    {
      return SearchScope.WHOLE_SUBTREE;
    }
    /** {@inheritDoc} */
    @Override
    public Object setAttachment(String name, Object value)
    {
      return null;
    }
  }
  /** {@inheritDoc} */
@@ -320,7 +348,7 @@
  public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
  {
    // Compute the num subordinates only for the base DN
    if (entryDN == null || !baseChangelogDN.equals(entryDN))
    if (entryDN == null || !CHANGELOG_BASE_DN.equals(entryDN))
    {
      return -1;
    }
@@ -329,11 +357,9 @@
      return 1;
    }
    // Search with cookie mode to count all update messages
    final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
    excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
    SearchParams params = new SearchParams("0", excludedDomains);
    final SearchParams params = new SearchParams(getExcludedDomains());
    params.requestType = REQUEST_TYPE_FROM_COOKIE;
    params.multiDomainServerState = new MultiDomainServerState();
    params.cookie = new MultiDomainServerState();
    NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
    try
    {
@@ -342,11 +368,118 @@
    catch (ChangelogException e)
    {
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
          baseChangelogDN.toString(), stackTraceToSingleLineString(e)));
          CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e)));
    }
    return searchOp.numSubordinates;
  }
  private Set<String> getExcludedDomains()
  {
    final Set<String> domains = MultimasterReplication.getECLDisabledDomains();
    domains.add(DN_EXTERNAL_CHANGELOG_ROOT);
    return domains;
  }
  /**
   * Notifies persistent searches of this backend that a new entry was added to it.
   * <p>
   * Note: This method is called in a multi-threaded context.
   *
   * @param baseDN
   *          the baseDN of the newly added entry.
   * @param changeNumber
   *          the change number of the newly added entry. It will be greater
   *          than zero for entries added to the change number index and less
   *          than or equal to zero for entries added to any replica DB
   * @param cookieString
   *          a string representing the cookie of the newly added entry.
   *          This is only meaningful for entries added to the change number index
   * @param updateMsg
   *          the update message of the newly added entry
   * @throws ChangelogException
   *           If a problem occurs while notifying of the newly added entry.
   */
  public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
      throws ChangelogException
  {
    final boolean isCookieEntry = changeNumber <= 0;
    final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry);
    if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg))
    {
      return;
    }
    try
    {
      final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
      for (SearchOperation pSearchOp : pSearchOps)
      {
        final EntrySender entrySender = (EntrySender)
            pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
        // when returning changesOnly, the first incoming update must return
        // the base entry before any other changes,
        // so force sending now, when protected by the synchronized block
        if (isCookieEntry)
        { // cookie based search
          final String cookieStr;
          synchronized (entrySender)
          { // forbid concurrent updates to the cookie
            entrySender.cookie.update(baseDN, updateMsg.getCSN());
            cookieStr = entrySender.cookie.toString();
            entrySender.sendBaseChangelogEntry(true);
          }
          final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
          // FIXME JNR use this instead of previous line:
          // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
          entrySender.sendEntryIfMatches(entry2, cookieStr);
        }
        else
        { // draft changeNumber search
          if (!entrySender.hasReturnedBaseEntry.get())
          {
            synchronized (entrySender)
            {
              entrySender.sendBaseChangelogEntry(true);
            }
          }
          entrySender.sendEntryIfMatches(entry, null);
        }
      }
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e.getMessageObject(), e);
    }
  }
  private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch)
  {
    final List<SearchOperation> results = new ArrayList<SearchOperation>();
    for (PersistentSearch pSearch : getPersistentSearches())
    {
      final SearchOperation op = pSearch.getSearchOperation();
      if (wantCookieBasedSearch == isCookieBased(op))
      {
        results.add(op);
      }
    }
    return results;
  }
  private boolean isCookieBased(final SearchOperation searchOp)
  {
    for (Control c : searchOp.getRequestControls())
    {
      if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
      {
        return true;
      }
    }
    return false;
  }
  /** {@inheritDoc} */
  @Override
  public void addEntry(Entry entry, AddOperation addOperation)
@@ -409,9 +542,7 @@
  private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
  {
    final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
    excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
    final SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains);
    final SearchParams params = new SearchParams(getExcludedDomains());
    final ExternalChangelogRequestControl eclRequestControl =
        searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
    if (eclRequestControl == null)
@@ -421,7 +552,7 @@
    else
    {
      params.requestType = REQUEST_TYPE_FROM_COOKIE;
      params.multiDomainServerState = eclRequestControl.getCookie();
      params.cookie = eclRequestControl.getCookie();
    }
    return params;
  }
@@ -523,7 +654,7 @@
  {
    try
    {
      return numSubordinates(baseChangelogDN, true) + 1;
      return numSubordinates(CHANGELOG_BASE_DN, true) + 1;
    }
    catch (DirectoryException e)
    {
@@ -543,33 +674,28 @@
  static class SearchParams
  {
    private ECLRequestType requestType;
    private final String operationId;
    private final Set<String> excludedBaseDNs;
    private long lowestChangeNumber = -1;
    private long highestChangeNumber = -1;
    private CSN csn = new CSN(0, 0, 0);
    private MultiDomainServerState multiDomainServerState;
    private MultiDomainServerState cookie;
    /**
     * Creates search parameters.
     */
    SearchParams()
    {
      operationId = "";
      excludedBaseDNs = Collections.emptySet();
      this.excludedBaseDNs = Collections.emptySet();
    }
    /**
     * Creates search parameters with provided id and excluded domain DNs.
     *
     * @param operationId
     *          The id of the operation.
     * @param excludedBaseDNs
     *          Set of DNs to exclude from search.
     */
    SearchParams(final String operationId, final Set<String> excludedBaseDNs)
    SearchParams(final Set<String> excludedBaseDNs)
    {
      this.operationId = operationId;
      this.excludedBaseDNs = excludedBaseDNs;
    }
@@ -802,45 +928,40 @@
  private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
      throws DirectoryException, ChangelogException
  {
    final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
    validateProvidedCookie(searchParams);
    final boolean isPersistentSearch = isPersistentSearch(searchOperation);
    boolean hasReturnedBaseEntry = false;
    final EntrySender entrySender = new EntrySender(searchOperation, searchParams.cookie);
    if (isPersistentSearch)
    {
      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
    }
    ECLMultiDomainDBCursor replicaUpdatesCursor = null;
    try
    {
      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
          searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      MultiDomainServerState cookie = searchParams.multiDomainServerState;
      boolean continueSearch = true;
      while (continueSearch && replicaUpdatesCursor.next())
      {
        // Handle creation of base changelog entry on first update message found
        if (!hasReturnedBaseEntry)
        {
          if (!returnBaseChangelogEntry(searchOperation, true))
          {
            return;
          }
          hasReturnedBaseEntry = true;
        }
        // Handle the update message
        final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
        final DN domainBaseDN = replicaUpdatesCursor.getData();
        cookie.update(domainBaseDN, updateMsg.getCSN());
        final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg);
        if (matchBaseAndScopeAndFilter(entry, searchOperation))
        {
          Control control = new EntryChangelogNotificationControl(true, cookie.toString());
          continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control));
        }
        searchParams.cookie.update(domainBaseDN, updateMsg.getCSN());
        final String cookieString = searchParams.cookie.toString();
        final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
        continueSearch = entrySender.sendEntryIfMatches(entry, cookieString);
      }
      // Handle creation of base changelog entry when no update message is found
      if (!hasReturnedBaseEntry)
      if (!isPersistentSearch)
      {
        returnBaseChangelogEntry(searchOperation, false);
        // send the base changelog entry if no update message is found
        entrySender.sendBaseChangelogEntry(false);
      }
    }
    finally
@@ -849,6 +970,52 @@
    }
  }
  private boolean isPersistentSearch(SearchOperation op)
  {
    for (PersistentSearch pSearch : getPersistentSearches())
    {
      if (op == pSearch.getSearchOperation())
      {
        return true;
      }
    }
    return false;
  }
  /** {@inheritDoc} */
  @Override
  public void registerPersistentSearch(PersistentSearch pSearch)
  {
    super.registerPersistentSearch(pSearch);
    final SearchOperation searchOp = pSearch.getSearchOperation();
    if (pSearch.isChangesOnly())
    {
      // this persistent search will not go through #search0() down below
      // so we must initialize the cookie here
      searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL,
          new EntrySender(searchOp, getNewestCookie(searchOp)));
    }
  }
  private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
  {
    if (!isCookieBased(searchOp))
    {
      return null;
    }
    final MultiDomainServerState cookie = new MultiDomainServerState();
    for (final Iterator<ReplicationServerDomain> it =
        replicationServer.getDomainIterator(); it.hasNext();)
    {
      final DN baseDN = it.next().getBaseDN();
      final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
      cookie.update(baseDN, state);
    }
    return cookie;
  }
  /**
   * Validates the cookie contained in search parameters by checking its content
   * with the actual replication server state.
@@ -858,7 +1025,7 @@
   */
  private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
  {
    final MultiDomainServerState state = searchParams.multiDomainServerState;
    final MultiDomainServerState state = searchParams.cookie;
    if (state != null && !state.isEmpty())
    {
      replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs());
@@ -871,102 +1038,67 @@
  private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
      throws ChangelogException, DirectoryException
  {
    boolean hasReturnedBaseEntry = false;
    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
    final EntrySender entrySender = new EntrySender(searchOperation, null);
    final boolean isPersistentSearch = isPersistentSearch(searchOperation);
    if (isPersistentSearch)
    {
      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
    }
    DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
    MultiDomainDBCursor replicaUpdatesCursor = null;
    try {
      cnIndexDBCursor = getCNIndexDBCursor(changelogDB, params.lowestChangeNumber);
    try
    {
      cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
      boolean continueSearch = true;
      while (continueSearch && cnIndexDBCursor.next())
      {
        // Handle creation of base changelog entry on cnIndex record found
        if (!hasReturnedBaseEntry)
        {
          if (!returnBaseChangelogEntry(searchOperation, true))
          {
            return;
          }
          hasReturnedBaseEntry = true;
        }
        // Handle the current cnIndex record
        final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
        if (replicaUpdatesCursor == null)
        {
          replicaUpdatesCursor = initializeReplicaUpdatesCursor(changelogDB, cnIndexRecord);
          replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord);
        }
        continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
        if (continueSearch)
        {
           UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
           if (updateMsg != null)
           {
             continueSearch = returnEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
             replicaUpdatesCursor.next();
           }
          UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
          if (updateMsg != null)
          {
            continueSearch = sendEntryForUpdateMessage(entrySender, cnIndexRecord, updateMsg);
            replicaUpdatesCursor.next();
          }
        }
      }
      // Handle creation of base changelog entry when no update message is found
      if (!hasReturnedBaseEntry)
      if (!isPersistentSearch)
      {
        returnBaseChangelogEntry(searchOperation, false);
        // send the base changelog entry if no update message is found
        entrySender.sendBaseChangelogEntry(false);
      }
    }
    finally {
    finally
    {
      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
    }
  }
  /**
   * Create and returns the base changelog entry to provided search operation.
   *
   * @return {@code true} if search should continue, {@code false} otherwise
   */
  private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates)
      throws DirectoryException
  private boolean sendEntryForUpdateMessage(EntrySender entrySender,
      ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
  {
    final DN baseDN = searchOperation.getBaseDN();
    final SearchFilter filter = searchOperation.getFilter();
    final SearchScope scope = searchOperation.getScope();
    if (baseChangelogDN.matchesBaseAndScope(baseDN, scope))
    {
      final Entry entry = buildBaseChangelogEntry(hasSubordinates);
      if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null))
      {
        // Abandon, size limit reached.
        return false;
      }
    }
    if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT))
    {
      // Only the change log root entry was requested
      return false;
    }
    return true;
  }
  /**
   * @return {@code true} if search should continue, {@code false} otherwise
   */
  private boolean returnEntryForUpdateMessage(
      final SearchOperation searchOperation,
      final ChangeNumberIndexRecord cnIndexRecord,
      final UpdateMsg updateMsg)
          throws DirectoryException
  {
    final DN baseDN = cnIndexRecord.getBaseDN();
    final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
    final DN changeDN = cnIndexRecord.getBaseDN();
    cookie.update(changeDN, cnIndexRecord.getCSN());
    final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
    if (matchBaseAndScopeAndFilter(entry, searchOperation))
    {
      return searchOperation.returnEntry(entry, null);
    }
    return true;
    cookie.update(baseDN, cnIndexRecord.getCSN());
    final String cookieString = cookie.toString();
    final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
    return entrySender.sendEntryIfMatches(entry, null);
  }
  private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB,
  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
      final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
  {
    final MultiDomainServerState state = new MultiDomainServerState();
@@ -975,7 +1107,7 @@
    // No need for ECLMultiDomainDBCursor in this case
    // as updateMsg will be matched with cnIndexRecord
    final MultiDomainDBCursor replicaUpdatesCursor =
        changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
    replicaUpdatesCursor.next();
    return replicaUpdatesCursor;
  }
@@ -1023,10 +1155,10 @@
  }
  /** Returns a cursor on CNIndexDB for the provided first change number. */
  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB,
  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
      final long firstChangeNumber) throws ChangelogException
  {
    final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB();
    final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
    long changeNumberToUse = firstChangeNumber;
    if (changeNumberToUse <= 1)
    {
@@ -1036,31 +1168,6 @@
    return cnIndexDB.getCursorFrom(changeNumberToUse);
  }
  /** Indicates if the provided entry matches the filter, base and scope. */
  private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException
  {
    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
        && searchOp.getFilter().matchesEntry(entry);
  }
  /**
   * Retrieves the base changelog entry.
   */
  private Entry buildBaseChangelogEntry(boolean hasSubordinates)
  {
    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
    addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
    addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
    addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates),
        userAttrs, operationalAttrs);
    addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(),
        userAttrs, operationalAttrs);
    return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
  }
  /**
   * Creates a changelog entry.
   */
@@ -1225,16 +1332,16 @@
  {
    final CSN csn = msg.getCSN();
    String dnString;
    if (changeNumber == 0)
    {
      // Cookie mode
      dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT;
    }
    else
    if (changeNumber > 0)
    {
      // Draft compat mode
      dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
    }
    else
    {
      // Cookie mode
      dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
    }
    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
    final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
@@ -1247,7 +1354,7 @@
    addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
    // REQUIRED attributes
    if (changeNumber != 0)
    if (changeNumber > 0)
    {
      addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
    }
@@ -1277,7 +1384,8 @@
    {
      addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
    }
    addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs);
    final String cookie2 = cookie != null ? cookie : "";
    addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
    final List<RawAttribute> includedAttributes = msg.getEclIncludes();
    if (includedAttributes != null && !includedAttributes.isEmpty())
@@ -1300,6 +1408,116 @@
    return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
  }
  /**
   * Used to send entries to searches on cn=changelog. This class ensures the
   * base changelog entry is sent before sending any other entry. It is also
   * used as a store when going from the "initial search" phase to the
   * "persistent search" phase.
   */
  private static class EntrySender
  {
    private final SearchOperation searchOp;
    /**
     * Used by the cookie-based searches to communicate the cookie between the
     * initial search phase and the persistent search phase. This is unused with
     * draft change number searches.
     */
    private final MultiDomainServerState cookie;
    private final AtomicBoolean hasReturnedBaseEntry = new AtomicBoolean();
    public EntrySender(SearchOperation searchOp, MultiDomainServerState cookie)
    {
      this.searchOp = searchOp;
      this.cookie = cookie;
    }
    /**
     * Sends the entry if it matches the base, scope and filter of the current search operation.
     * It will also send the base changelog entry if it needs to be sent and was not sent before.
     *
     * @return {@code true} if search should continue, {@code false} otherwise
     */
    private boolean sendEntryIfMatches(Entry entry, String cookie) throws DirectoryException
    {
      // About to send one entry: ensure the base changelog entry is sent first
      if (!sendBaseChangelogEntry(true))
      {
        // only return the base entry: stop here
        return false;
      }
      if (matchBaseAndScopeAndFilter(entry))
      {
        return searchOp.returnEntry(entry, getControls(cookie));
      }
      // maybe the next entry will match?
      return true;
    }
    /** Indicates if the provided entry matches the filter, base and scope. */
    private boolean matchBaseAndScopeAndFilter(Entry entry) throws DirectoryException
    {
      return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
          && searchOp.getFilter().matchesEntry(entry);
    }
    private List<Control> getControls(String cookie)
    {
      if (cookie != null)
      {
        Control c = new EntryChangelogNotificationControl(true, cookie);
        return Arrays.asList(c);
      }
      return Collections.emptyList();
    }
    /**
     * Create and returns the base changelog entry to the underlying search operation.
     *
     * @return {@code true} if search should continue, {@code false} otherwise
     */
    private boolean sendBaseChangelogEntry(boolean hasSubordinates) throws DirectoryException
    {
      if (hasReturnedBaseEntry.compareAndSet(false, true))
      {
        final DN baseDN = searchOp.getBaseDN();
        final SearchFilter filter = searchOp.getFilter();
        final SearchScope scope = searchOp.getScope();
        if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
        {
          final Entry entry = buildBaseChangelogEntry(hasSubordinates);
          if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
          {
            // Abandon, size limit reached.
            return false;
          }
        }
        return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
            || !scope.equals(SearchScope.BASE_OBJECT);
      }
      return true;
    }
    private Entry buildBaseChangelogEntry(boolean hasSubordinates)
    {
      final Map<AttributeType, List<Attribute>> userAttrs =
          new LinkedHashMap<AttributeType, List<Attribute>>();
      final Map<AttributeType, List<Attribute>> operationalAttrs =
          new LinkedHashMap<AttributeType, List<Attribute>>();
      addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME,
          ChangelogBackend.BACKEND_ID, userAttrs, operationalAttrs);
      addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
          ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
      addAttributeByUppercaseName("hassubordinates", "hasSubordinates",
          Boolean.toString(hasSubordinates), userAttrs, operationalAttrs);
      addAttributeByUppercaseName("entrydn", "entryDN",
          ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
      return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
    }
  }
  private static void addAttribute(final Entry e, final String attrType, final String attrValue)
  {
    e.addAttribute(Attributes.create(attrType, attrValue), null);
@@ -1313,7 +1531,7 @@
    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
  }
  private void addAttributeByUppercaseName(String attrNameLowercase,
  private static void addAttributeByUppercaseName(String attrNameLowercase,
      String attrNameUppercase,  String attrValue,
      Map<AttributeType, List<Attribute>> userAttrs,
      Map<AttributeType, List<Attribute>> operationalAttrs)
@@ -1331,8 +1549,9 @@
    {
      attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
    }
    final Attribute a = addByType ?
        Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue);
    final Attribute a = addByType
        ? Attributes.create(attrType, attrValue)
        : Attributes.create(attrNameUppercase, attrValue);
    final List<Attribute> attrList = Collections.singletonList(a);
    if (attrType.isOperational())
    {