From 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Jul 2014 15:29:48 +0000
Subject: [PATCH] OPENDJ-1453 Replica offline messages should be synced with updates

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java |   72 ++++++++++++++++++++++++++++-------
 1 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index cc9e286..3f5aa10 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -66,6 +66,16 @@
 {
   private Environment dbEnvironment;
   private Database changelogStateDb;
+  /**
+   * The current changelogState. This is in-memory version of what is inside the
+   * on-disk changelogStateDB. It improves performances in case the
+   * changelogState is read often.
+   *
+   * @GuardedBy("stateLock")
+   */
+  private final ChangelogState changelogState;
+  /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
+  private final Object stateLock = new Object();
   private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
   private ReplicationServer replicationServer;
   private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -101,6 +111,7 @@
        * of all the servers that have been seen in the past.
        */
       changelogStateDb = openDatabase("changelogstate");
+      changelogState = readOnDiskChangelogState();
     }
     catch (RuntimeException e)
     {
@@ -222,13 +233,23 @@
   }
 
   /**
-   * Read and return the list of known servers from the database.
+   * Return the current changelog state.
+   *
+   * @return the current {@link ChangelogState}
+   */
+  public ChangelogState getChangelogState()
+  {
+    return changelogState;
+  }
+
+  /**
+   * Read and return the changelog state from the database.
    *
    * @return the {@link ChangelogState} read from the changelogState DB
    * @throws ChangelogException
    *           if a database problem occurs
    */
-  public ChangelogState readChangelogState() throws ChangelogException
+  protected ChangelogState readOnDiskChangelogState() throws ChangelogException
   {
     return decodeChangelogState(readWholeState());
   }
@@ -434,8 +455,13 @@
       // Opens the DB for the changes received from this server on this domain.
       final Database replicaDB = openDatabase(replicaEntry.getKey());
 
-      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
-      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+      synchronized (stateLock)
+      {
+        putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
+        changelogState.addServerIdToDomain(serverId, baseDN);
+        putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+        changelogState.setDomainGenerationId(baseDN, generationId);
+      }
       return replicaDB;
     }
     catch (RuntimeException e)
@@ -638,9 +664,13 @@
    */
   public void clearGenerationId(DN baseDN) throws ChangelogException
   {
-    final int unusedGenId = 0;
-    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
-        "clearGenerationId(baseDN=" + baseDN + ")");
+    synchronized (stateLock)
+    {
+      final int unusedGenId = 0;
+      deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
+          "clearGenerationId(baseDN=" + baseDN + ")");
+      changelogState.setDomainGenerationId(baseDN, unusedGenId);
+    }
   }
 
   /**
@@ -656,8 +686,12 @@
    */
   public void clearServerId(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
-        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+    synchronized (stateLock)
+    {
+      deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
+          "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+      changelogState.setDomainGenerationId(baseDN, -1);
+    }
   }
 
   private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -721,10 +755,14 @@
   public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
       throws ChangelogException
   {
-    // just overwrite any older entry as it is assumed a newly received offline
-    // CSN is newer than the previous one
-    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
-        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+    synchronized (stateLock)
+    {
+      // just overwrite any older entry as it is assumed a newly received offline
+      // CSN is newer than the previous one
+      putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+          "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+      changelogState.addOfflineReplica(baseDN, offlineCSN);
+    }
   }
 
   /**
@@ -742,8 +780,12 @@
    */
   public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
-        "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+    synchronized (stateLock)
+    {
+      deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+          "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+      changelogState.removeOfflineReplica(baseDN, serverId);
+    }
   }
 
   private void putInChangelogStateDB(Entry<byte[], byte[]> entry,

--
Gitblit v1.10.0