From b2335ed2f0acbb186c54f1a2d4c6661dece126a0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 21 Oct 2014 09:13:24 +0000
Subject: [PATCH] OPENDJ-1606 (CR-4909) ConcurrentModificationException while performing modify operation against two replicated DS

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java     |   83 +++++++++++----------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java |   96 +++++++++++++------------------
 2 files changed, 75 insertions(+), 104 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index ec88341..e3ddb3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -26,9 +26,7 @@
 package org.opends.server.replication.server.changelog.file;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -100,15 +98,12 @@
    */
   private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
       new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
-  /**
-   * \@GuardedBy("itself")
-   */
-  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
-      new HashMap<DN, List<DomainDBCursor>>();
+  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
+      new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>();
   private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
       new CopyOnWriteArrayList<MultiDomainDBCursor>();
-  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
-      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
+  private final ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>>(Pair.COMPARATOR);
   private ReplicationEnvironment replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -274,6 +269,7 @@
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
+      // double-check
       currentValue = domainMap.get(serverId);
       if (currentValue != null)
       {
@@ -399,6 +395,7 @@
     {
       firstException = e;
     }
+
     for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
         this.domainToReplicaDBs.values().iterator(); it.hasNext();)
     {
@@ -644,7 +641,9 @@
         catch (Exception e)
         {
           if (debugEnabled())
+          {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
           logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
         }
       }
@@ -662,7 +661,7 @@
   /** {@inheritDoc} */
   @Override
   public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
-      KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
   {
     final Set<DN> excludedDomainDns = Collections.emptySet();
     return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
@@ -672,8 +671,7 @@
   @Override
   public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
       final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
-      final Set<DN> excludedDomainDns)
-          throws ChangelogException
+      final Set<DN> excludedDomainDns) throws ChangelogException
   {
     final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
     registeredMultiDomainCursors.add(cursor);
@@ -705,18 +703,9 @@
   private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
       final PositionStrategy positionStrategy)
   {
-    synchronized (registeredDomainCursors)
-    {
-      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
-      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
-      if (cursors == null)
-      {
-        cursors = new ArrayList<DomainDBCursor>();
-        registeredDomainCursors.put(baseDN, cursors);
-      }
-      cursors.add(cursor);
-      return cursor;
-    }
+    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
+    putCursor(registeredDomainCursors, baseDN, cursor);
+    return cursor;
   }
 
   private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
@@ -742,25 +731,31 @@
     {
       final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
-      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
-      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+      final Pair<DN, Integer> replicaId = Pair.of(baseDN, serverId);
+      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
 
-      synchronized (replicaCursors)
-      {
-        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
-        if (cursors == null)
-        {
-          cursors = new ArrayList<ReplicaCursor>();
-          replicaCursors.put(replicaID, cursors);
-        }
-        cursors.add(replicaCursor);
-      }
+      putCursor(replicaCursors, replicaId, replicaCursor);
 
       return replicaCursor;
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
 
+  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
+  {
+    CopyOnWriteArrayList<V> cursors = map.get(key);
+    if (cursors == null)
+    {
+      cursors = new CopyOnWriteArrayList<V>();
+      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
+      if (previousValue != null)
+      {
+        cursors = previousValue;
+      }
+    }
+    cursors.add(cursor);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void unregisterCursor(final DBCursor<?> cursor)
@@ -772,25 +767,19 @@
     else if (cursor instanceof DomainDBCursor)
     {
       final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
-      synchronized (registeredMultiDomainCursors)
+      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+      if (cursors != null)
       {
-        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
-        if (cursors != null)
-        {
-          cursors.remove(cursor);
-        }
+        cursors.remove(cursor);
       }
     }
     else if (cursor instanceof ReplicaCursor)
     {
       final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
-      synchronized (replicaCursors)
+      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+      if (cursors != null)
       {
-        final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
-        if (cursors != null)
-        {
-          cursors.remove(cursor);
-        }
+        cursors.remove(cursor);
       }
     }
   }
@@ -853,15 +842,12 @@
 
   private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
   {
-    synchronized (replicaCursors)
+    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+    if (cursors != null)
     {
-      final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
-      if (cursors != null)
+      for (ReplicaCursor cursor : cursors)
       {
-        for (ReplicaCursor cursor : cursors)
-        {
-          cursor.setOfflineCSN(offlineCSN);
-        }
+        cursor.setOfflineCSN(offlineCSN);
       }
     }
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 9eb193f..085504d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -26,9 +26,7 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -95,15 +93,12 @@
    */
   private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
       new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
-  /**
-   * \@GuardedBy("itself")
-   */
-  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
-      new HashMap<DN, List<DomainDBCursor>>();
+  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
+      new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>();
   private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
       new CopyOnWriteArrayList<MultiDomainDBCursor>();
-  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
-      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
+  private final ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>>(Pair.COMPARATOR);
   private ReplicationDbEnv replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -192,11 +187,6 @@
     }
     catch (Exception e)
     {
-      if (debugEnabled())
-      {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      }
-
       final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
           .append(String.valueOf(dbDirectory));
       throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
@@ -724,13 +714,14 @@
   @Override
   public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
       final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
-      final  Set<DN> excludedDomainDns) throws ChangelogException
+      final Set<DN> excludedDomainDns) throws ChangelogException
   {
     final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
     registeredMultiDomainCursors.add(cursor);
     for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      if (!excludedDomainDns.contains(baseDN)) {
+      if (!excludedDomainDns.contains(baseDN))
+      {
         cursor.addDomain(baseDN, startState.getServerState(baseDN));
       }
     }
@@ -755,18 +746,9 @@
   private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
       final PositionStrategy positionStrategy)
   {
-    synchronized (registeredDomainCursors)
-    {
-      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
-      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
-      if (cursors == null)
-      {
-        cursors = new ArrayList<DomainDBCursor>();
-        registeredDomainCursors.put(baseDN, cursors);
-      }
-      cursors.add(cursor);
-      return cursor;
-    }
+    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
+    putCursor(registeredDomainCursors, baseDN, cursor);
+    return cursor;
   }
 
   private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
@@ -792,25 +774,31 @@
     {
       final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
-      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
-      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+      final Pair<DN, Integer> replicaId = Pair.of(baseDN, serverId);
+      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
 
-      synchronized (replicaCursors)
-      {
-        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
-        if (cursors == null)
-        {
-          cursors = new ArrayList<ReplicaCursor>();
-          replicaCursors.put(replicaID, cursors);
-        }
-        cursors.add(replicaCursor);
-      }
+      putCursor(replicaCursors, replicaId, replicaCursor);
 
       return replicaCursor;
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
 
+  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
+  {
+    CopyOnWriteArrayList<V> cursors = map.get(key);
+    if (cursors == null)
+    {
+      cursors = new CopyOnWriteArrayList<V>();
+      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
+      if (previousValue != null)
+      {
+        cursors = previousValue;
+      }
+    }
+    cursors.add(cursor);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void unregisterCursor(final DBCursor<?> cursor)
@@ -822,19 +810,16 @@
     else if (cursor instanceof DomainDBCursor)
     {
       final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
-      synchronized (registeredMultiDomainCursors)
+      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+      if (cursors != null)
       {
-        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
-        if (cursors != null)
-        {
-          cursors.remove(cursor);
-        }
+        cursors.remove(cursor);
       }
     }
     else if (cursor instanceof ReplicaCursor)
     {
       final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
-      final List<ReplicaCursor> cursors =  replicaCursors.get(replicaCursor.getReplicaID());
+      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
       if (cursors != null)
       {
         cursors.remove(cursor);
@@ -898,10 +883,10 @@
     updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
   }
 
-  private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
+  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
   {
     final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
-    if (cursors != null && !cursors.isEmpty())
+    if (cursors != null)
     {
       for (ReplicaCursor cursor : cursors)
       {

--
Gitblit v1.10.0