From b2335ed2f0acbb186c54f1a2d4c6661dece126a0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 21 Oct 2014 09:13:24 +0000
Subject: [PATCH] OPENDJ-1606 (CR-4909) ConcurrentModificationException while performing modify operation against two replicated DS
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 83 +++++++++++----------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 96 +++++++++++++------------------
2 files changed, 75 insertions(+), 104 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index ec88341..e3ddb3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -26,9 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import java.io.File;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -100,15 +98,12 @@
*/
private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
- /**
- * \@GuardedBy("itself")
- */
- private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
- new HashMap<DN, List<DomainDBCursor>>();
+ private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
+ new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>();
private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
new CopyOnWriteArrayList<MultiDomainDBCursor>();
- private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
- new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>>(Pair.COMPARATOR);
private ReplicationEnvironment replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -274,6 +269,7 @@
// on the domainMap to create a new ReplicaDB
synchronized (domainMap)
{
+ // double-check
currentValue = domainMap.get(serverId);
if (currentValue != null)
{
@@ -399,6 +395,7 @@
{
firstException = e;
}
+
for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
this.domainToReplicaDBs.values().iterator(); it.hasNext();)
{
@@ -644,7 +641,9 @@
catch (Exception e)
{
if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
}
}
@@ -662,7 +661,7 @@
/** {@inheritDoc} */
@Override
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
- KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
{
final Set<DN> excludedDomainDns = Collections.emptySet();
return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
@@ -672,8 +671,7 @@
@Override
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
- final Set<DN> excludedDomainDns)
- throws ChangelogException
+ final Set<DN> excludedDomainDns) throws ChangelogException
{
final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
registeredMultiDomainCursors.add(cursor);
@@ -705,18 +703,9 @@
private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
final PositionStrategy positionStrategy)
{
- synchronized (registeredDomainCursors)
- {
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
- List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
- if (cursors == null)
- {
- cursors = new ArrayList<DomainDBCursor>();
- registeredDomainCursors.put(baseDN, cursors);
- }
- cursors.add(cursor);
- return cursor;
- }
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
+ putCursor(registeredDomainCursors, baseDN, cursor);
+ return cursor;
}
private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
@@ -742,25 +731,31 @@
{
final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
- final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
- final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+ final Pair<DN, Integer> replicaId = Pair.of(baseDN, serverId);
+ final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
- synchronized (replicaCursors)
- {
- List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
- if (cursors == null)
- {
- cursors = new ArrayList<ReplicaCursor>();
- replicaCursors.put(replicaID, cursors);
- }
- cursors.add(replicaCursor);
- }
+ putCursor(replicaCursors, replicaId, replicaCursor);
return replicaCursor;
}
return EMPTY_CURSOR_REPLICA_DB;
}
+ private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
+ {
+ CopyOnWriteArrayList<V> cursors = map.get(key);
+ if (cursors == null)
+ {
+ cursors = new CopyOnWriteArrayList<V>();
+ CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
+ if (previousValue != null)
+ {
+ cursors = previousValue;
+ }
+ }
+ cursors.add(cursor);
+ }
+
/** {@inheritDoc} */
@Override
public void unregisterCursor(final DBCursor<?> cursor)
@@ -772,25 +767,19 @@
else if (cursor instanceof DomainDBCursor)
{
final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
- synchronized (registeredMultiDomainCursors)
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
{
- final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
- if (cursors != null)
- {
- cursors.remove(cursor);
- }
+ cursors.remove(cursor);
}
}
else if (cursor instanceof ReplicaCursor)
{
final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
- synchronized (replicaCursors)
+ final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+ if (cursors != null)
{
- final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
- if (cursors != null)
- {
- cursors.remove(cursor);
- }
+ cursors.remove(cursor);
}
}
}
@@ -853,15 +842,12 @@
private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
{
- synchronized (replicaCursors)
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+ if (cursors != null)
{
- final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
- if (cursors != null)
+ for (ReplicaCursor cursor : cursors)
{
- for (ReplicaCursor cursor : cursors)
- {
- cursor.setOfflineCSN(offlineCSN);
- }
+ cursor.setOfflineCSN(offlineCSN);
}
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 9eb193f..085504d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -26,9 +26,7 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -95,15 +93,12 @@
*/
private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
- /**
- * \@GuardedBy("itself")
- */
- private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
- new HashMap<DN, List<DomainDBCursor>>();
+ private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
+ new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>();
private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
new CopyOnWriteArrayList<MultiDomainDBCursor>();
- private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
- new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, CopyOnWriteArrayList<ReplicaCursor>>(Pair.COMPARATOR);
private ReplicationDbEnv replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -192,11 +187,6 @@
}
catch (Exception e)
{
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
-
final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
.append(String.valueOf(dbDirectory));
throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
@@ -724,13 +714,14 @@
@Override
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
- final Set<DN> excludedDomainDns) throws ChangelogException
+ final Set<DN> excludedDomainDns) throws ChangelogException
{
final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
- if (!excludedDomainDns.contains(baseDN)) {
+ if (!excludedDomainDns.contains(baseDN))
+ {
cursor.addDomain(baseDN, startState.getServerState(baseDN));
}
}
@@ -755,18 +746,9 @@
private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
final PositionStrategy positionStrategy)
{
- synchronized (registeredDomainCursors)
- {
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
- List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
- if (cursors == null)
- {
- cursors = new ArrayList<DomainDBCursor>();
- registeredDomainCursors.put(baseDN, cursors);
- }
- cursors.add(cursor);
- return cursor;
- }
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
+ putCursor(registeredDomainCursors, baseDN, cursor);
+ return cursor;
}
private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
@@ -792,25 +774,31 @@
{
final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
- final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
- final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+ final Pair<DN, Integer> replicaId = Pair.of(baseDN, serverId);
+ final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
- synchronized (replicaCursors)
- {
- List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
- if (cursors == null)
- {
- cursors = new ArrayList<ReplicaCursor>();
- replicaCursors.put(replicaID, cursors);
- }
- cursors.add(replicaCursor);
- }
+ putCursor(replicaCursors, replicaId, replicaCursor);
return replicaCursor;
}
return EMPTY_CURSOR_REPLICA_DB;
}
+ private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
+ {
+ CopyOnWriteArrayList<V> cursors = map.get(key);
+ if (cursors == null)
+ {
+ cursors = new CopyOnWriteArrayList<V>();
+ CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
+ if (previousValue != null)
+ {
+ cursors = previousValue;
+ }
+ }
+ cursors.add(cursor);
+ }
+
/** {@inheritDoc} */
@Override
public void unregisterCursor(final DBCursor<?> cursor)
@@ -822,19 +810,16 @@
else if (cursor instanceof DomainDBCursor)
{
final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
- synchronized (registeredMultiDomainCursors)
+ final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+ if (cursors != null)
{
- final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
- if (cursors != null)
- {
- cursors.remove(cursor);
- }
+ cursors.remove(cursor);
}
}
else if (cursor instanceof ReplicaCursor)
{
final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
- final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+ final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
if (cursors != null)
{
cursors.remove(cursor);
@@ -898,10 +883,10 @@
updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
}
- private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
+ private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
{
final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
- if (cursors != null && !cursors.isEmpty())
+ if (cursors != null)
{
for (ReplicaCursor cursor : cursors)
{
--
Gitblit v1.10.0