mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.52.2013 c5ab3254dc665b820ca65ea919c5c2f5f5e0d110
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -165,9 +167,21 @@
  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronized on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
      domainToReplicaDBs =
          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private final File dbDirectory;
@@ -185,6 +199,7 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
      new DBCursor<UpdateMsg>()
@@ -305,27 +320,83 @@
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    synchronized (domainToReplicaDBs)
    while (!shutdown.get())
    {
      Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
      if (domainMap == null)
      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
          getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result =
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
      if (result != null)
      {
        domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
        domainToReplicaDBs.put(baseDN, domainMap);
        return result;
      }
      JEReplicaDB replicaDB = domainMap.get(serverId);
      if (replicaDB == null)
      {
        replicaDB =
            new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, replicaDB);
        return Pair.of(replicaDB, true);
      }
      return Pair.of(replicaDB, false);
    }
    throw new ChangelogException(
        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
  }
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
      DN baseDN)
  {
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
        domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
    }
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    final ConcurrentMap<Integer, JEReplicaDB> newValue =
        new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    return newValue;
  }
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
      DN baseDN, ReplicationServer rs) throws ChangelogException
  {
    // happy path: the JEReplicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      // double-check
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
        return Pair.of(currentValue, false);
      }
      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // the domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
        return null;
      }
      final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
      domainMap.put(serverId, newValue);
      return Pair.of(newValue, true);
    }
  }
  /** {@inheritDoc} */
  @Override
@@ -378,6 +449,11 @@
  @Override
  public void shutdownDB() throws ChangelogException
  {
    if (!this.shutdown.compareAndSet(false, true))
    { // shutdown has already been initiated
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
@@ -392,6 +468,17 @@
      firstException = e;
    }
    for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
    {
      final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
      synchronized (domainMap)
      {
        it.remove();
        innerShutdownDomain(domainMap);
      }
    }
    if (dbEnv != null)
    {
      dbEnv.shutdown();
@@ -498,19 +585,30 @@
  @Override
  public void shutdownDomain(DN baseDN)
  {
    shutdownReplicaDBs(baseDN, getDomainMap(baseDN));
    if (this.shutdown.get())
    { // shutdown has already been initiated
      return;
    }
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      synchronized (domainMap)
      {
        innerShutdownDomain(domainToReplicaDBs.remove(baseDN));
      }
    }
  }
  private void shutdownReplicaDBs(DN baseDN,
      Map<Integer, JEReplicaDB> domainMap)
  /**
   * This method assumes the domainMap is synchronized by calling code and that
   * the domainMap is not null.
   */
  private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap)
  {
    synchronized (domainMap)
    for (JEReplicaDB replicaDB : domainMap.values())
    {
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        replicaDB.shutdown();
      }
      domainToReplicaDBs.remove(baseDN);
      replicaDB.shutdown();
    }
  }
@@ -548,21 +646,25 @@
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      for (JEReplicaDB replicaDB : domainMap.values())
      synchronized (domainMap)
      {
        try
        domainMap = domainToReplicaDBs.remove(baseDN);
        for (JEReplicaDB replicaDB : domainMap.values())
        {
          replicaDB.clear();
        }
        catch (ChangelogException e)
        {
          firstException = e;
          try
          {
            replicaDB.clear();
          }
          catch (ChangelogException e)
          {
            firstException = e;
          }
          replicaDB.shutdown();
        }
      }
      shutdownReplicaDBs(baseDN, domainMap);
    }
    // 2- clear the ChangeNumber index DB