From 40c19698a7c27ef73ae26439a962c62e373813a8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Dec 2013 10:43:35 +0000
Subject: [PATCH] OPENDJ-1231 (CR-2724) Make the Medium Consistency Point support replica heartbeats

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java |   13 ++
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java        |   41 +++++--
 opends/src/messages/messages/replication.properties                                                 |   10 +
 opends/src/server/org/opends/server/replication/common/ServerState.java                             |  207 ++++++++++++++++-------------------------
 4 files changed, 129 insertions(+), 142 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index bc9d697..5d66ce7 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -527,4 +527,12 @@
  for DS(%d) to connect to because it was the only one standing after all tests
 NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
 SEVERE_ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN_235=Could not \
- create replica database because the changelog database is shutting down
\ No newline at end of file
+ create replica database because the changelog database is shutting down
+FATAL_ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION_236=An unexpected error \
+ forced the %s thread to shutdown: %s. \
+ The changeNumber attribute will not move forward anymore. \
+ You can reenable this thread by first setting the "compute-change-number" \
+ property to false and then back to true
+FATAL_ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ_237=Aborting initialization: \
+ expected the newest change number index record CSN '%s' to be equal to \
+ the CSN read from the replica DBs '%s'
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 761d21c..81cb59d 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -30,11 +30,14 @@
 import java.io.UnsupportedEncodingException;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.protocols.asn1.ASN1Writer;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.types.ByteString;
+import org.opends.server.util.StaticUtils;
 
 /**
  * This class is used to associate serverIds with {@link CSN}s.
@@ -46,7 +49,8 @@
 {
 
   /** Associates a serverId with a CSN. */
-  private final Map<Integer, CSN> serverIdToCSN = new HashMap<Integer, CSN>();
+  private final ConcurrentMap<Integer, CSN> serverIdToCSN =
+      new ConcurrentSkipListMap<Integer, CSN>();
   /**
    * Whether the state has been saved to persistent storage. It starts at true,
    * and moves to false when an update is made to the current object.
@@ -68,10 +72,7 @@
    */
   public void clear()
   {
-    synchronized (serverIdToCSN)
-    {
-      serverIdToCSN.clear();
-    }
+    serverIdToCSN.clear();
   }
 
 
@@ -154,14 +155,27 @@
 
     saved = false;
 
-    synchronized (serverIdToCSN)
+    final int serverId = csn.getServerId();
+    while (true)
     {
-      final int serverId = csn.getServerId();
       final CSN existingCSN = serverIdToCSN.get(serverId);
-      if (existingCSN == null || csn.isNewerThan(existingCSN))
+      if (existingCSN == null)
       {
-        serverIdToCSN.put(serverId, csn);
-        return true;
+        if (serverIdToCSN.putIfAbsent(serverId, csn) == null)
+        {
+          return true;
+        }
+        // oops, a concurrent modification happened, run the same process again
+        continue;
+      }
+      else if (csn.isNewerThan(existingCSN))
+      {
+        if (serverIdToCSN.replace(serverId, existingCSN, csn))
+        {
+          return true;
+        }
+        // oops, a concurrent modification happened, run the same process again
+        continue;
       }
       return false;
     }
@@ -203,19 +217,10 @@
     if (expectedCSN == null)
       return false;
 
-    synchronized (serverIdToCSN)
+    if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN))
     {
-      for (Iterator<CSN> iter = serverIdToCSN.values().iterator();
-          iter.hasNext();)
-      {
-        final CSN csn = iter.next();
-        if (expectedCSN.equals(csn))
-        {
-          iter.remove();
-          saved = false;
-          return true;
-        }
-      }
+      saved = false;
+      return true;
     }
     return false;
   }
@@ -232,11 +237,8 @@
       return false;
     }
 
-    synchronized (serverIdToCSN)
-    {
-      clear();
-      return update(serverState);
-    }
+    clear();
+    return update(serverState);
   }
 
   /**
@@ -252,18 +254,13 @@
    */
   public Set<String> toStringSet()
   {
-    Set<String> set = new HashSet<String>();
-
-    synchronized (serverIdToCSN)
+    final Set<String> result = new HashSet<String>();
+    for (CSN change : serverIdToCSN.values())
     {
-      for (CSN change : serverIdToCSN.values())
-      {
-        Date date = new Date(change.getTime());
-        set.add(change + " " + date + " " + change.getTime());
-      }
+      Date date = new Date(change.getTime());
+      result.add(change + " " + date + " " + change.getTime());
     }
-
-    return set;
+    return result;
   }
 
   /**
@@ -274,14 +271,10 @@
    */
   public ArrayList<ByteString> toASN1ArrayList()
   {
-    ArrayList<ByteString> values = new ArrayList<ByteString>(0);
-
-    synchronized (serverIdToCSN)
+    final ArrayList<ByteString> values = new ArrayList<ByteString>(0);
+    for (CSN csn : serverIdToCSN.values())
     {
-      for (CSN csn : serverIdToCSN.values())
-      {
-        values.add(ByteString.valueOf(csn.toString()));
-      }
+      values.add(ByteString.valueOf(csn.toString()));
     }
     return values;
   }
@@ -301,21 +294,18 @@
   public void writeTo(ASN1Writer writer, short protocolVersion)
       throws IOException
   {
-    synchronized (serverIdToCSN)
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
     {
-      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+      for (CSN csn : serverIdToCSN.values())
       {
-        for (CSN csn : serverIdToCSN.values())
-        {
-          writer.writeOctetString(csn.toByteString());
-        }
+        writer.writeOctetString(csn.toByteString());
       }
-      else
+    }
+    else
+    {
+      for (CSN csn : serverIdToCSN.values())
       {
-        for (CSN csn : serverIdToCSN.values())
-        {
-          writer.writeOctetString(csn.toString());
-        }
+        writer.writeOctetString(csn.toString());
       }
     }
   }
@@ -327,19 +317,7 @@
   @Override
   public String toString()
   {
-    StringBuilder buffer = new StringBuilder();
-
-    synchronized (serverIdToCSN)
-    {
-      for (CSN change : serverIdToCSN.values())
-      {
-        buffer.append(change).append(" ");
-      }
-      if (!serverIdToCSN.isEmpty())
-        buffer.deleteCharAt(buffer.length() - 1);
-    }
-
-    return buffer.toString();
+    return StaticUtils.collectionToString(serverIdToCSN.values(), " ");
   }
 
   /**
@@ -357,26 +335,6 @@
   }
 
   /**
-   * Returns the largest (most recent) {@code CSN} in this server state.
-   *
-   * @return The largest (most recent) {@code CSN} in this server state.
-   */
-  public CSN getMaxCSN()
-  {
-    CSN maxCSN = null;
-
-    synchronized (serverIdToCSN)
-    {
-      for (CSN csn : serverIdToCSN.values())
-      {
-        if (maxCSN == null || csn.isNewerThan(maxCSN))
-          maxCSN = csn;
-      }
-    }
-    return maxCSN;
-  }
-
-  /**
    * Add the tail into resultByteArray at position pos.
    */
   private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
@@ -397,37 +355,38 @@
    */
   public byte[] getBytes() throws UnsupportedEncodingException
   {
-    synchronized (serverIdToCSN)
+    // copy to protect from concurrent updates
+    // that could change the number of elements in the Map
+    final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN);
+
+    final int size = copy.size();
+    List<String> idList = new ArrayList<String>(size);
+    List<String> csnList = new ArrayList<String>(size);
+    // calculate the total length needed to allocate byte array
+    int length = 0;
+    for (Entry<Integer, CSN> entry : copy.entrySet())
     {
-      final int size = serverIdToCSN.size();
-      List<String> idList = new ArrayList<String>(size);
-      List<String> csnList = new ArrayList<String>(size);
-      // calculate the total length needed to allocate byte array
-      int length = 0;
-      for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet())
-      {
-        // serverId is useless, see comment in ServerState ctor
-        final String serverIdStr = String.valueOf(entry.getKey());
-        idList.add(serverIdStr);
-        length += serverIdStr.length() + 1;
+      // serverId is useless, see comment in ServerState ctor
+      final String serverIdStr = String.valueOf(entry.getKey());
+      idList.add(serverIdStr);
+      length += serverIdStr.length() + 1;
 
-        final String csnStr = entry.getValue().toString();
-        csnList.add(csnStr);
-        length += csnStr.length() + 1;
-      }
-      byte[] result = new byte[length];
-
-      // write the server state into the byte array
-      int pos = 0;
-      for (int i = 0; i < size; i++)
-      {
-        String str = idList.get(i);
-        pos = addByteArray(str.getBytes("UTF-8"), result, pos);
-        str = csnList.get(i);
-        pos = addByteArray(str.getBytes("UTF-8"), result, pos);
-      }
-      return result;
+      final String csnStr = entry.getValue().toString();
+      csnList.add(csnStr);
+      length += csnStr.length() + 1;
     }
+    byte[] result = new byte[length];
+
+    // write the server state into the byte array
+    int pos = 0;
+    for (int i = 0; i < size; i++)
+    {
+      String str = idList.get(i);
+      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
+      str = csnList.get(i);
+      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
+    }
+    return result;
   }
 
   /**
@@ -488,11 +447,8 @@
    */
   public ServerState duplicate()
   {
-    ServerState newState = new ServerState();
-    synchronized (serverIdToCSN)
-    {
-      newState.serverIdToCSN.putAll(serverIdToCSN);
-    }
+    final ServerState newState = new ServerState();
+    newState.serverIdToCSN.putAll(serverIdToCSN);
     return newState;
   }
 
@@ -571,14 +527,11 @@
   {
     final CSN csn = new CSN(timestamp, 0, 0);
     final ServerState newState = new ServerState();
-    synchronized (serverIdToCSN)
+    for (CSN change : serverIdToCSN.values())
     {
-      for (CSN change : serverIdToCSN.values())
+      if (change.isOlderThan(csn))
       {
-        if (change.isOlderThan(csn))
-        {
-          newState.serverIdToCSN.put(change.getServerId(), change);
-        }
+        newState.serverIdToCSN.put(change.getServerId(), change);
       }
     }
     return newState;
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 9979535..4c46759 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -41,13 +41,14 @@
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.types.DN;
-import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.util.StaticUtils;
 
 import com.forgerock.opendj.util.Pair;
 
+import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
 
 /**
  * Thread responsible for inserting replicated changes into the ChangeNumber
@@ -101,7 +102,9 @@
 
   /**
    * Holds the most recent changes or heartbeats received for each serverIds
-   * cross domain.
+   * cross domain. changes are stored in the replicaDBs and hence persistent,
+   * heartbeats are transient because they are easily constructed on normal
+   * operations.
    */
   private final MultiDomainServerState lastSeenUpdates =
       new MultiDomainServerState();
@@ -276,9 +279,9 @@
       final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
       if (!record.getCSN().equals(newestRecord.getCSN()))
       {
-        // TODO JNR i18n safety check, should never happen
-        throw new ChangelogException(Message.raw("They do not equal! recordCSN="
-            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
+        throw new ChangelogException(
+            ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord
+                .getCSN().toStringUI(), record.getCSN().toStringUI()));
       }
       mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
       nextChangeForInsertDBCursor.next();
@@ -406,7 +409,7 @@
           final CSN csn = msg.getCSN();
           final DN baseDN = nextChangeForInsertDBCursor.getData();
           // FIXME problem: what if the serverId is not part of the ServerState?
-          // right now, thread will be blocked
+          // right now, change number will be blocked
           if (!canMoveForwardMediumConsistencyPoint(baseDN))
           {
             // the oldest record to insert is newer than the medium consistency
@@ -451,17 +454,27 @@
         }
       }
     }
-    catch (ChangelogException e)
+    catch (RuntimeException e)
     {
-      if (debugEnabled())
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      // TODO JNR error message i18n
+      // Nothing can be done about it.
+      // Rely on the DirectoryThread uncaught exceptions handler
+      // for logging error + alert.
+      // Message logged here gives corrective information to the administrator.
+      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+          getClass().getSimpleName(), stackTraceToSingleLineString(e));
+      TRACER.debugError(msg.toString());
+      throw e;
     }
-    catch (DirectoryException e)
+    catch (Exception e)
     {
-      if (debugEnabled())
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      // TODO JNR error message i18n
+      // Nothing can be done about it.
+      // Rely on the DirectoryThread uncaught exceptions handler
+      // for logging error + alert.
+      // Message logged here gives corrective information to the administrator.
+      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+          getClass().getSimpleName(), stackTraceToSingleLineString(e));
+      TRACER.debugError(msg.toString());
+      throw new RuntimeException(e);
     }
     finally
     {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
index f491e04..68cf2c1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -162,9 +162,22 @@
     final ServerState state = new ServerState();
     assertTrue(state.update(csn1Server1));
 
+    // test 1
     assertFalse(state.removeCSN(null));
+
+    // test 2
+    assertEquals(csn1Server1, state.getCSN(1));
     assertFalse(state.removeCSN(csn2Server1));
+    assertEquals(csn1Server1, state.getCSN(1));
+
+    // test 3
+    assertNull(state.getCSN(2));
     assertFalse(state.removeCSN(csn1Server2));
+    assertNull(state.getCSN(2));
+
+    // test 4
+    assertEquals(csn1Server1, state.getCSN(1));
     assertTrue(state.removeCSN(csn1Server1));
+    assertNull(state.getCSN(1));
   }
 }

--
Gitblit v1.10.0