From 2d735189c834108a2e5f7a795610372eb6d00aed Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 03 Jun 2014 08:45:08 +0000
Subject: [PATCH] OPENDJ-1467 :  File Based Changelog must support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                        |   17 +
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java                                     |   49 ++++
 opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java                             |  177 +++++++++++++++-
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                                  |   15 +
 opends/src/server/org/opends/server/replication/server/changelog/file/Log.java                                                |   27 +
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java |  256 +++++++++++++++++++++----
 opends/src/messages/messages/replication.properties                                                                           |   16 +
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                                           |    2 
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                                    |   17 +
 opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                                 |    6 
 10 files changed, 494 insertions(+), 88 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index e030d37..1ec6984 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -572,8 +572,8 @@
  to file system for log file '%s'
 SEVERE_ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
  on log file '%s'
-SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
- log file
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \
+ directory '%s' for log file
 SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
  from domain state file '%s', from line '%s'
 SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
@@ -604,4 +604,14 @@
 SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
  head log file from '%s' to '%s'
 INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
- size of head log file is %d bytes
\ No newline at end of file
+ size of head log file is %d bytes
+SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
+ offline for domain %s and server id %d because the path '%s' does not exist
+SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
+ replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
+SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
+ state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN
+SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
+ replica offline state file '%s' for domain %s
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
+ offline state file '%s' for domain %s and server id %d
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3b9eab7..aa70fb2 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2507,7 +2507,7 @@
     {
       if (msg.isReplicaOfflineMsg())
       {
-        domainDB.replicaOffline(baseDN, msg.getCSN());
+        domainDB.notifyReplicaOffline(baseDN, msg.getCSN());
       }
       else
       {
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index f2ba666..0c8df53 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -166,8 +166,10 @@
    * @param heartbeatCSN
    *          The CSN heartbeat sent by this replica (contains the serverId and
    *          timestamp of the heartbeat)
+   * @throws ChangelogException
+   *            If a database problem happened
    */
-  void replicaHeartbeat(DN baseDN, CSN heartbeatCSN);
+  void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException;
 
   /**
    * Let the DB know this replica is going down.
@@ -186,5 +188,5 @@
    * @throws ChangelogException
    *           If a database problem happened
    */
-  void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
+  void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 36e8a7a..db0c9f3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -672,6 +672,7 @@
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
+      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
     return wasCreated;
@@ -679,25 +680,35 @@
 
   /** {@inheritDoc} */
   @Override
-  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN)
+  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
   {
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
+      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
       indexer.publishHeartbeat(baseDN, heartbeatCSN);
     }
   }
 
+  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+      throws ChangelogException
+  {
+    if (indexer.isReplicaOffline(baseDN, serverId))
+    {
+      replicationEnv.notifyReplicaOnline(baseDN, serverId);
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
-  public void replicaOffline(final DN baseDN, final CSN offlineCSN)
+  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
   {
+    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
-    // TODO save this state in the changelogStateDB?
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 200c084..350b70e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@
   /**
    * The last key appended to the log. In order to keep the ordering of the keys
    * in the log, any attempt to append a record with a key lower or equal to
-   * this key will silently fail.
+   * this is rejected (no error but an event is logged).
    */
   private K lastAppendedKey;
 
@@ -342,6 +342,10 @@
   /**
    * Add the provided record at the end of this log.
    * <p>
+   * The record must have a key strictly higher than the key
+   * of the last record added. If it is not the case, the record is not
+   * appended and the method returns immediately.
+   * <p>
    * In order to ensure that record is written out of buffers and persisted
    * to file system, it is necessary to explicitely call the
    * {@code syncToFileSystem()} method.
@@ -349,7 +353,7 @@
    * @param record
    *          The record to add.
    * @throws ChangelogException
-   *           If the record can't be added to the log.
+   *           If an error occurs while adding the record to the log.
    */
   public void append(final Record<K, V> record) throws ChangelogException
   {
@@ -766,7 +770,7 @@
     catch (IOException e)
     {
       throw new ChangelogException(
-          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
+          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
     }
   }
 
@@ -842,8 +846,8 @@
   private void openHeadLogFile() throws ChangelogException
   {
     final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
-    Record<K,V> newestRecord = head.getNewestRecord();
-    lastAppendedKey = newestRecord == null ? null : newestRecord.getKey();
+    final Record<K,V> newestRecord = head.getNewestRecord();
+    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
     logFiles.put(recordParser.getMaxKey(), head);
   }
 
@@ -897,7 +901,7 @@
   /**
    * Represents a cursor than can be repositioned on a given key.
    */
-  static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+  static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
   {
     /**
      * Position the cursor to the record corresponding to the provided key or to
@@ -1011,9 +1015,9 @@
         if (key != null)
         {
           boolean isFound = currentCursor.positionTo(key, findNearest);
-          if (isFound && getRecord() == null)
+          if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile))
           {
-            // The key to position to may be in the next file, force the switch
+            // The key to position is probably in the next file, force the switch
             isFound = next();
           }
           return isFound;
@@ -1047,6 +1051,13 @@
       currentLogFile = logFile;
       currentCursor = currentLogFile.getCursor();
     }
+
+    /** {@inheritDoc} */
+    public String toString()
+    {
+      return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+          log.logPath, currentLogFile.getFile().getName(), currentCursor);
+    }
   }
 
   /** An empty cursor, that always return null records and false to {@code next()} method. */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index ee69dd7..1e658c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -77,15 +79,21 @@
  * <ul>
  * <li>A "generation_[id].id" file, where [id] is the generation id</li>
  * <li>One directory per server id, named after "[id].server" where [id] is the
- * id of the server. Each directory contains the log files for the given server
- * id.</li>
+ * id of the server.</li>
  * </ul>
- * All log files end with the ".log" suffix. Log files always include the "head.log"
- * file and optionally zero to many read-only log files named after the lowest key
- * and highest key present in the log file.
+ * Each server id directory contains the following files :
+ * <ul>
+ * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
+ * <li>Zero to many read-only log files named after the lowest key
+ * and highest key present in the log file (they all end with the ".log" suffix.</li>
+ * <li>Optionally, a "offline.state" file that indicates that this particular server id
+ *  of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
+ * </ul>
+ * See {@code Log} class for details on the log files.
+ *
  * <p>
  * Layout example with two domains "o=test1" and "o=test2", each having server
- * ids 22 and 33 :
+ * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
  *
  * <pre>
  * +---changelog
@@ -96,15 +104,16 @@
  * |   \---1.domain
  * |       \---generation1.id
  * |       \---22.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
  * |       \---33.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
+ *             \---offline.state
  * |   \---2.domain
  * |       \---generation1.id
  * |       \---22.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
  * |       \---33.server
- * |           \---head.log
+ * |           \---head.log [contains last records written]
  * </pre>
  */
 class ReplicationEnvironment
@@ -120,6 +129,8 @@
 
   private static final String DOMAINS_STATE_FILENAME = "domains.state";
 
+  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+
   private static final String DOMAIN_STATE_SEPARATOR = ":";
 
   private static final String DOMAIN_SUFFIX = ".domain";
@@ -130,6 +141,8 @@
 
   private static final String GENERATION_ID_FILE_SUFFIX = ".id";
 
+  private static final String UTF8_ENCODING = "UTF-8";
+
   private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
   {
     @Override
@@ -169,7 +182,6 @@
   private final Map<DN, String> domains = new HashMap<DN, String>();
 
   /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
-  // TODO : review the usefulness of this lock
   private final Object domainLock = new Object();
 
   /** The underlying replication server. */
@@ -358,6 +370,77 @@
     }
   }
 
+  /**
+   * Notify that the replica corresponding to provided domain and provided CSN
+   * is offline.
+   *
+   * @param domainDN
+   *          the domain of the offline replica
+   * @param offlineCSN
+   *          the offline replica serverId and offline timestamp
+   * @throws ChangelogException
+   *           if a problem occurs
+   */
+  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
+  {
+    synchronized (domainLock)
+    {
+      final String domainId = domains.get(domainDN);
+      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
+      if (!serverIdPath.exists())
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
+            domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
+      }
+      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+      Writer writer = null;
+      try
+      {
+        // Overwrite file, only the last sent offline CSN is kept
+        writer = newFileWriter(offlineFile);
+        writer.write(offlineCSN.toString());
+      }
+      catch (IOException e)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
+            domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
+      }
+      finally
+      {
+        StaticUtils.close(writer);
+      }
+    }
+  }
+
+  /**
+   * Notify that the replica corresponding to provided domain and server id
+   * is online.
+   *
+   * @param domainDN
+   *          the domain of the replica
+   * @param serverId
+   *          the replica serverId
+   * @throws ChangelogException
+   *           if a problem occurs
+   */
+  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
+  {
+    synchronized (domainLock)
+    {
+      final String domainId = domains.get(domainDN);
+      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
+      if (offlineFile.exists())
+      {
+        final boolean isDeleted = offlineFile.delete();
+        if (!isDeleted)
+        {
+          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
+              offlineFile.getPath(), domainDN.toString(), serverId));
+        }
+      }
+    }
+  }
+
   /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
   private void readDomainsStateFile() throws ChangelogException
   {
@@ -368,7 +451,7 @@
       String line = null;
       try
       {
-        reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
+        reader = newFileReader(domainsStateFile);
         while ((line = reader.readLine()) != null)
         {
           final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
@@ -425,7 +508,7 @@
    * Update the changelog state with the state corresponding to the provided
    * domain DN.
    */
-  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
+  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
       throws ChangelogException
   {
     final File domainDirectory = getDomainPath(domainEntry.getValue());
@@ -436,7 +519,7 @@
           replicationRootPath, domainDirectory.getPath()));
     }
     final DN domainDN = domainEntry.getKey();
-    result.setDomainGenerationId(domainDN, toGenerationId(generationId));
+    state.setDomainGenerationId(domainDN, toGenerationId(generationId));
 
     final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
     if (serverIds == null)
@@ -446,7 +529,43 @@
     }
     for (final File serverId : serverIds)
     {
-      result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
+      readStateForServerId(domainDN, serverId, state);
+    }
+  }
+
+  private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
+  {
+    state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
+
+    final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+    if (offlineFile.exists())
+    {
+      final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
+      state.addOfflineReplica(domainDN, offlineCSN);
+    }
+  }
+
+  private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
+  {
+    BufferedReader reader = null;
+    try
+    {
+      reader = newFileReader(offlineFile);
+      String line = reader.readLine();
+      if (line == null || reader.readLine() != null)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
+            domainDN.toString(), offlineFile.getPath()));
+      }
+      return new CSN(line);
+    }
+    catch(IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
+          domainDN.toString(), offlineFile.getPath()), e);
+    }
+    finally {
+      StaticUtils.close(reader);
     }
   }
 
@@ -458,13 +577,13 @@
     Writer writer = null;
     try
     {
-      writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
+      writer = newFileWriter(domainsStateFile);
       for (final Entry<DN, String> entry : domains.entrySet())
       {
         writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
       }
     }
-    catch (Exception e)
+    catch (IOException e)
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
           domainDN.toString(), domainsStateFile.getPath()), e);
@@ -561,7 +680,17 @@
     return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
   }
 
-  private File getServerIdPath(final String domainId, final int serverId)
+  /**
+   * Return the path for the provided domain id and server id.
+   * Package private to be usable in tests.
+   *
+   * @param domainId
+   *            The id corresponding to a domain DN
+   * @param serverId
+   *            The server id to retrieve
+   * @return the path
+   */
+  File getServerIdPath(final String domainId, final int serverId)
   {
     return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
   }
@@ -673,4 +802,16 @@
       throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
     }
   }
+
+  /** Returns a buffered writer on the provided file. */
+  private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
+  {
+    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
+  }
+
+  /** Returns a buffered reader on the provided file. */
+  private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
+  {
+    return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 628ca99..2419dc4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -187,6 +187,21 @@
   }
 
   /**
+   * Indicates if the replica corresponding to provided domain DN and server id
+   * is offline.
+   *
+   * @param domainDN
+   *          base DN of the replica
+   * @param serverId
+   *          server id of the replica
+   * @return {@code true} if replica is offline, {@code false} otherwise
+   */
+  public boolean isReplicaOffline(DN domainDN, int serverId)
+  {
+    return replicasOffline.getCSN(domainDN, serverId) != null;
+  }
+
+  /**
    * Ensures the medium consistency point is updated by UpdateMsg.
    *
    * @param baseDN
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index a9f69ba..e1dbcd3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -756,6 +756,7 @@
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
+      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
     return wasCreated;
@@ -763,21 +764,31 @@
 
   /** {@inheritDoc} */
   @Override
-  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
+  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException
   {
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
+      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
       indexer.publishHeartbeat(baseDN, heartbeatCSN);
     }
   }
 
+  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+      throws ChangelogException
+  {
+    if (indexer.isReplicaOffline(baseDN, serverId))
+    {
+      dbEnv.notifyReplicaOnline(baseDN, serverId);
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
-  public void replicaOffline(DN baseDN, CSN offlineCSN)
+  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
       throws ChangelogException
   {
-    dbEnv.addOfflineReplica(baseDN, offlineCSN);
+    dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index c259985..df5d0c6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -505,14 +505,32 @@
    */
   static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
   {
-    final byte[] key =
-        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
-            + FIELD_SEPARATOR + baseDN.toNormalizedString());
+    final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId());
     final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
     data.append(offlineCSN.getTime());
     return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
   }
 
+  /**
+   * Return the key for a replica offline entry in the changelog state database.
+   *
+   * @param baseDN
+   *          the replica's baseDN
+   * @param serverId
+   *          the replica's serverId
+   * @return the key used in the database to store offline time of the replica
+   */
+  private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
+  {
+    return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString());
+  }
+
+  /** Returns an entry with the provided key and a null value. */
+  private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
+  {
+    return new SimpleImmutableEntry<byte[], byte[]>(key, null);
+  }
+
   private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
       throws ChangelogException, RuntimeException
   {
@@ -689,7 +707,9 @@
   }
 
   /**
-   * Add the information about an offline replica to the changelog state DB.
+   * Notify that replica is offline.
+   * <p>
+   * This information is stored in the changelog state DB.
    *
    * @param baseDN
    *          the domain of the offline replica
@@ -698,7 +718,7 @@
    * @throws ChangelogException
    *           if a database problem occurred
    */
-  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
       throws ChangelogException
   {
     // just overwrite any older entry as it is assumed a newly received offline
@@ -707,6 +727,25 @@
         "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
   }
 
+  /**
+   * Notify that replica is online.
+   * <p>
+   * Update the changelog state DB if necessary (ie, replica was known to be
+   * offline).
+   *
+   * @param baseDN
+   *          the domain of replica
+   * @param serverId
+   *          the serverId of replica
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
+  {
+    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+  }
+
   private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
       String methodInvocation) throws ChangelogException
   {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index 8f3e173..de8c1f5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -26,8 +26,8 @@
 package org.opends.server.replication.server.changelog.file;
 
 import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
 
-import java.io.Closeable;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,12 +37,14 @@
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.DN;
 import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -50,6 +52,9 @@
 @SuppressWarnings("javadoc")
 public class ReplicationEnvironmentTest extends DirectoryServerTestCase
 {
+  private static final int SERVER_ID_1 = 1;
+  private static final int SERVER_ID_2 = 2;
+
   private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
   private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
   private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
@@ -74,70 +79,231 @@
   }
 
   @Test
-  public void testCreateThenReadChangelogStateWithSingleDN() throws Exception
+  public void testReadChangelogStateWithSingleDN() throws Exception
   {
-    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
-    final DN domainDN = DN.decode(DN1_AS_STRING);
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.decode(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
 
-    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
-    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
-    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
-    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
-    StaticUtils.close(cnDB, replicaDB, replicaDB2);
+      ChangelogState state = environment.readChangelogState();
 
-    ChangelogState state = environment.readChangelogState();
-
-    assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
-    assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2);
-    assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
+      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB, replicaDB2);
+    }
   }
 
   @Test
-  public void testCreateThenReadChangelogStateWithMultipleDN() throws Exception
+  public void testReadChangelogStateWithMultipleDN() throws Exception
   {
-    File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
-    List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
-
-    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
-    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
     List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
-    for (int i = 0; i <= 2 ; i++)
+    try
     {
-      for (int j = 1; j <= 10; j++)
+      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      for (int i = 0; i <= 2 ; i++)
       {
-        replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+        for (int j = 1; j <= 10; j++)
+        {
+          // 3 domains, 10 server id each, generation id is different for each domain
+          replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+        }
       }
+
+      ChangelogState state = environment.readChangelogState();
+
+      assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
+      for (int i = 0; i <= 2 ; i++)
+      {
+        assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+      }
+      assertThat(state.getDomainToGenerationId()).containsOnly(
+          MapEntry.entry(domainDNs.get(0), 1L),
+          MapEntry.entry(domainDNs.get(1), 2L),
+          MapEntry.entry(domainDNs.get(2), 3L));
     }
-    StaticUtils.close(cnDB);
-    StaticUtils.close(replicaDBs.toArray(new Closeable[] {}));
-
-    ChangelogState state = environment.readChangelogState();
-
-    assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
-    for (int i = 0; i <= 2 ; i++)
+    finally
     {
-      assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+      StaticUtils.close(cnDB);
+      StaticUtils.close(replicaDBs);
     }
-    assertThat(state.getDomainToGenerationId()).containsOnly(
-        MapEntry.entry(domainDNs.get(0), 1L),
-        MapEntry.entry(domainDNs.get(1), 2L),
-        MapEntry.entry(domainDNs.get(2), 3L));
+  }
+
+  @Test
+  public void testReadChangelogStateWithReplicaOffline() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.decode(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      // put server id 1 offline
+      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+      environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+      ChangelogState state = environment.readChangelogState();
+
+      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test(expectedExceptions=ChangelogException.class)
+  public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.decode(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
+      offlineStateFile.createNewFile();
+
+      environment.readChangelogState();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.decode(DN1_AS_STRING);
+
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      // put server id 1 offline twice
+      CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
+      environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
+      CSN lastOfflineCSN = csnGenerator.newCSN();
+      environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
+
+      ChangelogState state = environment.readChangelogState();
+
+      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.decode(DN1_AS_STRING);
+
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      // put server id 1 offline
+      environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
+      // put server id 1 online again
+      environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
+
+      ChangelogState state = environment.readChangelogState();
+
+      assertThat(state.getOfflineReplicas()).isEmpty();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test
+  public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.decode(DN1_AS_STRING);
+
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+      environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+      ChangelogState state = environment.readChangelogState();
+
+      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
+      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+      assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
   }
 
   @Test(expectedExceptions=ChangelogException.class)
   public void testMissingDomainDirectory() throws Exception
   {
-    File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
-    DN domainDN = DN.decode(DN1_AS_STRING);
-    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
-    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
-    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
-    StaticUtils.close(replicaDB, replicaDB2);
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+    try
+    {
+      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      DN domainDN = DN.decode(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
 
-    // delete the domain directory created for the 2 replica DBs to break the
-    // consistency with domain state file
-    StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+      // delete the domain directory created for the 2 replica DBs to break the
+      // consistency with domain state file
+      StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
 
-    environment.readChangelogState();
+      environment.readChangelogState();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB, replicaDB2);
+    }
   }
 }

--
Gitblit v1.10.0