mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
21.46.2014 9bf63efe4d78b204ff62871fd88b0d1ef4f50f63
OPENDJ-1606 (CR-4909) ConcurrentModificationException while performing modify operation against two replicated DS


Fixed ConcurrentModificationException.
I suspect the NoSuchElementException is thrown by the desugared foreach loop on an ArrayList.

JEChangelogDB.java, FileChangelogDB.java:
FOr the fields registeredDomainCursors and replicaCursors, replaced the use of ArrayList + synchronized keywords with using ConcurrentSkipListMap + CopyOnWriteArrayList to ensure thread safe access/modifications of these MultiMaps.
Created putInMultiMap().
Aligned code between the 2 implementations.
2 files modified
188 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 106 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 82 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -26,9 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,9 +54,9 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
@@ -98,15 +96,12 @@
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
  /**
   * \@GuardedBy("itself")
   */
  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
      new HashMap<DN, List<DomainDBCursor>>();
  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
      new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>>();
  private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>>();
  private ReplicationEnvironment replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -169,8 +164,8 @@
    }
    catch (Exception e)
    {
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(e.getLocalizedMessage()).append(" ")
          .append(String.valueOf(dbDirectory));
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
    }
  }
@@ -272,6 +267,7 @@
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      // double-check
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
@@ -312,7 +308,7 @@
    catch (ChangelogException e)
    {
      logger.traceException(e);
      logger.error(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
      logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
    }
  }
@@ -394,6 +390,7 @@
    {
      firstException = e;
    }
    for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
    {
@@ -464,7 +461,7 @@
          {
            firstException = e;
          }
          else if (logger.isTraceEnabled())
          else
          {
            logger.traceException(e);
          }
@@ -560,7 +557,7 @@
      {
        firstException = e;
      }
      else if (logger.isTraceEnabled())
      else
      {
        logger.traceException(e);
      }
@@ -639,7 +636,7 @@
        catch (Exception e)
        {
          logger.traceException(e);
          logger.error(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
          logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage());
        }
      }
      return cnIndexDB;
@@ -656,7 +653,7 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
@@ -666,8 +663,7 @@
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
      final Set<DN> excludedDomainDns)
          throws ChangelogException
      final Set<DN> excludedDomainDns) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
@@ -699,18 +695,9 @@
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
        cursors = new ArrayList<DomainDBCursor>();
        registeredDomainCursors.put(baseDN, cursors);
      }
      cursors.add(cursor);
      return cursor;
    }
    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
    putCursor(registeredDomainCursors, baseDN, cursor);
    return cursor;
  }
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
@@ -739,22 +726,28 @@
      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaId);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaId, cursors);
        }
        cursors.add(replicaCursor);
      }
      putCursor(replicaCursors, replicaId, replicaCursor);
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
  {
    CopyOnWriteArrayList<V> cursors = map.get(key);
    if (cursors == null)
    {
      cursors = new CopyOnWriteArrayList<V>();
      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
      if (previousValue != null)
      {
        cursors = previousValue;
      }
    }
    cursors.add(cursor);
  }
  /** {@inheritDoc} */
  @Override
  public void unregisterCursor(final DBCursor<?> cursor)
@@ -766,25 +759,19 @@
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      synchronized (registeredMultiDomainCursors)
      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
      if (cursors != null)
      {
        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
        cursors.remove(cursor);
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      synchronized (replicaCursors)
      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
      if (cursors != null)
      {
        final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
        cursors.remove(cursor);
      }
    }
  }
@@ -847,15 +834,12 @@
  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
  {
    synchronized (replicaCursors)
    final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
    if (cursors != null)
    {
      final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
      if (cursors != null)
      for (ReplicaCursor cursor : cursors)
      {
        for (ReplicaCursor cursor : cursors)
        {
          cursor.setOfflineCSN(offlineCSN);
        }
        cursor.setOfflineCSN(offlineCSN);
      }
    }
  }
@@ -939,7 +923,7 @@
        }
        catch (Exception e)
        {
          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
          if (replicationServer != null)
          {
            replicationServer.shutdown();
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -26,9 +26,7 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,9 +54,9 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -93,15 +91,12 @@
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  /**
   * \@GuardedBy("itself")
   */
  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
      new HashMap<DN, List<DomainDBCursor>>();
  private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
      new ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<ReplicaId, List<ReplicaCursor>>();
  private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>>();
  private ReplicationDbEnv replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -190,8 +185,6 @@
    }
    catch (Exception e)
    {
      logger.traceException(e);
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
@@ -336,7 +329,6 @@
    catch (ChangelogException e)
    {
      logger.traceException(e);
      logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
    }
  }
@@ -714,13 +706,14 @@
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
      final  Set<DN> excludedDomainDns) throws ChangelogException
      final Set<DN> excludedDomainDns) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      if (!excludedDomainDns.contains(baseDN)) {
      if (!excludedDomainDns.contains(baseDN))
      {
        cursor.addDomain(baseDN, startState.getServerState(baseDN));
      }
    }
@@ -745,18 +738,9 @@
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
        cursors = new ArrayList<DomainDBCursor>();
        registeredDomainCursors.put(baseDN, cursors);
      }
      cursors.add(cursor);
      return cursor;
    }
    final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
    putCursor(registeredDomainCursors, baseDN, cursor);
    return cursor;
  }
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
@@ -785,22 +769,28 @@
      final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaId);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaId, cursors);
        }
        cursors.add(replicaCursor);
      }
      putCursor(replicaCursors, replicaId, replicaCursor);
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
  private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
  {
    CopyOnWriteArrayList<V> cursors = map.get(key);
    if (cursors == null)
    {
      cursors = new CopyOnWriteArrayList<V>();
      CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
      if (previousValue != null)
      {
        cursors = previousValue;
      }
    }
    cursors.add(cursor);
  }
  /** {@inheritDoc} */
  @Override
  public void unregisterCursor(final DBCursor<?> cursor)
@@ -812,19 +802,16 @@
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      synchronized (registeredMultiDomainCursors)
      final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
      if (cursors != null)
      {
        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
        cursors.remove(cursor);
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      final List<ReplicaCursor> cursors =  replicaCursors.get(replicaCursor.getReplicaId());
      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
      if (cursors != null)
      {
        cursors.remove(cursor);
@@ -847,7 +834,7 @@
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return pair.getSecond(); // replica DB was created
@@ -872,6 +859,7 @@
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
    updateCursorsWithOfflineCSN(baseDN, serverId, null);
  }
  /** {@inheritDoc} */
@@ -887,10 +875,10 @@
    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
    if (cursors != null && !cursors.isEmpty())
    if (cursors != null)
    {
      for (ReplicaCursor cursor : cursors)
      {