mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.26.2014 39845070920c859cd1d24cb23090bfa1bfad7b1a
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -72,13 +74,20 @@
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private ReplicationServerCfg config;
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  /**
   * \@GuardedBy("itself")
   */
  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private ReplicationDbEnv replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
  /**
@@ -103,9 +112,9 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new DBCursor<UpdateMsg>()
  {
@@ -135,7 +144,7 @@
  };
  /**
   * Builds an instance of this class.
   * Creates a new changelog DB.
   *
   * @param replicationServer
   *          the local replication server.
@@ -144,15 +153,15 @@
   * @throws ConfigException
   *           if a problem occurs opening the supplied directory
   */
  public JEChangelogDB(ReplicationServer replicationServer,
      ReplicationServerCfg config) throws ConfigException
  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
      throws ConfigException
  {
    this.config = config;
    this.replicationServer = replicationServer;
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
  private File makeDir(String dbDirName) throws ConfigException
  private File makeDir(final String dbDirName) throws ConfigException
  {
    // Check that this path exists or create it.
    final File dbDirectory = getFileForPath(dbDirName);
@@ -168,15 +177,13 @@
    {
      logger.traceException(e);
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
      mb.append(e.getLocalizedMessage());
      mb.append(" ");
      mb.append(dbDirectory);
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e);
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
    }
  }
  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
  {
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
@@ -186,29 +193,12 @@
    return Collections.emptyMap();
  }
  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
  {
    return getDomainMap(baseDN).get(serverId);
  }
  /**
   * Provision resources for the specified serverId in the specified replication
   * domain.
   *
   * @param baseDN
   *          the replication domain where to add the serverId
   * @param serverId
   *          the server Id to add to the replication domain
   * @throws ChangelogException
   *           If a database error happened.
   */
  private void commission(DN baseDN, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateReplicaDB(baseDN, serverId, rs);
  }
  /**
   * Returns a {@link JEReplicaDB}, possibly creating it.
   *
   * @param baseDN
@@ -217,35 +207,42 @@
   *          the serverId for which to create a ReplicaDB
   * @param server
   *          the ReplicationServer
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
   *         to be created
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer server) throws ChangelogException
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
      final ReplicationServer server) throws ChangelogException
  {
    while (!shutdown.get())
    {
      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
          getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result =
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      if (result != null)
      {
        final Boolean dbWasCreated = result.getSecond();
        if (dbWasCreated)
        { // new replicaDB => update all cursors with it
          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
          if (cursors != null && !cursors.isEmpty())
          {
            for (DomainDBCursor cursor : cursors)
            {
              cursor.addReplicaDB(serverId, null);
            }
          }
        }
        return result;
      }
    }
    throw new ChangelogException(
        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
  }
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
      DN baseDN)
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
  {
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
        domainToReplicaDBs.get(baseDN);
    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
@@ -254,30 +251,36 @@
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    final ConcurrentMap<Integer, JEReplicaDB> newValue =
        new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    if (MultimasterReplication.isECLEnabledDomain(baseDN))
    {
      // we just created a new domain => update all cursors
      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
      {
        cursor.addDomain(baseDN, null);
      }
    }
    return newValue;
  }
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
      DN baseDN, ReplicationServer server) throws ChangelogException
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
  {
    // happy path: the JEReplicaDB already exists
    // happy path: the replicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
    // unlucky, the replicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
@@ -293,11 +296,11 @@
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
        // 1) shutdown properly or 2) lazily recreate the replicaDB
        return null;
      }
      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
      domainMap.put(serverId, newDB);
      return Pair.of(newDB, true);
    }
@@ -310,8 +313,8 @@
    try
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.getChangelogState();
      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = replicationEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
@@ -338,12 +341,12 @@
    {
      for (int serverId : entry.getValue())
      {
        commission(entry.getKey(), serverId, replicationServer);
        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
      }
    }
  }
  private void shutdownCNIndexDB() throws ChangelogException
  private void shutdownChangeNumberIndexDB() throws ChangelogException
  {
    synchronized (cnIndexDBLock)
    {
@@ -381,7 +384,7 @@
    try
    {
      shutdownCNIndexDB();
      shutdownChangeNumberIndexDB();
    }
    catch (ChangelogException e)
    {
@@ -402,7 +405,7 @@
      }
    }
    if (dbEnv != null)
    if (replicationEnv != null)
    {
      // wait for shutdown of the threads holding cursors
      try
@@ -421,7 +424,7 @@
        // do nothing: we are already shutting down
      }
      dbEnv.shutdown();
      replicationEnv.shutdown();
    }
    if (firstException != null)
@@ -431,11 +434,10 @@
  }
  /**
   * Clears all content from the changelog database, but leaves its directory on
   * the filesystem.
   * Clears all records from the changelog (does not remove the changelog itself).
   *
   * @throws ChangelogException
   *           If a database problem happened
   *           If an error occurs when clearing the changelog.
   */
  public void clearDB() throws ChangelogException
  {
@@ -469,7 +471,7 @@
        try
        {
          shutdownCNIndexDB();
          shutdownChangeNumberIndexDB();
        }
        catch (ChangelogException e)
        {
@@ -584,7 +586,7 @@
    // 3- clear the changelogstate DB
    try
    {
      dbEnv.clearGenerationId(baseDN);
      replicationEnv.clearGenerationId(baseDN);
    }
    catch (ChangelogException e)
    {
@@ -635,7 +637,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(dbEnv.getChangelogState());
      startIndexer(replicationEnv.getChangelogState());
    }
    else
    {
@@ -673,7 +675,7 @@
      {
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
        }
        catch (Exception e)
        {
@@ -694,40 +696,57 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
      throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
    return cursor;
  }
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  {
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      for (CSN offlineCSN : domainState)
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
        {
          return offlineCSN;
        }
        cursors = new ArrayList<DomainDBCursor>();
        registeredDomainCursors.put(baseDN, cursors);
      }
      cursors.add(cursor);
      return cursor;
    }
  }
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final MultiDomainServerState offlineReplicas =
        replicationEnv.getChangelogState().getOfflineReplicas();
    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
    if (offlineCSN != null
        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
    {
      return offlineCSN;
    }
    return null;
  }
@@ -737,31 +756,57 @@
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      return replicaDB.generateCursorFrom(startAfterCSN);
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
    }
    return EMPTY_CURSOR;
    return EMPTY_CURSOR_REPLICA_DB;
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  public void unregisterCursor(final DBCursor<?> cursor)
  {
    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        updateMsg.getCSN().getServerId(), replicationServer);
    final JEReplicaDB replicaDB = pair.getFirst();
    final boolean wasCreated = pair.getSecond();
    if (cursor instanceof MultiDomainDBCursor)
    {
      registeredMultiDomainCursors.remove(cursor);
    }
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      synchronized (registeredMultiDomainCursors)
      {
        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        csn.getServerId(), replicationServer);
    final JEReplicaDB replicaDB = pair.getFirst();
    replicaDB.add(updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
    return pair.getSecond(); // replica DB was created
  }
  /** {@inheritDoc} */
@@ -779,7 +824,7 @@
  @Override
  public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    dbEnv.addOfflineReplica(baseDN, offlineCSN);
    replicationEnv.addOfflineReplica(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {