From de36fa06856d8d04652401bb24e49c3259aef154 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 10:26:42 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java |  142 ++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 121 insertions(+), 21 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index e4d7283..c259985 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -38,9 +38,12 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 
@@ -67,6 +70,7 @@
   private ReplicationServer replicationServer;
   private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
   private static final String GENERATION_ID_TAG = "GENID";
+  private static final String OFFLINE_TAG = "OFFLINE";
   private static final String FIELD_SEPARATOR = " ";
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
@@ -268,6 +272,19 @@
           }
           result.setDomainGenerationId(baseDN, generationId);
         }
+        else if (prefix.equals(OFFLINE_TAG))
+        {
+          final String[] str = stringKey.split(FIELD_SEPARATOR, 3);
+          final int serverId = toInt(str[1]);
+          final DN baseDN = DN.decode(str[2]);
+          long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong();
+          if (debugEnabled())
+          {
+            debug("has read replica offline: baseDN=" + baseDN + " serverId="
+                + serverId);
+          }
+          result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
+        }
         else
         {
           final String[] str = stringData.split(FIELD_SEPARATOR, 2);
@@ -275,7 +292,8 @@
           final DN baseDN = DN.decode(str[1]);
           if (debugEnabled())
           {
-            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
+            debug("has read replica: baseDN=" + baseDN + " serverId="
+                + serverId);
           }
           result.addServerIdToDomain(serverId, baseDN);
         }
@@ -416,7 +434,7 @@
       // Opens the DB for the changes received from this server on this domain.
       final Database replicaDB = openDatabase(replicaEntry.getKey());
 
-      putInChangelogStateDBIfNotExist(replicaEntry);
+      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
       putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
       return replicaDB;
     }
@@ -436,7 +454,7 @@
    *          the replica's serverId
    * @return a database entry for the replica
    */
-  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
+  static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
   {
     final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
     return new SimpleImmutableEntry<String, String>(key, key);
@@ -452,30 +470,65 @@
    *          the domain's generationId
    * @return a database entry for the generationId
    */
-  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
+  static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
   {
     final String normDn = baseDN.toNormalizedString();
     final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
     final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
         + FIELD_SEPARATOR + normDn;
-    return new SimpleImmutableEntry<String, String>(key, data);
+    return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data));
   }
 
-  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
-      throws RuntimeException
+  /**
+   * Converts an Entry&lt;String, String&gt; to an Entry&lt;byte[], byte[]&gt;.
+   *
+   * @param entry
+   *          the entry to convert
+   * @return the converted entry
+   */
+  static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
   {
-    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+    return new SimpleImmutableEntry<byte[], byte[]>(
+        toBytes(entry.getKey()),
+        toBytes(entry.getValue()));
+  }
+
+  /**
+   * Return an entry to store in the changelog state database representing the
+   * time a replica went offline.
+   *
+   * @param baseDN
+   *          the replica's baseDN
+   * @param offlineCSN
+   *          the replica's serverId and offline timestamp
+   * @return a database entry representing the time a replica went offline
+   */
+  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
+  {
+    final byte[] key =
+        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
+            + FIELD_SEPARATOR + baseDN.toNormalizedString());
+    final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
+    data.append(offlineCSN.getTime());
+    return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
+  }
+
+  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
+      throws ChangelogException, RuntimeException
+  {
+    DatabaseEntry key = new DatabaseEntry(entry.getKey());
     DatabaseEntry data = new DatabaseEntry();
     if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
     {
       Transaction txn = dbEnvironment.beginTransaction(null, null);
       try
       {
-        data.setData(toBytes(entry.getValue()));
+        data.setData(entry.getValue());
         if (debugEnabled())
         {
           debug("putting record in the changelogstate Db key=["
-              + entry.getKey() + "] value=[" + entry.getValue() + "]");
+              + toString(entry.getKey()) + "] value=["
+              + toString(entry.getValue()) + "]");
         }
         changelogStateDb.put(txn, key, data);
         txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
@@ -585,11 +638,11 @@
    */
   public void clearServerId(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
+    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
         "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
   }
 
-  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
+  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
       String methodInvocation) throws ChangelogException
   {
     if (debugEnabled())
@@ -599,7 +652,7 @@
 
     try
     {
-      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
       final DatabaseEntry data = new DatabaseEntry();
       if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
       {
@@ -620,18 +673,65 @@
           throw dbe;
         }
       }
-      else
+      else if (debugEnabled())
       {
-        if (debugEnabled())
-        {
-          debug(methodInvocation + " failed: key=[ " + entry.getKey()
-              + "] not found");
-        }
+        debug(methodInvocation + " failed: key not found");
       }
     }
-    catch (RuntimeException dbe)
+    catch (RuntimeException e)
     {
-      throw new ChangelogException(dbe);
+      if (debugEnabled())
+      {
+        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+      }
+      throw new ChangelogException(e);
+    }
+  }
+
+  /**
+   * Add the information about an offline replica to the changelog state DB.
+   *
+   * @param baseDN
+   *          the domain of the offline replica
+   * @param offlineCSN
+   *          the offline replica serverId and offline timestamp
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+      throws ChangelogException
+  {
+    // just overwrite any older entry as it is assumed a newly received offline
+    // CSN is newer than the previous one
+    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+  }
+
+  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
+      String methodInvocation) throws ChangelogException
+  {
+    if (debugEnabled())
+    {
+      debug(methodInvocation + " starting");
+    }
+
+    try
+    {
+      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
+      final DatabaseEntry data = new DatabaseEntry(entry.getValue());
+      changelogStateDb.put(null, key, data);
+      if (debugEnabled())
+      {
+        debug(methodInvocation + " succeeded");
+      }
+    }
+    catch (RuntimeException e)
+    {
+      if (debugEnabled())
+      {
+        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+      }
+      throw new ChangelogException(e);
     }
   }
 

--
Gitblit v1.10.0