mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.52.2013 c5ab3254dc665b820ca65ea919c5c2f5f5e0d110
opends/src/messages/messages/replication.properties
@@ -527,3 +527,5 @@
NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
 for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
SEVERE_ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN_235=Could not \
 create replica database because the changelog database is shutting down
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -165,9 +167,21 @@
  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronized on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
      domainToReplicaDBs =
          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private final File dbDirectory;
@@ -185,6 +199,7 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
      new DBCursor<UpdateMsg>()
@@ -305,27 +320,83 @@
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    synchronized (domainToReplicaDBs)
    while (!shutdown.get())
    {
      Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
      if (domainMap == null)
      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
          getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result =
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
      if (result != null)
      {
        domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
        domainToReplicaDBs.put(baseDN, domainMap);
        return result;
      }
      JEReplicaDB replicaDB = domainMap.get(serverId);
      if (replicaDB == null)
      {
        replicaDB =
            new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, replicaDB);
        return Pair.of(replicaDB, true);
      }
      return Pair.of(replicaDB, false);
    }
    throw new ChangelogException(
        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
  }
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
      DN baseDN)
  {
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
        domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
    }
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    final ConcurrentMap<Integer, JEReplicaDB> newValue =
        new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    return newValue;
  }
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
      DN baseDN, ReplicationServer rs) throws ChangelogException
  {
    // happy path: the JEReplicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      // double-check
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
        return Pair.of(currentValue, false);
      }
      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // the domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
        return null;
      }
      final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
      domainMap.put(serverId, newValue);
      return Pair.of(newValue, true);
    }
  }
  /** {@inheritDoc} */
  @Override
@@ -378,6 +449,11 @@
  @Override
  public void shutdownDB() throws ChangelogException
  {
    if (!this.shutdown.compareAndSet(false, true))
    { // shutdown has already been initiated
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
@@ -392,6 +468,17 @@
      firstException = e;
    }
    for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
    {
      final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
      synchronized (domainMap)
      {
        it.remove();
        innerShutdownDomain(domainMap);
      }
    }
    if (dbEnv != null)
    {
      dbEnv.shutdown();
@@ -498,19 +585,30 @@
  @Override
  public void shutdownDomain(DN baseDN)
  {
    shutdownReplicaDBs(baseDN, getDomainMap(baseDN));
    if (this.shutdown.get())
    { // shutdown has already been initiated
      return;
    }
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      synchronized (domainMap)
      {
        innerShutdownDomain(domainToReplicaDBs.remove(baseDN));
      }
    }
  }
  private void shutdownReplicaDBs(DN baseDN,
      Map<Integer, JEReplicaDB> domainMap)
  /**
   * This method assumes the domainMap is synchronized by calling code and that
   * the domainMap is not null.
   */
  private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap)
  {
    synchronized (domainMap)
    for (JEReplicaDB replicaDB : domainMap.values())
    {
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        replicaDB.shutdown();
      }
      domainToReplicaDBs.remove(baseDN);
      replicaDB.shutdown();
    }
  }
@@ -548,21 +646,25 @@
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      for (JEReplicaDB replicaDB : domainMap.values())
      synchronized (domainMap)
      {
        try
        domainMap = domainToReplicaDBs.remove(baseDN);
        for (JEReplicaDB replicaDB : domainMap.values())
        {
          replicaDB.clear();
        }
        catch (ChangelogException e)
        {
          firstException = e;
          try
          {
            replicaDB.clear();
          }
          catch (ChangelogException e)
          {
            firstException = e;
          }
          replicaDB.shutdown();
        }
      }
      shutdownReplicaDBs(baseDN, domainMap);
    }
    // 2- clear the ChangeNumber index DB
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -133,33 +133,34 @@
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
   * @param id Identifier of the DB.
   * @param serverId The serverId for which changes will be stored in the DB.
   * @param baseDN the baseDN for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this ReplicaDB.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @param queueSize The queueSize to use when creating the ReplicaDB.
   * @throws ChangelogException If a database problem happened
   */
  public JEReplicaDB(int id, DN baseDN, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, int queueSize) throws ChangelogException
  public JEReplicaDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      throws ChangelogException
  {
    this.replicationServer = replicationServer;
    serverId = id;
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    final int queueSize = replicationServer.getQueueSize();
    queueMaxSize = queueSize;
    queueLowmark = queueSize / 5;
    queueHimark = queueSize * 4 / 5;
    queueMaxBytes = 200 * queueMaxSize;
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    db = new ReplicationDB(id, baseDN, replicationServer, dbenv);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    oldestCSN = db.readOldestCSN();
    newestCSN = db.readNewestCSN();
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + id
        + ") changelog checkpointer for Replica DS(" + serverId
        + ") for domain \"" + baseDN + "\"");
    thread.start();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -408,7 +408,7 @@
      testRoot = createCleanDir();
      dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      replicaDB.setCounterRecordWindowSize(counterWindow);
      // Populate the db with 'max' msg
@@ -467,7 +467,7 @@
      debugInfo(tn, "SHUTDOWN replicaDB and recreate");
      replicaDB.shutdown();
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      replicaDB.setCounterRecordWindowSize(counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");