mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.01.2013 891159050af4aa3fe47c67e3ba7d3f21299027a4
OPENDJ-1174 (CR-2631) Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB


Hooked the ChangeNumberIndexer thread into our code base.
Managed thread lifecycle.
Added configuration to turn it on / off.
Removed code that builds the ChangeNumberIndexDB from ECLServerHandler.


ChangeNumberIndexer.java:
Added clear() + doClear field for unit tests
Used thread safe versions of MultiDomainServerState.
In run(), merged all the try blocks to factorize the catch blocks + supported doClear + fixed a bug with the mediumConsistencyRUV.
In removeCursor(), closed the cursors.
Added removeAllCursors(), getPrecedingCSN().
Renamed crossDomainDBCursor to nextChangeForInsertDBCursor and newCompositeDBCursor() to resetNextChangeForInsertDBCursor().
Changed newCursors from ConcurrentMap<CSN, DN> to ConcurrentMap<Pair<DN, Integer>, CSN> to ensure minimum memory consumption.

JEChangelogDB.java:
Removed dbDirectoryName field.
Added config, cnIndexer fields.
Changed ctor to accept ReplicationServerCfg.
In initializeDB(), shutdownDB(), clearDB(), publishUpdateMsg() took appropriate actions with the cnIndexer thread.
Added setComputeChangeNumber().
In getCursorFrom(), accept null parameter.
In setPurgeDelay(), also call it on the cnIndexDB.


MultiDomainServerState.java:
Made implementation to be fully thread safe.
Changed list from Map to ConcurrentMap + removed synchrnoized blocks.
Renamed update(DN, ServerState) to replace(DN, ServerState).
Added new version of update(DN, ServerState).

ECLServerHandler.java:
Removed assignNewChangeNumberAndStore() that builds the ChangeNumberIndexDB + adapted the code that was using this method.
Used MultiDomainServerState.replace().


ReplicationServerConfiguration.xml, ReplicationServerCfgDefn.properties:
Added ds-cfg-compute-changenumber.

ExternalChangelogDomainConfiguration.xml, ExternalChangelogDomainCfgDefn.properties:
Updated description

ReplicationServer.java:
In applyConfigurationChange(), handled computeChangenumber config.
Used MultiDomainServerState.replace().

ChangelogDB.java:
Added setComputeChangeNumber().

ReplServerFakeConfiguration.java:
Added isComputeChangenumber() and setComputeChangenumber().


ExternalChangeLogTest.java:
Adapted the code to the use of the ChangeNumberIndexer thread.

JEChangeNumberIndexDB.java:
Extracted runPurge() from run().


ReplicationDbEnv.java
Code cleanup.
15 files modified
573 ■■■■ changed files
opends/src/admin/defn/org/opends/server/admin/std/ExternalChangelogDomainConfiguration.xml 8 ●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml 27 ●●●●● patch | view | raw | blame | history
opends/src/admin/messages/ExternalChangelogDomainCfgDefn.properties 2 ●●● patch | view | raw | blame | history
opends/src/admin/messages/ReplicationServerCfgDefn.properties 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 51 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 39 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 34 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 200 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 84 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 12 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 29 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java 35 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 5 ●●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/ExternalChangelogDomainConfiguration.xml
@@ -24,7 +24,7 @@
  !
  !
  !      Copyright 2009 Sun Microsystems, Inc.
  !      Portions copyright 2011 ForgeRock AS
  !      Portions copyright 2011-2013 ForgeRock AS
  ! -->
<adm:managed-object name="external-changelog-domain"
  plural-name="external-changelog-domains"
@@ -45,9 +45,9 @@
  </adm:profile>
  <adm:property name="enabled" mandatory="true">
    <adm:synopsis>
      Indicates whether the
      <adm:user-friendly-name />
      is enabled.
      Indicates whether the <adm:user-friendly-name /> is enabled.
      To enable computing the change numbers, set the Replication Server's
      "ds-cfg-compute-changenumber" property to true.
    </adm:synopsis>
    <adm:syntax>
      <adm:boolean />
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -328,4 +328,31 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="compute-changenumber" mandatory="false">
    <adm:synopsis>
      Whether the replication server will compute changenumbers.
    </adm:synopsis>
    <adm:description>
      This boolean tells the replication server to compute changenumbers for
      each replicated change by maintaining a changenumber index database.
      Changenumbers are computed according to
      http://tools.ietf.org/html/draft-good-ldap-changelog-04.
      Note this functionality has an impact on CPU, disk accesses and storage.
      If changenumbers are not required, it is advisable to set this value to
      false.
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>true</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:boolean />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-compute-changenumber</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/admin/messages/ExternalChangelogDomainCfgDefn.properties
@@ -5,4 +5,4 @@
property.ecl-include.description=The list of attributes may include wild cards such as "*" and "+" as well as object class references prefixed with an ampersand, for example "@person". The included attributes will be published using the "includedAttributes" operational attribute as a single LDIF value rather like the "changes" attribute. For modify and modifyDN operations the included attributes will be taken from the entry before any changes were applied.
property.ecl-include-for-deletes.synopsis=Specifies a list of attributes which should be published with every delete operation change log entry, in addition to those specified by the "ecl-include" property.
property.ecl-include-for-deletes.description=This property provides a means for applications to archive entries after they have been deleted. See the description of the "ecl-include" property for further information about how the included attributes are published.
property.enabled.synopsis=Indicates whether the External Changelog Domain is enabled.
property.enabled.synopsis=Indicates whether the External Changelog Domain is enabled. To enable computing the change numbers, set the Replication Server's "ds-cfg-compute-changenumber" property to true.
opends/src/admin/messages/ReplicationServerCfgDefn.properties
@@ -3,6 +3,8 @@
synopsis=Replication Servers publish updates to Directory Servers within a Replication Domain.
property.assured-timeout.synopsis=The timeout value when waiting for assured mode acknowledgments.
property.assured-timeout.description=Defines the number of milliseconds that the replication server will wait for assured acknowledgments (in either Safe Data or Safe Read assured sub modes) before forgetting them and answer to the entity that sent an update and is waiting for acknowledgment.
property.compute-changenumber.synopsis=Whether the replication server will compute changenumbers.
property.compute-changenumber.description=This boolean tells the replication server to compute changenumbers for each replicated change by maintaining a changenumber index database. Changenumbers are computed according to http://tools.ietf.org/html/draft-good-ldap-changelog-04. Note this functionality has an impact on CPU, disk accesses and storage. If changenumbers are not required, it is advisable to set this value to false.
property.degraded-status-threshold.synopsis=The number of pending changes as threshold value for putting a directory server in degraded status.
property.degraded-status-threshold.description=This value represents a number of pending changes a replication server has in queue for sending to a directory server. Once this value is crossed, the matching directory server goes in degraded status. When number of pending changes goes back under this value, the directory server is put back in normal status. 0 means status analyzer is disabled and directory servers are never put in degraded status.
property.group-id.synopsis=The group id for the replication server.
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -54,14 +56,14 @@
  /**
   * The list of (domain service id, ServerState).
   */
  private Map<DN, ServerState> list;
  private final ConcurrentMap<DN, ServerState> list;
  /**
   * Creates a new empty object.
   */
  public MultiDomainServerState()
  {
    list = new TreeMap<DN, ServerState>();
    list = new ConcurrentSkipListMap<DN, ServerState>();
  }
  /**
@@ -71,10 +73,10 @@
   */
  public MultiDomainServerState(String mdss) throws DirectoryException
  {
    list = splitGenStateToServerStates(mdss);
    list = new ConcurrentSkipListMap<DN, ServerState>(
        splitGenStateToServerStates(mdss));
  }
  /**
   * Empty the object..
   * After this call the object will be in the same state as if it
@@ -82,11 +84,8 @@
   */
  public void clear()
  {
    synchronized (this)
    {
      list.clear();
    }
  }
  /**
   * Update the ServerState of the provided baseDN with the replication
@@ -102,22 +101,22 @@
    if (csn == null)
      return false;
    synchronized(this)
    ServerState serverState = list.get(baseDN);
    if (serverState == null)
    {
      ServerState oldServerState = list.get(baseDN);
      if (oldServerState == null)
      serverState = new ServerState();
      final ServerState existingSS = list.putIfAbsent(baseDN, serverState);
      if (existingSS != null)
      {
        oldServerState = new ServerState();
        list.put(baseDN, oldServerState);
        serverState = existingSS;
      }
      return oldServerState.update(csn);
    }
    return serverState.update(csn);
  }
  /**
   * Update the ServerState of the provided baseDN with the provided server
   * state. The provided server state will be owned by this instance, so care
   * must be taken by calling code to duplicate it if needed.
   * state.
   *
   * @param baseDN
   *          The provided baseDN.
@@ -126,6 +125,28 @@
   */
  public void update(DN baseDN, ServerState serverState)
  {
    for (CSN csn : serverState)
    {
      update(baseDN, csn);
    }
  }
  /**
   * Replace the ServerState of the provided baseDN with the provided server
   * state. The provided server state will be owned by this instance, so care
   * must be taken by calling code to duplicate it if needed.
   *
   * @param baseDN
   *          The provided baseDN.
   * @param serverState
   *          The provided serverState.
   */
  public void replace(DN baseDN, ServerState serverState)
  {
    if (serverState == null)
    {
      throw new IllegalArgumentException("ServerState must not be null");
    }
    list.put(baseDN, serverState);
  }
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -841,7 +841,7 @@
      domain.registerHandler(mh);
      newDomainCtxt.mh = mh;
      previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
      previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(),
                            newDomainCtxt.startState.duplicate());
      results.add(newDomainCtxt);
@@ -1260,13 +1260,9 @@
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
        if (oldestContext != null)
        {
          final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
          oldestContext.currentState.update(change.getUpdateMsg().getCSN());
          if (draftCompat)
          {
            assignNewChangeNumberAndStore(change);
          }
          oldestChange = change;
          oldestChange = newECLUpdateMsg(oldestContext);
          oldestContext.currentState.update(
              oldestChange.getUpdateMsg().getCSN());
        }
      }
    }
@@ -1326,7 +1322,7 @@
   *           if a database problem occurs.
   */
  private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange)
      throws ChangelogException
      throws ChangelogException, DirectoryException
  {
    // We also need to check if the CNIndexDB is consistent with the replicaDBs.
    // If not, 2 potential reasons:
@@ -1337,15 +1333,8 @@
    CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN();
    DN baseDNFromReplicaDB = replicaDBChange.getBaseDN();
    while (true)
    while (!isEndOfCNIndexDBReached)
    {
      if (isEndOfCNIndexDBReached)
      {
        // we are at the end of the CNIndexDB in the append mode
        assignNewChangeNumberAndStore(replicaDBChange);
        return true;
      }
      final ChangeNumberIndexRecord currentRecord = cnIndexDBCursor.getRecord();
      final CSN csnFromCNIndexDB = currentRecord.getCSN();
      final DN baseDNFromCNIndexDB = currentRecord.getBaseDN();
@@ -1366,6 +1355,9 @@
              + currentRecord.getChangeNumber() + " to change="
              + replicaDBChange);
        previousCookie =
            new MultiDomainServerState(currentRecord.getPreviousCookie());
        replicaDBChange.setCookie(previousCookie);
        replicaDBChange.setChangeNumber(currentRecord.getChangeNumber());
        return true;
      }
@@ -1411,6 +1403,7 @@
        // continuously throws ChangelogExceptions
      }
    }
    return false;
  }
  private Date asDate(CSN csn)
@@ -1425,18 +1418,6 @@
    return sameDN && sameCSN;
  }
  private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
      throws ChangelogException
  {
    final ChangeNumberIndexRecord record =
        new ChangeNumberIndexRecord(previousCookie.toString(),
            change.getBaseDN(), change.getUpdateMsg().getCSN());
    // store in CNIndexDB the pair
    // (change number of the current change, state before this change)
    change.setChangeNumber(
        replicationServer.getChangeNumberIndexDB().addRecord(record));
  }
  /**
   * Terminates the first (non persistent) phase of the search on the ECL.
   */
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -144,9 +144,7 @@
    throws ConfigException
  {
    this.config = configuration;
    this.changelogDB =
        new JEChangelogDB(this, configuration.getReplicationDBDirectory());
    this.changelogDB = new JEChangelogDB(this, configuration);
    replSessionSecurity = new ReplSessionSecurity();
    initialize();
@@ -764,6 +762,9 @@
  public ConfigChangeResult applyConfigurationChange(
      ReplicationServerCfg configuration)
  {
    ResultCode resultCode = ResultCode.SUCCESS;
    boolean adminActionRequired = false;
    // Some of those properties change don't need specific code.
    // They will be applied for next connections. Some others have immediate
    // effect
@@ -779,6 +780,20 @@
    {
      this.changelogDB.setPurgeDelay(getTrimAge());
    }
    final boolean computeCN = config.isComputeChangenumber();
    if (computeCN != oldConfig.isComputeChangenumber())
    {
      try
      {
        this.changelogDB.setComputeChangeNumber(computeCN);
      }
      catch (ChangelogException e)
      {
        if (debugEnabled())
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        resultCode = ResultCode.OPERATIONS_ERROR;
      }
    }
    // changing the listen port requires to stop the listen thread
    // and restart it.
@@ -800,10 +815,14 @@
      }
      catch (IOException e)
      {
        if (debugEnabled())
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        logError(ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString()));
      }
      catch (InterruptedException e)
      {
        if (debugEnabled())
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        logError(ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString()));
      }
    }
@@ -849,10 +868,9 @@
    final String newDir = config.getReplicationDBDirectory();
    if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory()))
    {
      return new ConfigChangeResult(ResultCode.SUCCESS, true);
      adminActionRequired = true;
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
    return new ConfigChangeResult(resultCode, adminActionRequired);
  }
  /**
@@ -1505,7 +1523,7 @@
  public MultiDomainServerState getNewestECLCookie(Set<String> excludedBaseDNs)
  {
    // Initialize start state for all running domains with empty state
    MultiDomainServerState result = new MultiDomainServerState();
    final MultiDomainServerState result = new MultiDomainServerState();
    for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
    {
      if (contains(excludedBaseDNs, rsDomain.getBaseDN().toNormalizedString()))
@@ -1513,7 +1531,7 @@
      final ServerState latestDBServerState = rsDomain.getLatestServerState();
      if (latestDBServerState.isEmpty())
        continue;
      result.update(rsDomain.getBaseDN(), latestDBServerState);
      result.replace(rsDomain.getBaseDN(), latestDBServerState);
    }
    return result;
  }
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -57,6 +57,19 @@
  void setPurgeDelay(long delayInMillis);
  /**
   * Sets whether the replication database must compute change numbers for
   * replicated changes. Change numbers are computed using a separate new
   * thread.
   *
   * @param computeChangeNumber
   *          whether to compute change numbers for replicated changes
   * @throws ChangelogException
   *           If a database problem happened
   */
  void setComputeChangeNumber(boolean computeChangeNumber)
      throws ChangelogException;
  /**
   * Shutdown the replication database.
   *
   * @throws ChangelogException
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -30,6 +30,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
@@ -43,6 +44,9 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -57,6 +61,11 @@
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
   * If this is true, then the {@link #run()} method must clear its state.
   * Otherwise the run method executes normally.
   */
  private final AtomicBoolean doClear = new AtomicBoolean();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
@@ -101,12 +110,12 @@
      new MultiDomainServerState();
  /**
   * Composite cursor across all the replicaDBs for all the replication domains.
   * It is volatile to ensure it supports concurrent update. Each time it is
   * used more than once in a method, the method must take a local copy to
   * ensure the cursor does not get updated in the middle of the method.
   * Cursor across all the replicaDBs for all the replication domains. It is
   * positioned on the next change that needs to be inserted in the CNIndexDB.
   * <p>
   * Note: it is only accessed from the {@link #run()} method.
   */
  private volatile CompositeDBCursor<DN> crossDomainDBCursor;
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
  /**
   * New cursors for this Map must be created from the {@link #run()} method,
@@ -116,9 +125,27 @@
   */
  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
  /** This map can be updated by multiple threads. */
  private ConcurrentMap<CSN, DN> newCursors =
      new ConcurrentSkipListMap<CSN, DN>();
  /**
   * Holds the newCursors that will have to be created in the next iteration
   * inside the {@link #run()} method.
   * <p>
   * This map can be updated by multiple threads.
   */
  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
          new Comparator<Pair<DN, Integer>>()
          {
            @Override
            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
            {
              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
              if (compareBaseDN == 0)
              {
                return o1.getSecond().compareTo(o2.getSecond());
              }
              return compareBaseDN;
            }
          });
  /**
   * Builds a ChangeNumberIndexer object.
@@ -164,7 +191,8 @@
  {
    final CSN csn = updateMsg.getCSN();
    lastSeenUpdates.update(baseDN, csn);
    newCursors.put(csn, baseDN);
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    tryNotify(baseDN);
  }
@@ -210,17 +238,23 @@
    return true;
  }
  /**
   * Restores in memory data needed to build the CNIndexDB, including the medium
   * consistency point.
   */
  private void initialize() throws ChangelogException, DirectoryException
  {
    final ChangeNumberIndexRecord newestRecord =
        changelogDB.getChangeNumberIndexDB().getNewestRecord();
    if (newestRecord != null)
    {
      // restore the mediumConsistencyRUV from DB
      mediumConsistencyRUV.update(
          new MultiDomainServerState(newestRecord.getPreviousCookie()));
    }
    // initialize the cross domain DB cursor
    // initialize the DB cursor and the last seen updates
    // to ensure the medium consistency CSN can move forward
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
    for (Entry<DN, List<Integer>> entry
        : changelogState.getDomainToServerIds().entrySet())
@@ -235,12 +269,12 @@
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      lastSeenUpdates.update(baseDN, latestKnownState);
    }
    resetNextChangeForInsertDBCursor();
    crossDomainDBCursor = newCompositeDBCursor();
    if (newestRecord != null)
    {
      // restore the "previousCookie" state before shutdown
      final UpdateMsg record = crossDomainDBCursor.getRecord();
      final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        // TODO JNR i18n safety check, should never happen
@@ -248,14 +282,14 @@
            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
      }
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
      crossDomainDBCursor.next();
      nextChangeForInsertDBCursor.next();
    }
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException
  private void resetNextChangeForInsertDBCursor() throws ChangelogException
  {
    final Map<DBCursor<UpdateMsg>, DN> cursors =
        new HashMap<DBCursor<UpdateMsg>, DN>();
@@ -270,7 +304,7 @@
    }
    final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
    result.next();
    return result;
    nextChangeForInsertDBCursor = result;
  }
  private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn)
@@ -286,13 +320,27 @@
    if (cursor == null)
    {
      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
      cursor = domainDB.getCursorFrom(baseDN, serverId, csn);
      // use an older CSN because getCursorFrom() starts after the given CSN
      final CSN anOlderCSN = getPrecedingCSN(csn);
      cursor = domainDB.getCursorFrom(baseDN, serverId, anOlderCSN);
      map.put(serverId, cursor);
      return false;
    }
    return true;
  }
  /**
   * Returns the immediately preceding CSN.
   */
  private CSN getPrecedingCSN(CSN csn)
  {
    if (csn.getSeqnum() > 0)
    {
      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
    }
    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
  }
  /** {@inheritDoc} */
  @Override
  public void run()
@@ -305,29 +353,24 @@
       * used.
       */
      initialize();
    }
    catch (DirectoryException e)
    {
      // TODO JNR error message i18n
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return;
    }
    catch (ChangelogException e)
    {
      // TODO JNR error message i18n
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return;
    }
    while (!isShutdownInitiated())
    {
      try
      {
          if (doClear.get())
          {
            removeAllCursors();
            // No need to use CAS here because it is only for unit tests and at
            // this point all will have been cleaned up anyway.
            doClear.set(false);
          }
          else
          {
        createNewCursors();
          }
        final UpdateMsg msg = crossDomainDBCursor.getRecord();
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
        if (msg == null)
        {
          synchronized (this)
@@ -335,13 +378,14 @@
            wait();
          }
          // advance cursor, success/failure will be checked later
          crossDomainDBCursor.next();
          // loop to check whether new changes have been added to the ReplicaDBs
            nextChangeForInsertDBCursor.next();
            // loop to check whether new changes have been added to the
            // ReplicaDBs
          continue;
        }
        final CSN csn = msg.getCSN();
        final DN baseDN = crossDomainDBCursor.getData();
          final DN baseDN = nextChangeForInsertDBCursor.getData();
        // FIXME problem: what if the serverId is not part of the ServerState?
        // right now, thread will be blocked
        if (!canMoveForwardMediumConsistencyPoint(baseDN))
@@ -361,8 +405,16 @@
          }
        }
        // OK, the oldest change is older than the medium consistency point
        // let's publish it to the CNIndexDB
          // let's publish it to the CNIndexDB.
          // Next if statement is ugly but ensures the first change will not be
          // immediately trimmed from the CNIndexDB. Yuck!
          if (mediumConsistencyRUV.isEmpty())
          {
            mediumConsistencyRUV.replace(baseDN, new ServerState());
          }
        final String previousCookie = mediumConsistencyRUV.toString();
        final ChangeNumberIndexRecord record =
            new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
@@ -370,7 +422,15 @@
        moveForwardMediumConsistencyPoint(csn, baseDN);
        // advance cursor, success/failure will be checked later
        crossDomainDBCursor.next();
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
        {
          // was shutdown called? loop to figure it out.
          Thread.currentThread().interrupt();
        }
      }
      removeAllCursors();
      }
      catch (ChangelogException e)
      {
@@ -378,10 +438,11 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        // TODO JNR error message i18n
      }
      catch (InterruptedException ignored)
    catch (DirectoryException e)
      {
        // was shutdown called?
      }
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // TODO JNR error message i18n
    }
  }
@@ -402,20 +463,32 @@
    }
  }
  private void removeAllCursors() throws ChangelogException
  {
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    {
      StaticUtils.close(map.values());
    }
    allCursors.clear();
    newCursors.clear();
    resetNextChangeForInsertDBCursor();
  }
  private void removeCursor(final DN baseDN, final CSN csn)
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
        .entrySet())
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
    {
      if (baseDN.equals(entry.getKey()))
      if (baseDN.equals(entry1.getKey()))
      {
        final Set<Integer> serverIds = entry.getValue().keySet();
        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final int serverId = iter.next();
          if (csn.getServerId() == serverId)
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (csn.getServerId() == entry2.getKey())
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            return;
          }
        }
@@ -428,12 +501,13 @@
    if (!newCursors.isEmpty())
    {
      boolean newCursorAdded = false;
      for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
          iter.hasNext();)
      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
          newCursors.entrySet().iterator(); iter.hasNext();)
      {
        final Entry<CSN, DN> entry = iter.next();
        final CSN csn = entry.getKey();
        if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
        final DN baseDN = entry.getKey().getFirst();
        final CSN csn = entry.getValue();
        if (!ensureCursorExists(baseDN, csn.getServerId(), csn))
        {
          newCursorAdded = true;
        }
@@ -441,9 +515,29 @@
      }
      if (newCursorAdded)
      {
        crossDomainDBCursor = newCompositeDBCursor();
        resetNextChangeForInsertDBCursor();
      }
    }
  }
  /**
   * Asks the current thread to clear its state.
   * <p>
   * This method is only useful for unit tests.
   */
  public void clear()
  {
    doClear.set(true);
    synchronized (this)
    {
      notify();
    }
    while (doClear.get())
    {
      // wait until clear() has been done by thread
      // ensures unit tests wait that this thread's state is cleaned up
      Thread.yield();
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -295,6 +295,30 @@
          replicationServer.shutdown();
        break;
      }
      try {
        trim(shutdown);
        synchronized (this)
        {
          try
          {
            wait(1000);
          } catch (InterruptedException e)
          {
            Thread.currentThread().interrupt();
          }
        }
      } catch (Exception end)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        if (replicationServer != null)
          replicationServer.shutdown();
        break;
      }
    }
    synchronized (this)
@@ -306,9 +330,13 @@
  /**
   * Trim old changes from this database.
   * @throws ChangelogException In case of database problem.
   *
   * @param shutdown
   *          AtomicBoolean telling whether the current run must be stopped
   * @throws ChangelogException
   *           In case of database problem.
   */
  private void trim(AtomicBoolean shutdown) throws ChangelogException
  public void trim(AtomicBoolean shutdown) throws ChangelogException
  {
    if (trimAge == 0)
      return;
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -31,9 +31,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
@@ -79,7 +81,7 @@
      domainToReplicaDBs =
          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private ReplicationServerCfg config;
  private final File dbDirectory;
  /**
@@ -89,6 +91,8 @@
   * Guarded by cnIndexDBLock
   */
  private JEChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer =
      new AtomicReference<ChangeNumberIndexer>();
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
@@ -131,17 +135,17 @@
   *
   * @param replicationServer
   *          the local replication server.
   * @param dbDirName
   *          the directory for use by the replication database
   * @param config
   *          the replication server configuration
   * @throws ConfigException
   *           if a problem occurs opening the supplied directory
   */
  public JEChangelogDB(ReplicationServer replicationServer, String dbDirName)
      throws ConfigException
  public JEChangelogDB(ReplicationServer replicationServer,
      ReplicationServerCfg config) throws ConfigException
  {
    this.config = config;
    this.replicationServer = replicationServer;
    this.dbDirectoryName = dbDirName != null ? dbDirName : "changelogDb";
    this.dbDirectory = makeDir(this.dbDirectoryName);
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
  private File makeDir(String dbDirName) throws ConfigException
@@ -303,9 +307,19 @@
  {
    try
    {
      dbEnv = new ReplicationDbEnv(
          getFileForPath(dbDirectoryName).getAbsolutePath(), replicationServer);
      initializeChangelogState(dbEnv.readChangelogState());
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.readChangelogState();
      initializeChangelogState(changelogState);
      if (config.isComputeChangenumber())
      {
        final ChangeNumberIndexer indexer =
            new ChangeNumberIndexer(this, changelogState);
        if (cnIndexer.compareAndSet(null, indexer))
        {
          indexer.start();
        }
      }
    }
    catch (ChangelogException e)
    {
@@ -361,6 +375,12 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.initiateShutdown();
      cnIndexer.compareAndSet(indexer, null);
    }
    try
    {
      shutdownCNIndexDB();
@@ -411,6 +431,12 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.clear();
    }
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
@@ -617,6 +643,11 @@
  @Override
  public void setPurgeDelay(long delay)
  {
    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
    if (cnIndexDB != null)
    {
      cnIndexDB.setPurgeDelay(delay);
    }
    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
    {
      for (JEReplicaDB replicaDB : domainMap.values())
@@ -628,6 +659,31 @@
  /** {@inheritDoc} */
  @Override
  public void setComputeChangeNumber(boolean computeChangeNumber)
      throws ChangelogException
  {
    final ChangeNumberIndexer indexer;
    if (computeChangeNumber)
    {
      final ChangelogState changelogState = dbEnv.readChangelogState();
      indexer = new ChangeNumberIndexer(this, changelogState);
      if (cnIndexer.compareAndSet(null, indexer))
      {
        indexer.start();
      }
    }
    else
    {
      indexer = cnIndexer.getAndSet(null);
      if (indexer != null)
      {
        indexer.initiateShutdown();
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
  {
    long latest = 0;
@@ -693,7 +749,8 @@
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState.getCSN(serverId);
      final CSN lastCSN = startAfterServerState != null ?
          startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    return new CompositeDBCursor<Void>(cursors);
@@ -751,6 +808,11 @@
    final boolean wasCreated = pair.getSecond();
    replicaDB.add(updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -48,6 +48,7 @@
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -246,11 +247,12 @@
    }
    catch (RuntimeException e)
    {
      throw new ChangelogException(e);
      final Message message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage());
      throw new ChangelogException(message, e);
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e);
      throw new ChangelogException(e.getMessageObject(), e);
    }
    finally
    {
@@ -439,7 +441,7 @@
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(null, e));
      logError(closeDBErrorMessage(null, e));
    }
  }
@@ -452,11 +454,11 @@
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(db.getDatabaseName(), e));
      logError(closeDBErrorMessage(db.getDatabaseName(), e));
    }
  }
  private Message newErrorMessage(String dbName, DatabaseException e)
  private Message closeDBErrorMessage(String dbName, DatabaseException e)
  {
    if (dbName != null)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -151,6 +151,7 @@
      new ReplServerFakeConfiguration(
          replicationServerPort, "ExternalChangeLogTestDb",
          0, 71, 0, maxWindow, null);
    conf1.setComputeChangenumber(true);
    replicationServer = new ReplicationServer(conf1);
    debugInfo("configure", "ReplicationServer created"+replicationServer);
@@ -169,6 +170,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"})
  public void ECLReplicationServerTest() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    // Following test does not create RSDomain (only broker) but want to test
    // ECL .. so let's enable ECl manually
    // Now that we tested that ECl is not available
@@ -191,6 +193,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerTest1() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();
  }
@@ -205,6 +208,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerTest3() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    // Write changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
@@ -263,6 +267,7 @@
  @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest3() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    // Test all types of ops.
    ECLAllOps(); // Do not clean the db for the next test
@@ -353,6 +358,8 @@
  @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest15() throws Exception
  {
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    cnIndexDB.setPurgeDelay(0);
    // Write 4 changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
@@ -373,6 +380,8 @@
    ECLCompatTestLimitsAndAdd(1, 8, 4);
    // Test CNIndexDB is purged when replication change log is purged
    cnIndexDB.setPurgeDelay(1);
    cnIndexDB.trim(null);
    ECLPurgeCNIndexDBAfterChangelogClear();
    // Test first and last are updated
@@ -1949,6 +1958,16 @@
    clearChangelogDB(replicationServer);
  }
  @AfterTest
  public void setPurgeDelayToInitialValue() throws Exception
  {
    JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    if (cnIndexDB != null)
    {
      cnIndexDB.setPurgeDelay(1);
    }
  }
  /**
   * After the tests stop the replicationServer.
   */
@@ -2593,6 +2612,15 @@
    debugInfo(tn, "Ending test with success");
  }
  private JEChangeNumberIndexDB getCNIndexDB()
  {
    if (replicationServer != null)
    {
      return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
    }
    return null;
  }
  private void ECLGetEligibleCountTest() throws Exception
  {
    String tn = "ECLGetEligibleCountTest";
@@ -2604,6 +2632,7 @@
    final CSN csn2 = csns[1];
    final CSN csn3 = csns[2];
    getCNIndexDB().setPurgeDelay(0);
    ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
    // this empty state will force to count from the start of the DB
    final ServerState fromStart = new ServerState();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -40,6 +40,7 @@
 * This Class implements an object that can be used to instantiate
 * The ReplicationServer class for tests purpose.
 */
@SuppressWarnings("javadoc")
public class ReplServerFakeConfiguration implements ReplicationServerCfg
{
  private int port;
@@ -67,6 +68,7 @@
  /** The monitoring publisher period. */
  private long monitoringPeriod = 3000;
  private boolean computeChangenumber;
  /**
   * Constructor without group id, assured info and weight
@@ -140,15 +142,17 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void addChangeListener(
      ConfigurationChangeListener<ReplicationServerCfg> listener)
  {
    // not supported
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public Class<? extends ReplicationServerCfg> configurationClass()
  {
    return null;
@@ -157,6 +161,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public String getReplicationDBDirectory()
  {
    return dirName;
@@ -165,6 +170,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public int getReplicationPort()
  {
    return port;
@@ -173,6 +179,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public long getReplicationPurgeDelay()
  {
    return purgeDelay;
@@ -181,6 +188,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public SortedSet<String> getReplicationServer()
  {
     return servers;
@@ -189,6 +197,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public int getReplicationServerId()
  {
    return serverId;
@@ -197,6 +206,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public int getQueueSize()
  {
    return queueSize;
@@ -205,6 +215,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public int getWindowSize()
  {
    return windowSize;
@@ -213,15 +224,17 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void removeChangeListener(
      ConfigurationChangeListener<ReplicationServerCfg> listener)
  {
    // not supported
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public DN dn()
  {
    return null;
@@ -234,16 +247,19 @@
    return null;
  }
  @Override
  public int getGroupId()
  {
    return groupId;
  }
  @Override
  public long getAssuredTimeout()
  {
    return assuredTimeout;
  }
  @Override
  public int getDegradedStatusThreshold()
  {
    return degradedStatusThreshold;
@@ -254,22 +270,31 @@
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  @Override
  public int getWeight()
  {
    return weight;
  }
  @Override
  public long getMonitoringPeriod()
  {
    return monitoringPeriod;
  }
  /**
   * @param monitoringPeriod the monitoringPeriod to set
   */
  public void setMonitoringPeriod(long monitoringPeriod)
  {
    this.monitoringPeriod = monitoringPeriod;
  }
  @Override
  public boolean isComputeChangenumber()
  {
    return computeChangenumber;
  }
  public void setComputeChangenumber(boolean computeChangenumber)
  {
    this.computeChangenumber = computeChangenumber;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -402,6 +402,11 @@
    {
      final ReplicatedUpdateMsg msg = msgs[i];
      final ChangeNumberIndexRecord record = allValues.get(i);
      if (previousCookie.isEmpty())
      {
        // ugly hack to go round strange legacy code
        previousCookie.replace(record.getBaseDN(), new ServerState());
      }
      // check content in order
      String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());