mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.29.2014 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0
OPENDJ-1453 Replica offline messages should be synced with updates

In r10840, the change to JE/FileChangelogDB.getCursorFrom(DN, ServerState) unnecessarily triggers a lot of calls to the underlying DB (JE or file based) to retrieve the ChangelogState.
As an optimization, keeping an in-memory version of the ChangelogState in synch with the on-disk version will help.


ChangelogState.java:
Now thread safe.
Added removeOfflineReplica(), isEqualTo().
Changed domainToServerIds from Map<DN, List<Integer>> to Map<DN, Set<Integer>>.
In getOfflineReplicas(), now return a MultiDomainServerState.

ChangeNumberIndexer.java:
Consequence of the changes to ChangelogState.


ReplicationDbEnv.java:
Added changelogState field, updated at the same time as the on-disk changelogstate DB
Added getChangelogState(), called by client code instead of readChangelogState().
Renamed readChangelogState() to private readOnDiskChangelogState().
Added stateLock field to sync updates to in-memory and on-disk changelog state.

ReplicationEnvironment.java:
Added changelogState field, updated at the same time as the on-disk changelogstate DB
Added getChangelogState(), called by client code instead of readChangelogState().
Renamed readChangelogState() to private readOnDiskChangelogState().
Renamed domainLock field to domainsLock.

replication.properties:
Removed now unused error message.

FileChangelogDB.java, JEChangelogDB.java:
Consequence of the changes to ChangelogState and ReplicationEnvironment/ReplicationDbEnv.


MultiDomainServerState.java, ServerState.java:
Added getSnapshot() for unit tests.

ReplicationEnvironmentTest.java, ReplicationDbEnvTest.java:
Consequence of the changes to ReplicationEnvironment and ChangelogState.
Used the fake server.
11 files modified
425 ■■■■ changed files
opends/src/messages/messages/replication.properties 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 27 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ChangelogState.java 89 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 19 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 100 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 19 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 72 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java 52 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java 18 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -531,8 +531,6 @@
 change %s to replicaDB %s %s because: %s
SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
 change %s to replicaDB %s %s because flushing thread is shutting down
SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \
 state from root path '%s' : directory might not exist
SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
 changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \
 list of server ids
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -26,7 +26,10 @@
 */
package org.opends.server.replication.common;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -168,7 +171,31 @@
  }
  /**
   * Returns a snapshot of this object.
   *
   * @return an unmodifiable Map representing a snapshot of this object.
   */
  public Map<DN, List<CSN>> getSnapshot()
  {
    if (list.isEmpty())
    {
      return Collections.emptyMap();
    }
    final Map<DN, List<CSN>> map = new HashMap<DN, List<CSN>>();
    for (Entry<DN, ServerState> entry : list.entrySet())
    {
      final List<CSN> l = entry.getValue().getSnapshot();
      if (!l.isEmpty())
      {
        map.put(entry.getKey(), l);
      }
    }
    return Collections.unmodifiableMap(map);
  }
  /**
   * Returns a string representation of this object.
   *
   * @return The string representation.
   */
  @Override
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -250,6 +250,20 @@
  }
  /**
   * Returns a snapshot of this object.
   *
   * @return an unmodifiable List representing a snapshot of this object.
   */
  public List<CSN> getSnapshot()
  {
    if (serverIdToCSN.isEmpty())
    {
      return Collections.emptyList();
    }
    return Collections.unmodifiableList(new ArrayList<CSN>(serverIdToCSN.values()));
  }
  /**
   * Return the text representation of ServerState.
   * @return the text representation of ServerState
   */
opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -25,12 +25,13 @@
 */
package org.opends.server.replication.server;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.types.DN;
/**
@@ -43,15 +44,16 @@
 * <p>
 * This class is used during replication initialization to decouple the code
 * that reads the changelogStateDB from the code that makes use of its data.
 *
 * @ThreadSafe
 */
public class ChangelogState
{
  private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
  private final Map<DN, List<Integer>> domainToServerIds =
      new HashMap<DN, List<Integer>>();
  private final Map<DN, List<CSN>> offlineReplicas =
      new HashMap<DN, List<CSN>>();
  private final ConcurrentSkipListMap<DN, Long> domainToGenerationId = new ConcurrentSkipListMap<DN, Long>();
  private final ConcurrentSkipListMap<DN, Set<Integer>> domainToServerIds =
      new ConcurrentSkipListMap<DN, Set<Integer>>();
  private final MultiDomainServerState offlineReplicas = new MultiDomainServerState();
  /**
   * Sets the generationId for the supplied replication domain.
@@ -76,11 +78,16 @@
   */
  public void addServerIdToDomain(int serverId, DN baseDN)
  {
    List<Integer> serverIds = domainToServerIds.get(baseDN);
    Set<Integer> serverIds = domainToServerIds.get(baseDN);
    if (serverIds == null)
    {
      serverIds = new LinkedList<Integer>();
      domainToServerIds.put(baseDN, serverIds);
      serverIds = new HashSet<Integer>();
      final Set<Integer> existingServerIds =
          domainToServerIds.putIfAbsent(baseDN, serverIds);
      if (existingServerIds != null)
      {
        serverIds = existingServerIds;
      }
    }
    serverIds.add(serverId);
  }
@@ -95,13 +102,25 @@
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
  {
    List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
    if (offlineCSNs == null)
    offlineReplicas.update(baseDN, offlineCSN);
  }
  /**
   * Removes the following replica information from the offline list.
   *
   * @param baseDN
   *          the baseDN of the offline replica
   * @param serverId
   *          the serverId that is not offline anymore
   */
  public void removeOfflineReplica(DN baseDN, int serverId)
  {
    CSN csn;
    do
    {
      offlineCSNs = new LinkedList<CSN>();
      offlineReplicas.put(baseDN, offlineCSNs);
      csn = offlineReplicas.getCSN(baseDN, serverId);
    }
    offlineCSNs.add(offlineCSN);
    while (csn != null && !offlineReplicas.removeCSN(baseDN, csn));
  }
  /**
@@ -119,21 +138,51 @@
   *
   * @return a Map of domainBaseDN => List&lt;serverId&gt;.
   */
  public Map<DN, List<Integer>> getDomainToServerIds()
  public Map<DN, Set<Integer>> getDomainToServerIds()
  {
    return domainToServerIds;
  }
  /**
   * Returns the Map of domainBaseDN => List&lt;offlineCSN&gt;.
   * Returns the internal MultiDomainServerState for offline replicas.
   *
   * @return a Map of domainBaseDN => List&lt;offlineCSN&gt;.
   * @return the MultiDomainServerState for offline replicas.
   */
  public Map<DN, List<CSN>> getOfflineReplicas()
  public MultiDomainServerState getOfflineReplicas()
  {
    return offlineReplicas;
  }
  /**
   * Returns whether the current ChangelogState is equal to the provided
   * ChangelogState.
   * <p>
   * Note: Only use for tests!!<br>
   * This method should only be used by tests because it creates a lot of
   * intermediate objects which is not suitable for production.
   *
   * @param other
   *          the ChangelogState to compare with
   * @return true if the current ChangelogState is equal to the provided
   *         ChangelogState, false otherwise.
   */
  public boolean isEqualTo(ChangelogState other)
  {
    if (other == null)
    {
      return false;
    }
    if (this == other)
    {
      return true;
    }
    return domainToGenerationId.equals(other.domainToGenerationId)
        && domainToServerIds.equals(other.domainToServerIds)
        // Note: next line is not suitable for production
        // because it creates lots of Lists and Maps
        && offlineReplicas.getSnapshot().equals(other.offlineReplicas.getSnapshot());
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -38,6 +38,7 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -259,7 +260,7 @@
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = replicationEnv.readChangelogState();
      final ChangelogState changelogState = replicationEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
@@ -284,7 +285,7 @@
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
@@ -568,7 +569,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(replicationEnv.readChangelogState());
      startIndexer(replicationEnv.getChangelogState());
    }
    else
    {
@@ -632,7 +633,7 @@
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final ChangelogState state = replicationEnv.readChangelogState();
    final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
@@ -640,7 +641,7 @@
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
    // recycle exhausted cursors,
@@ -648,13 +649,13 @@
    return new CompositeDBCursor<Void>(cursors, true);
  }
  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  {
    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
    if (domain != null)
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    {
      for (CSN offlineCSN : domain)
      for (CSN offlineCSN : domainState)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -173,15 +173,33 @@
  /** Root path where the replication log is stored. */
  private final String replicationRootPath;
  /**
   * The current changelogState. This is in-memory version of what is inside the
   * on-disk changelogStateDB. It improves performances in case the
   * changelogState is read often.
   *
   * @GuardedBy("domainsLock")
   */
  private final ChangelogState changelogState;
  /** The list of logs that are in use. */
  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
  /** Maps each domain DN to a domain id that is used to name directory in file system. */
  /**
   * Maps each domain DN to a domain id that is used to name directory in file system.
   *
   * @GuardedBy("domainsLock")
   */
  private final Map<DN, String> domains = new HashMap<DN, String>();
  /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
  private final Object domainLock = new Object();
  /**
   * Exclusive lock to synchronize:
   * <ul>
   * <li>the domains mapping</li>
   * <li>changes to the in-memory changelogState</li>
   * <li>changes to the on-disk state of a domain</li>
   */
  private final Object domainsLock = new Object();
  /** The underlying replication server. */
  private final ReplicationServer replicationServer;
@@ -203,21 +221,21 @@
  {
    this.replicationRootPath = rootPath;
    this.replicationServer = replicationServer;
    this.changelogState = readOnDiskChangelogState();
  }
  /**
   * Returns the state of the replication changelog, which includes the list of
   * known servers and the generation id.
   * Returns the state of the replication changelog.
   *
   * @return the {@link ChangelogState}
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a problem occurs while retrieving the state.
   *           if a database problem occurs
   */
  ChangelogState readChangelogState() throws ChangelogException
  ChangelogState readOnDiskChangelogState() throws ChangelogException
  {
    final ChangelogState state = new ChangelogState();
    final File changelogPath = new File(replicationRootPath);
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      readDomainsStateFile();
      checkDomainDirectories(changelogPath);
@@ -230,6 +248,16 @@
  }
  /**
   * Returns the current state of the replication changelog.
   *
   * @return the current {@link ChangelogState}
   */
  ChangelogState getChangelogState()
  {
    return changelogState;
  }
  /**
   * Finds or creates the log used to store changes from the replication server
   * with the given serverId and the given baseDN.
   *
@@ -256,7 +284,7 @@
      ensureRootDirectoryExists();
      String domainId = null;
      synchronized (domainLock)
      synchronized (domainsLock)
      {
        domainId = domains.get(domainDN);
        if (domainId == null)
@@ -266,9 +294,11 @@
        final File serverIdPath = getServerIdPath(domainId, serverId);
        ensureServerIdDirectoryExists(serverIdPath);
        changelogState.addServerIdToDomain(serverId, domainDN);
        final File generationIdPath = getGenerationIdPath(domainId, generationId);
        ensureGenerationIdFileExists(generationIdPath);
        changelogState.setDomainGenerationId(domainDN, generationId);
        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
      }
@@ -333,12 +363,12 @@
   */
  void clearGenerationId(final DN domainDN) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
      if (idFile != null)
@@ -350,6 +380,7 @@
              ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
        }
      }
      changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
    }
  }
@@ -364,16 +395,17 @@
   */
  void resetGenerationId(final DN baseDN) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      clearGenerationId(baseDN);
      final String domainId = domains.get(baseDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
      ensureGenerationIdFileExists(generationIdPath);
      changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
    }
  }
@@ -390,12 +422,12 @@
   */
  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
      if (!serverIdPath.exists())
@@ -409,6 +441,7 @@
        // Overwrite file, only the last sent offline CSN is kept
        writer = newFileWriter(offlineFile);
        writer.write(offlineCSN.toString());
        changelogState.addOfflineReplica(domainDN, offlineCSN);
      }
      catch (IOException e)
      {
@@ -435,12 +468,12 @@
   */
  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
  {
    synchronized (domainLock)
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknow domain => no-op
        return; // unknown domain => no-op
      }
      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
      if (offlineFile.exists())
@@ -452,6 +485,7 @@
              offlineFile.getPath(), domainDN.toString(), serverId));
        }
      }
      changelogState.removeOfflineReplica(domainDN, serverId);
    }
  }
@@ -497,24 +531,22 @@
  private void checkDomainDirectories(final File changelogPath) throws ChangelogException
  {
    final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
    if (dnDirectories == null)
    if (dnDirectories != null)
    {
      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
    }
      final Set<String> domainIdsFromFileSystem = new HashSet<String>();
      for (final File dnDir : dnDirectories)
      {
        final String fileName = dnDir.getName();
        final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
        domainIdsFromFileSystem.add(domainId);
      }
    Set<String> domainIdsFromFileSystem = new HashSet<String>();
    for (final File dnDir : dnDirectories)
    {
      final String fileName = dnDir.getName();
      final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
      domainIdsFromFileSystem.add(domainId);
    }
    Set<String> expectedDomainIds = new HashSet<String>(domains.values());
    if (!domainIdsFromFileSystem.equals(expectedDomainIds))
    {
      throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
          domainIdsFromFileSystem.toString()));
      final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
      if (!domainIdsFromFileSystem.equals(expectedDomainIds))
      {
        throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
            domainIdsFromFileSystem.toString()));
      }
    }
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -33,9 +33,9 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -378,8 +378,7 @@
    // initialize the DB cursor and the last seen updates
    // to ensure the medium consistency CSN can move forward
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
    for (Entry<DN, List<Integer>> entry
        : changelogState.getDomainToServerIds().entrySet())
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (!isECLEnabledDomain(baseDN))
@@ -422,12 +421,10 @@
      nextChangeForInsertDBCursor.next();
    }
    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
        .entrySet())
    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
    for (DN baseDN : offlineReplicas)
    {
      final DN baseDN = entry.getKey();
      final List<CSN> offlineCSNs = entry.getValue();
      for (CSN offlineCSN : offlineCSNs)
      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
      {
        if (isECLEnabledDomain(baseDN))
        {
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -39,6 +39,7 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -316,7 +317,7 @@
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.readChangelogState();
      final ChangelogState changelogState = dbEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
@@ -341,7 +342,7 @@
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
@@ -643,7 +644,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(dbEnv.readChangelogState());
      startIndexer(dbEnv.getChangelogState());
    }
    else
    {
@@ -707,7 +708,7 @@
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final ChangelogState state = dbEnv.readChangelogState();
    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
@@ -715,7 +716,7 @@
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
    // recycle exhausted cursors,
@@ -723,13 +724,13 @@
    return new CompositeDBCursor<Void>(cursors, true);
  }
  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  {
    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
    if (domain != null)
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    {
      for (CSN offlineCSN : domain)
      for (CSN offlineCSN : domainState)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -66,6 +66,16 @@
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
  /**
   * The current changelogState. This is in-memory version of what is inside the
   * on-disk changelogStateDB. It improves performances in case the
   * changelogState is read often.
   *
   * @GuardedBy("stateLock")
   */
  private final ChangelogState changelogState;
  /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
  private final Object stateLock = new Object();
  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -101,6 +111,7 @@
       * of all the servers that have been seen in the past.
       */
      changelogStateDb = openDatabase("changelogstate");
      changelogState = readOnDiskChangelogState();
    }
    catch (RuntimeException e)
    {
@@ -222,13 +233,23 @@
  }
  /**
   * Read and return the list of known servers from the database.
   * Return the current changelog state.
   *
   * @return the current {@link ChangelogState}
   */
  public ChangelogState getChangelogState()
  {
    return changelogState;
  }
  /**
   * Read and return the changelog state from the database.
   *
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a database problem occurs
   */
  public ChangelogState readChangelogState() throws ChangelogException
  protected ChangelogState readOnDiskChangelogState() throws ChangelogException
  {
    return decodeChangelogState(readWholeState());
  }
@@ -434,8 +455,13 @@
      // Opens the DB for the changes received from this server on this domain.
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      synchronized (stateLock)
      {
        putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
        changelogState.addServerIdToDomain(serverId, baseDN);
        putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
        changelogState.setDomainGenerationId(baseDN, generationId);
      }
      return replicaDB;
    }
    catch (RuntimeException e)
@@ -638,9 +664,13 @@
   */
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    final int unusedGenId = 0;
    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
        "clearGenerationId(baseDN=" + baseDN + ")");
    synchronized (stateLock)
    {
      final int unusedGenId = 0;
      deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
          "clearGenerationId(baseDN=" + baseDN + ")");
      changelogState.setDomainGenerationId(baseDN, unusedGenId);
    }
  }
  /**
@@ -656,8 +686,12 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
    synchronized (stateLock)
    {
      deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
          "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
      changelogState.setDomainGenerationId(baseDN, -1);
    }
  }
  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -721,10 +755,14 @@
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    // just overwrite any older entry as it is assumed a newly received offline
    // CSN is newer than the previous one
    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
    synchronized (stateLock)
    {
      // just overwrite any older entry as it is assumed a newly received offline
      // CSN is newer than the previous one
      putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
          "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
      changelogState.addOfflineReplica(baseDN, offlineCSN);
    }
  }
  /**
@@ -742,8 +780,12 @@
   */
  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
    synchronized (stateLock)
    {
      deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
          "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
      changelogState.removeOfflineReplica(baseDN, serverId);
    }
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -25,9 +25,6 @@
 */
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,10 +42,14 @@
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
@SuppressWarnings("javadoc")
public class ReplicationEnvironmentTest extends DirectoryServerTestCase
{
@@ -65,7 +66,13 @@
  public void setUp() throws Exception
  {
    // This test suite depends on having the schema available for DN decoding.
    TestCaseUtils.startServer();
    TestCaseUtils.startFakeServer();
  }
  @AfterClass
  public void tearDown() throws Exception
  {
    TestCaseUtils.shutdownFakeServer();
  }
  @AfterMethod
@@ -92,11 +99,13 @@
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
      ChangelogState state = environment.readChangelogState();
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
      assertThat(state).isEqualTo(environment.getChangelogState());
    }
    finally
    {
@@ -124,7 +133,7 @@
        }
      }
      ChangelogState state = environment.readChangelogState();
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
      for (int i = 0; i <= 2 ; i++)
@@ -135,6 +144,8 @@
          MapEntry.entry(domainDNs.get(0), 1L),
          MapEntry.entry(domainDNs.get(1), 2L),
          MapEntry.entry(domainDNs.get(2), 3L));
      assertThat(state).isEqualTo(environment.getChangelogState());
    }
    finally
    {
@@ -160,9 +171,12 @@
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
      environment.notifyReplicaOffline(domainDN, offlineCSN);
      ChangelogState state = environment.readChangelogState();
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
      assertThat(state).isEqualTo(environment.getChangelogState());
    }
    finally
    {
@@ -186,7 +200,7 @@
      File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
      offlineStateFile.createNewFile();
      environment.readChangelogState();
      environment.readOnDiskChangelogState();
    }
    finally
    {
@@ -214,9 +228,10 @@
      CSN lastOfflineCSN = csnGenerator.newCSN();
      environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
      ChangelogState state = environment.readChangelogState();
      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
      assertThat(state).isEqualTo(environment.getChangelogState());
    }
    finally
    {
@@ -243,9 +258,9 @@
      // put server id 1 online again
      environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
      ChangelogState state = environment.readChangelogState();
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getOfflineReplicas()).isEmpty();
      assertThat(state).isEqualTo(environment.getChangelogState());
    }
    finally
    {
@@ -269,12 +284,15 @@
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
      environment.notifyReplicaOffline(domainDN, offlineCSN);
      ChangelogState state = environment.readChangelogState();
      final ChangelogState state = environment.readOnDiskChangelogState();
      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
      assertThat(state).isEqualTo(environment.getChangelogState());
    }
    finally
    {
@@ -299,7 +317,7 @@
      // consistency with domain state file
      StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
      environment.readChangelogState();
      environment.readOnDiskChangelogState();
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.HashSet;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -70,11 +71,16 @@
    }
    @Override
    protected Database openDatabase(String databaseName)
        throws ChangelogException, RuntimeException
    protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException
    {
      return null;
    }
    @Override
    protected ChangelogState readOnDiskChangelogState() throws ChangelogException
    {
      return new ChangelogState();
    }
  }
  @BeforeClass
@@ -130,8 +136,8 @@
        entry(baseDN, generationId));
    if (!replicas.isEmpty())
    {
      assertThat(state.getDomainToServerIds()).containsExactly(
          entry(baseDN, replicas));
      assertThat(state.getDomainToServerIds())
          .containsExactly(entry(baseDN, new HashSet<Integer>(replicas)));
    }
    else
    {
@@ -139,8 +145,8 @@
    }
    if (!offlineReplicas.isEmpty())
    {
      assertThat(state.getOfflineReplicas()).containsExactly(
          entry(baseDN, offlineReplicas));
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(entry(baseDN, offlineReplicas));
    }
    else
    {