From 2f54bf6d307ffcdd3c65937e181b16bffb277a2c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Jul 2014 15:29:48 +0000
Subject: [PATCH] OPENDJ-1453 Replica offline messages should be synced with updates

---
 opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java                                            |   27 +++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                        |   19 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java                                     |   72 +++++++-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java         |   18 +
 opendj-sdk/opends/src/messages/messages/replication.properties                                                                           |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java                             |  100 ++++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                                  |   13 -
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java |   52 ++++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                                    |   19 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java                                                       |   14 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java                                                    |   89 ++++++++--
 11 files changed, 305 insertions(+), 120 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index d049647..e8a0ac6 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -531,8 +531,6 @@
  change %s to replicaDB %s %s because: %s
 SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
  change %s to replicaDB %s %s because flushing thread is shutting down
-SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \
- state from root path '%s' : directory might not exist
 SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
  changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \
  list of server ids
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index f451cab..705a9a2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -26,7 +26,10 @@
  */
 package org.opends.server.replication.common;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -168,7 +171,31 @@
   }
 
   /**
+   * Returns a snapshot of this object.
+   *
+   * @return an unmodifiable Map representing a snapshot of this object.
+   */
+  public Map<DN, List<CSN>> getSnapshot()
+  {
+    if (list.isEmpty())
+    {
+      return Collections.emptyMap();
+    }
+    final Map<DN, List<CSN>> map = new HashMap<DN, List<CSN>>();
+    for (Entry<DN, ServerState> entry : list.entrySet())
+    {
+      final List<CSN> l = entry.getValue().getSnapshot();
+      if (!l.isEmpty())
+      {
+        map.put(entry.getKey(), l);
+      }
+    }
+    return Collections.unmodifiableMap(map);
+  }
+
+  /**
    * Returns a string representation of this object.
+   *
    * @return The string representation.
    */
   @Override
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 73d9714..c084dec 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -250,6 +250,20 @@
   }
 
   /**
+   * Returns a snapshot of this object.
+   *
+   * @return an unmodifiable List representing a snapshot of this object.
+   */
+  public List<CSN> getSnapshot()
+  {
+    if (serverIdToCSN.isEmpty())
+    {
+      return Collections.emptyList();
+    }
+    return Collections.unmodifiableList(new ArrayList<CSN>(serverIdToCSN.values()));
+  }
+
+  /**
    * Return the text representation of ServerState.
    * @return the text representation of ServerState
    */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java
index a0cb04b..503851e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -25,12 +25,13 @@
  */
 package org.opends.server.replication.server;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.types.DN;
 
 /**
@@ -43,15 +44,16 @@
  * <p>
  * This class is used during replication initialization to decouple the code
  * that reads the changelogStateDB from the code that makes use of its data.
+ *
+ * @ThreadSafe
  */
 public class ChangelogState
 {
 
-  private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
-  private final Map<DN, List<Integer>> domainToServerIds =
-      new HashMap<DN, List<Integer>>();
-  private final Map<DN, List<CSN>> offlineReplicas =
-      new HashMap<DN, List<CSN>>();
+  private final ConcurrentSkipListMap<DN, Long> domainToGenerationId = new ConcurrentSkipListMap<DN, Long>();
+  private final ConcurrentSkipListMap<DN, Set<Integer>> domainToServerIds =
+      new ConcurrentSkipListMap<DN, Set<Integer>>();
+  private final MultiDomainServerState offlineReplicas = new MultiDomainServerState();
 
   /**
    * Sets the generationId for the supplied replication domain.
@@ -76,11 +78,16 @@
    */
   public void addServerIdToDomain(int serverId, DN baseDN)
   {
-    List<Integer> serverIds = domainToServerIds.get(baseDN);
+    Set<Integer> serverIds = domainToServerIds.get(baseDN);
     if (serverIds == null)
     {
-      serverIds = new LinkedList<Integer>();
-      domainToServerIds.put(baseDN, serverIds);
+      serverIds = new HashSet<Integer>();
+      final Set<Integer> existingServerIds =
+          domainToServerIds.putIfAbsent(baseDN, serverIds);
+      if (existingServerIds != null)
+      {
+        serverIds = existingServerIds;
+      }
     }
     serverIds.add(serverId);
   }
@@ -95,13 +102,25 @@
    */
   public void addOfflineReplica(DN baseDN, CSN offlineCSN)
   {
-    List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
-    if (offlineCSNs == null)
+    offlineReplicas.update(baseDN, offlineCSN);
+  }
+
+  /**
+   * Removes the following replica information from the offline list.
+   *
+   * @param baseDN
+   *          the baseDN of the offline replica
+   * @param serverId
+   *          the serverId that is not offline anymore
+   */
+  public void removeOfflineReplica(DN baseDN, int serverId)
+  {
+    CSN csn;
+    do
     {
-      offlineCSNs = new LinkedList<CSN>();
-      offlineReplicas.put(baseDN, offlineCSNs);
+      csn = offlineReplicas.getCSN(baseDN, serverId);
     }
-    offlineCSNs.add(offlineCSN);
+    while (csn != null && !offlineReplicas.removeCSN(baseDN, csn));
   }
 
   /**
@@ -119,21 +138,51 @@
    *
    * @return a Map of domainBaseDN => List&lt;serverId&gt;.
    */
-  public Map<DN, List<Integer>> getDomainToServerIds()
+  public Map<DN, Set<Integer>> getDomainToServerIds()
   {
     return domainToServerIds;
   }
 
   /**
-   * Returns the Map of domainBaseDN => List&lt;offlineCSN&gt;.
+   * Returns the internal MultiDomainServerState for offline replicas.
    *
-   * @return a Map of domainBaseDN => List&lt;offlineCSN&gt;.
+   * @return the MultiDomainServerState for offline replicas.
    */
-  public Map<DN, List<CSN>> getOfflineReplicas()
+  public MultiDomainServerState getOfflineReplicas()
   {
     return offlineReplicas;
   }
 
+  /**
+   * Returns whether the current ChangelogState is equal to the provided
+   * ChangelogState.
+   * <p>
+   * Note: Only use for tests!!<br>
+   * This method should only be used by tests because it creates a lot of
+   * intermediate objects which is not suitable for production.
+   *
+   * @param other
+   *          the ChangelogState to compare with
+   * @return true if the current ChangelogState is equal to the provided
+   *         ChangelogState, false otherwise.
+   */
+  public boolean isEqualTo(ChangelogState other)
+  {
+    if (other == null)
+    {
+      return false;
+    }
+    if (this == other)
+    {
+      return true;
+    }
+    return domainToGenerationId.equals(other.domainToGenerationId)
+        && domainToServerIds.equals(other.domainToServerIds)
+        // Note: next line is not suitable for production
+        // because it creates lots of Lists and Maps
+        && offlineReplicas.getSnapshot().equals(other.offlineReplicas.getSnapshot());
+  }
+
   /** {@inheritDoc} */
   @Override
   public String toString()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 19e68b6..7592fd9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -38,6 +38,7 @@
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -259,7 +260,7 @@
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
       replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
-      final ChangelogState changelogState = replicationEnv.readChangelogState();
+      final ChangelogState changelogState = replicationEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
@@ -284,7 +285,7 @@
     {
       replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
     }
-    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       for (int serverId : entry.getValue())
       {
@@ -568,7 +569,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(replicationEnv.readChangelogState());
+      startIndexer(replicationEnv.getChangelogState());
     }
     else
     {
@@ -632,7 +633,7 @@
       throws ChangelogException
   {
     final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final ChangelogState state = replicationEnv.readChangelogState();
+    final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
     final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
     for (int serverId : serverIds)
     {
@@ -640,7 +641,7 @@
       final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
       final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
       replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
       cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
     }
     // recycle exhausted cursors,
@@ -648,13 +649,13 @@
     return new CompositeDBCursor<Void>(cursors, true);
   }
 
-  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
       ServerState startAfterServerState)
   {
-    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
-    if (domain != null)
+    final ServerState domainState = offlineReplicas.getServerState(baseDN);
+    if (domainState != null)
     {
-      for (CSN offlineCSN : domain)
+      for (CSN offlineCSN : domainState)
       {
         if (serverId == offlineCSN.getServerId()
             && !startAfterServerState.cover(offlineCSN))
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 65cd39c..c04fc94 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -173,15 +173,33 @@
 
   /** Root path where the replication log is stored. */
   private final String replicationRootPath;
+  /**
+   * The current changelogState. This is in-memory version of what is inside the
+   * on-disk changelogStateDB. It improves performances in case the
+   * changelogState is read often.
+   *
+   * @GuardedBy("domainsLock")
+   */
+  private final ChangelogState changelogState;
 
   /** The list of logs that are in use. */
   private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
 
-  /** Maps each domain DN to a domain id that is used to name directory in file system. */
+  /**
+   * Maps each domain DN to a domain id that is used to name directory in file system.
+   *
+   * @GuardedBy("domainsLock")
+   */
   private final Map<DN, String> domains = new HashMap<DN, String>();
 
-  /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
-  private final Object domainLock = new Object();
+  /**
+   * Exclusive lock to synchronize:
+   * <ul>
+   * <li>the domains mapping</li>
+   * <li>changes to the in-memory changelogState</li>
+   * <li>changes to the on-disk state of a domain</li>
+   */
+  private final Object domainsLock = new Object();
 
   /** The underlying replication server. */
   private final ReplicationServer replicationServer;
@@ -203,21 +221,21 @@
   {
     this.replicationRootPath = rootPath;
     this.replicationServer = replicationServer;
+    this.changelogState = readOnDiskChangelogState();
   }
 
   /**
-   * Returns the state of the replication changelog, which includes the list of
-   * known servers and the generation id.
+   * Returns the state of the replication changelog.
    *
-   * @return the {@link ChangelogState}
+   * @return the {@link ChangelogState} read from the changelogState DB
    * @throws ChangelogException
-   *           if a problem occurs while retrieving the state.
+   *           if a database problem occurs
    */
-  ChangelogState readChangelogState() throws ChangelogException
+  ChangelogState readOnDiskChangelogState() throws ChangelogException
   {
     final ChangelogState state = new ChangelogState();
     final File changelogPath = new File(replicationRootPath);
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       readDomainsStateFile();
       checkDomainDirectories(changelogPath);
@@ -230,6 +248,16 @@
   }
 
   /**
+   * Returns the current state of the replication changelog.
+   *
+   * @return the current {@link ChangelogState}
+   */
+  ChangelogState getChangelogState()
+  {
+    return changelogState;
+  }
+
+  /**
    * Finds or creates the log used to store changes from the replication server
    * with the given serverId and the given baseDN.
    *
@@ -256,7 +284,7 @@
       ensureRootDirectoryExists();
 
       String domainId = null;
-      synchronized (domainLock)
+      synchronized (domainsLock)
       {
         domainId = domains.get(domainDN);
         if (domainId == null)
@@ -266,9 +294,11 @@
 
         final File serverIdPath = getServerIdPath(domainId, serverId);
         ensureServerIdDirectoryExists(serverIdPath);
+        changelogState.addServerIdToDomain(serverId, domainDN);
 
         final File generationIdPath = getGenerationIdPath(domainId, generationId);
         ensureGenerationIdFileExists(generationIdPath);
+        changelogState.setDomainGenerationId(domainDN, generationId);
 
         return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
       }
@@ -333,12 +363,12 @@
    */
   void clearGenerationId(final DN domainDN) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       final String domainId = domains.get(domainDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
       if (idFile != null)
@@ -350,6 +380,7 @@
               ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
         }
       }
+      changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
     }
   }
 
@@ -364,16 +395,17 @@
    */
   void resetGenerationId(final DN baseDN) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       clearGenerationId(baseDN);
       final String domainId = domains.get(baseDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
       ensureGenerationIdFileExists(generationIdPath);
+      changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
     }
   }
 
@@ -390,12 +422,12 @@
    */
   void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       final String domainId = domains.get(domainDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
       if (!serverIdPath.exists())
@@ -409,6 +441,7 @@
         // Overwrite file, only the last sent offline CSN is kept
         writer = newFileWriter(offlineFile);
         writer.write(offlineCSN.toString());
+        changelogState.addOfflineReplica(domainDN, offlineCSN);
       }
       catch (IOException e)
       {
@@ -435,12 +468,12 @@
    */
   void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
   {
-    synchronized (domainLock)
+    synchronized (domainsLock)
     {
       final String domainId = domains.get(domainDN);
       if (domainId == null)
       {
-        return; // unknow domain => no-op
+        return; // unknown domain => no-op
       }
       final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
       if (offlineFile.exists())
@@ -452,6 +485,7 @@
               offlineFile.getPath(), domainDN.toString(), serverId));
         }
       }
+      changelogState.removeOfflineReplica(domainDN, serverId);
     }
   }
 
@@ -497,24 +531,22 @@
   private void checkDomainDirectories(final File changelogPath) throws ChangelogException
   {
     final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
-    if (dnDirectories == null)
+    if (dnDirectories != null)
     {
-      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
-    }
+      final Set<String> domainIdsFromFileSystem = new HashSet<String>();
+      for (final File dnDir : dnDirectories)
+      {
+        final String fileName = dnDir.getName();
+        final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+        domainIdsFromFileSystem.add(domainId);
+      }
 
-    Set<String> domainIdsFromFileSystem = new HashSet<String>();
-    for (final File dnDir : dnDirectories)
-    {
-      final String fileName = dnDir.getName();
-      final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
-      domainIdsFromFileSystem.add(domainId);
-    }
-
-    Set<String> expectedDomainIds = new HashSet<String>(domains.values());
-    if (!domainIdsFromFileSystem.equals(expectedDomainIds))
-    {
-      throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
-          domainIdsFromFileSystem.toString()));
+      final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+      if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+      {
+        throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+            domainIdsFromFileSystem.toString()));
+      }
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 052b0d0..deacf2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -33,9 +33,9 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -378,8 +378,7 @@
     // initialize the DB cursor and the last seen updates
     // to ensure the medium consistency CSN can move forward
     final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
-    for (Entry<DN, List<Integer>> entry
-        : changelogState.getDomainToServerIds().entrySet())
+    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
       if (!isECLEnabledDomain(baseDN))
@@ -422,12 +421,10 @@
       nextChangeForInsertDBCursor.next();
     }
 
-    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
-        .entrySet())
+    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
+    for (DN baseDN : offlineReplicas)
     {
-      final DN baseDN = entry.getKey();
-      final List<CSN> offlineCSNs = entry.getValue();
-      for (CSN offlineCSN : offlineCSNs)
+      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
       {
         if (isECLEnabledDomain(baseDN))
         {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 9d575dc..723b682 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -39,6 +39,7 @@
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -316,7 +317,7 @@
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
       dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
-      final ChangelogState changelogState = dbEnv.readChangelogState();
+      final ChangelogState changelogState = dbEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
@@ -341,7 +342,7 @@
     {
       replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
     }
-    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       for (int serverId : entry.getValue())
       {
@@ -643,7 +644,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(dbEnv.readChangelogState());
+      startIndexer(dbEnv.getChangelogState());
     }
     else
     {
@@ -707,7 +708,7 @@
       throws ChangelogException
   {
     final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final ChangelogState state = dbEnv.readChangelogState();
+    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
     final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
     for (int serverId : serverIds)
     {
@@ -715,7 +716,7 @@
       final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
       final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
       replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
       cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
     }
     // recycle exhausted cursors,
@@ -723,13 +724,13 @@
     return new CompositeDBCursor<Void>(cursors, true);
   }
 
-  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
       ServerState startAfterServerState)
   {
-    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
-    if (domain != null)
+    final ServerState domainState = offlineReplicas.getServerState(baseDN);
+    if (domainState != null)
     {
-      for (CSN offlineCSN : domain)
+      for (CSN offlineCSN : domainState)
       {
         if (serverId == offlineCSN.getServerId()
             && !startAfterServerState.cover(offlineCSN))
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index cc9e286..3f5aa10 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -66,6 +66,16 @@
 {
   private Environment dbEnvironment;
   private Database changelogStateDb;
+  /**
+   * The current changelogState. This is in-memory version of what is inside the
+   * on-disk changelogStateDB. It improves performances in case the
+   * changelogState is read often.
+   *
+   * @GuardedBy("stateLock")
+   */
+  private final ChangelogState changelogState;
+  /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
+  private final Object stateLock = new Object();
   private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
   private ReplicationServer replicationServer;
   private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -101,6 +111,7 @@
        * of all the servers that have been seen in the past.
        */
       changelogStateDb = openDatabase("changelogstate");
+      changelogState = readOnDiskChangelogState();
     }
     catch (RuntimeException e)
     {
@@ -222,13 +233,23 @@
   }
 
   /**
-   * Read and return the list of known servers from the database.
+   * Return the current changelog state.
+   *
+   * @return the current {@link ChangelogState}
+   */
+  public ChangelogState getChangelogState()
+  {
+    return changelogState;
+  }
+
+  /**
+   * Read and return the changelog state from the database.
    *
    * @return the {@link ChangelogState} read from the changelogState DB
    * @throws ChangelogException
    *           if a database problem occurs
    */
-  public ChangelogState readChangelogState() throws ChangelogException
+  protected ChangelogState readOnDiskChangelogState() throws ChangelogException
   {
     return decodeChangelogState(readWholeState());
   }
@@ -434,8 +455,13 @@
       // Opens the DB for the changes received from this server on this domain.
       final Database replicaDB = openDatabase(replicaEntry.getKey());
 
-      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
-      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+      synchronized (stateLock)
+      {
+        putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
+        changelogState.addServerIdToDomain(serverId, baseDN);
+        putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+        changelogState.setDomainGenerationId(baseDN, generationId);
+      }
       return replicaDB;
     }
     catch (RuntimeException e)
@@ -638,9 +664,13 @@
    */
   public void clearGenerationId(DN baseDN) throws ChangelogException
   {
-    final int unusedGenId = 0;
-    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
-        "clearGenerationId(baseDN=" + baseDN + ")");
+    synchronized (stateLock)
+    {
+      final int unusedGenId = 0;
+      deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
+          "clearGenerationId(baseDN=" + baseDN + ")");
+      changelogState.setDomainGenerationId(baseDN, unusedGenId);
+    }
   }
 
   /**
@@ -656,8 +686,12 @@
    */
   public void clearServerId(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
-        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+    synchronized (stateLock)
+    {
+      deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
+          "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+      changelogState.setDomainGenerationId(baseDN, -1);
+    }
   }
 
   private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -721,10 +755,14 @@
   public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
       throws ChangelogException
   {
-    // just overwrite any older entry as it is assumed a newly received offline
-    // CSN is newer than the previous one
-    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
-        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+    synchronized (stateLock)
+    {
+      // just overwrite any older entry as it is assumed a newly received offline
+      // CSN is newer than the previous one
+      putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+          "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+      changelogState.addOfflineReplica(baseDN, offlineCSN);
+    }
   }
 
   /**
@@ -742,8 +780,12 @@
    */
   public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
-        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+    synchronized (stateLock)
+    {
+      deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+          "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+      changelogState.removeOfflineReplica(baseDN, serverId);
+    }
   }
 
   private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index de8c1f5..dab5f44 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -25,9 +25,6 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
-import static org.assertj.core.api.Assertions.*;
-import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
-
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,10 +42,14 @@
 import org.opends.server.types.DN;
 import org.opends.server.util.StaticUtils;
 import org.opends.server.util.TimeThread;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
+
 @SuppressWarnings("javadoc")
 public class ReplicationEnvironmentTest extends DirectoryServerTestCase
 {
@@ -65,7 +66,13 @@
   public void setUp() throws Exception
   {
     // This test suite depends on having the schema available for DN decoding.
-    TestCaseUtils.startServer();
+    TestCaseUtils.startFakeServer();
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception
+  {
+    TestCaseUtils.shutdownFakeServer();
   }
 
   @AfterMethod
@@ -92,11 +99,13 @@
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
       replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
 
-      ChangelogState state = environment.readChangelogState();
+      final ChangelogState state = environment.readOnDiskChangelogState();
 
       assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
       assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
       assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+
+      assertThat(state).isEqualTo(environment.getChangelogState());
     }
     finally
     {
@@ -124,7 +133,7 @@
         }
       }
 
-      ChangelogState state = environment.readChangelogState();
+      final ChangelogState state = environment.readOnDiskChangelogState();
 
       assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
       for (int i = 0; i <= 2 ; i++)
@@ -135,6 +144,8 @@
           MapEntry.entry(domainDNs.get(0), 1L),
           MapEntry.entry(domainDNs.get(1), 2L),
           MapEntry.entry(domainDNs.get(2), 3L));
+
+      assertThat(state).isEqualTo(environment.getChangelogState());
     }
     finally
     {
@@ -160,9 +171,12 @@
       CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
       environment.notifyReplicaOffline(domainDN, offlineCSN);
 
-      ChangelogState state = environment.readChangelogState();
+      final ChangelogState state = environment.readOnDiskChangelogState();
 
-      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+      assertThat(state).isEqualTo(environment.getChangelogState());
     }
     finally
     {
@@ -186,7 +200,7 @@
       File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
       offlineStateFile.createNewFile();
 
-      environment.readChangelogState();
+      environment.readOnDiskChangelogState();
     }
     finally
     {
@@ -214,9 +228,10 @@
       CSN lastOfflineCSN = csnGenerator.newCSN();
       environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
 
-      ChangelogState state = environment.readChangelogState();
-
-      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+      final ChangelogState state = environment.readOnDiskChangelogState();
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+      assertThat(state).isEqualTo(environment.getChangelogState());
     }
     finally
     {
@@ -243,9 +258,9 @@
       // put server id 1 online again
       environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
 
-      ChangelogState state = environment.readChangelogState();
-
+      final ChangelogState state = environment.readOnDiskChangelogState();
       assertThat(state.getOfflineReplicas()).isEmpty();
+      assertThat(state).isEqualTo(environment.getChangelogState());
     }
     finally
     {
@@ -269,12 +284,15 @@
       CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
       environment.notifyReplicaOffline(domainDN, offlineCSN);
 
-      ChangelogState state = environment.readChangelogState();
+      final ChangelogState state = environment.readOnDiskChangelogState();
 
       assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
       assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
       assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
-      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+      assertThat(state).isEqualTo(environment.getChangelogState());
     }
     finally
     {
@@ -299,7 +317,7 @@
       // consistency with domain state file
       StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
 
-      environment.readChangelogState();
+      environment.readOnDiskChangelogState();
     }
     finally
     {
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
index 6aac87b..6af0702 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.HashSet;
 
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
@@ -70,11 +71,16 @@
     }
 
     @Override
-    protected Database openDatabase(String databaseName)
-        throws ChangelogException, RuntimeException
+    protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException
     {
       return null;
     }
+
+    @Override
+    protected ChangelogState readOnDiskChangelogState() throws ChangelogException
+    {
+      return new ChangelogState();
+    }
   }
 
   @BeforeClass
@@ -130,8 +136,8 @@
         entry(baseDN, generationId));
     if (!replicas.isEmpty())
     {
-      assertThat(state.getDomainToServerIds()).containsExactly(
-          entry(baseDN, replicas));
+      assertThat(state.getDomainToServerIds())
+          .containsExactly(entry(baseDN, new HashSet<Integer>(replicas)));
     }
     else
     {
@@ -139,8 +145,8 @@
     }
     if (!offlineReplicas.isEmpty())
     {
-      assertThat(state.getOfflineReplicas()).containsExactly(
-          entry(baseDN, offlineReplicas));
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(entry(baseDN, offlineReplicas));
     }
     else
     {

--
Gitblit v1.10.0