From 46e1cd9b8a361103d3591fda2ba687de8c51964c Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 22 Sep 2014 09:55:15 +0000
Subject: [PATCH] OPENDJ-1467 :  File Based Changelog must support replicas temporarily leaving the topology

---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java           |    2 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java        |   15 ++++++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java |    2 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java  |   15 +++++++
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties                                           |   16 ++++++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java     |   49 ++++++++++++++++++++++--
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java    |   16 ++++----
 7 files changed, 95 insertions(+), 20 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 4178833..b6b55c2 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -574,8 +574,8 @@
  to file system for log file '%s'
 ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
  on log file '%s'
-ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
- log file
+ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \
+ directory '%s' for log file
 ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
  from domain state file '%s', from line '%s'
 ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
@@ -606,4 +606,14 @@
 ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
  head log file from '%s' to '%s'
 INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
- size of head log file is %d bytes
\ No newline at end of file
+ size of head log file is %d bytes
+ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
+ offline for domain %s and server id %d because the path '%s' does not exist
+ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
+ replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
+ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
+ state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN
+ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
+ replica offline state file '%s' for domain %s
+ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
+ offline state file '%s' for domain %s and server id %d
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 644b994..749b0e1 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -496,7 +496,7 @@
       if (updateMsg instanceof ReplicaOfflineMsg)
       {
         final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
-        this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN());
+        this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
         return true;
       }
 
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 011a7c4..e275020 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -225,5 +225,5 @@
    * @throws ChangelogException
    *           If a database problem happened
    */
-  void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
+  void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
 }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index b28a4bd..ad0f2fe 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -138,14 +138,6 @@
 
   /** {@inheritDoc} */
   @Override
-  public void replicaOffline(DN baseDN, CSN offlineCSN)
-      throws ChangelogException
-  {
-    throw new RuntimeException("Not implemented");
-  }
-
-  /** {@inheritDoc} */
-  @Override
   public void initializeDB()
   {
     throw new RuntimeException("Not implemented");
@@ -182,6 +174,14 @@
 
   /** {@inheritDoc} */
   @Override
+  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
+      throws ChangelogException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public ChangeNumberIndexDB getChangeNumberIndexDB()
   {
     throw new RuntimeException("Not implemented");
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 48ebc5f..ac97e20 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -174,6 +174,21 @@
   }
 
   /**
+   * Indicates if the replica corresponding to provided domain DN and server id
+   * is offline.
+   *
+   * @param domainDN
+   *          base DN of the replica
+   * @param serverId
+   *          server id of the replica
+   * @return {@code true} if replica is offline, {@code false} otherwise
+   */
+  public boolean isReplicaOffline(DN domainDN, int serverId)
+  {
+    return replicasOffline.getCSN(domainDN, serverId) != null;
+  }
+
+  /**
    * Ensures the medium consistency point is updated by UpdateMsg.
    *
    * @param baseDN
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index ceca244..b39b6d5 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -827,6 +827,7 @@
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
+      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
     return pair.getSecond(); // replica DB was created
@@ -839,15 +840,25 @@
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
+      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
       indexer.publishHeartbeat(baseDN, heartbeatCSN);
     }
   }
 
+  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+      throws ChangelogException
+  {
+    if (indexer.isReplicaOffline(baseDN, serverId))
+    {
+      replicationEnv.notifyReplicaOnline(baseDN, serverId);
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
-  public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
+  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
   {
-    replicationEnv.addOfflineReplica(baseDN, offlineCSN);
+    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index 217dd56..33330bf 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -530,14 +530,32 @@
    */
   static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
   {
-    final byte[] key =
-        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
-            + FIELD_SEPARATOR + baseDN.toNormalizedString());
+    final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId());
     final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
     data.append(offlineCSN.getTime());
     return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
   }
 
+  /**
+   * Return the key for a replica offline entry in the changelog state database.
+   *
+   * @param baseDN
+   *          the replica's baseDN
+   * @param serverId
+   *          the replica's serverId
+   * @return the key used in the database to store offline time of the replica
+   */
+  private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
+  {
+    return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString());
+  }
+
+  /** Returns an entry with the provided key and a null value. */
+  private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
+  {
+    return new SimpleImmutableEntry<byte[], byte[]>(key, null);
+  }
+
   private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
       throws ChangelogException, RuntimeException
   {
@@ -722,7 +740,9 @@
   }
 
   /**
-   * Add the information about an offline replica to the changelog state DB.
+   * Notify that replica is offline.
+   * <p>
+   * This information is stored in the changelog state DB.
    *
    * @param baseDN
    *          the domain of the offline replica
@@ -731,7 +751,7 @@
    * @throws ChangelogException
    *           if a database problem occurred
    */
-  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
       throws ChangelogException
   {
     synchronized (stateLock)
@@ -744,6 +764,25 @@
     }
   }
 
+  /**
+   * Notify that replica is online.
+   * <p>
+   * Update the changelog state DB if necessary (ie, replica was known to be
+   * offline).
+   *
+   * @param baseDN
+   *          the domain of replica
+   * @param serverId
+   *          the serverId of replica
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
+  {
+    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+  }
+
   private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
       String methodInvocation) throws ChangelogException
   {

--
Gitblit v1.10.0