mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.25.2014 b6ccb560e9056cc9c028812f5f63ff2e80c95c87
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,7 +46,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
@@ -64,6 +66,7 @@
 */
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
@@ -77,11 +80,18 @@
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a FileReplicaDB, synchronize on the domainMap to avoid
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>>
      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
  /**
   * \@GuardedBy("itself")
   */
  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private ReplicationEnvironment replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -124,10 +134,10 @@
   *           if a problem occurs opening the supplied directory
   */
  public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
     throws ConfigException
      throws ConfigException
  {
    this.replicationServer = replicationServer;
    this.config = config;
    this.replicationServer = replicationServer;
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
@@ -175,8 +185,7 @@
   *          the serverId for which to create a ReplicaDB
   * @param server
   *          the ReplicationServer
   * @return a Pair with the FileReplicaDB and a boolean indicating whether it had
   *         to be created
   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
@@ -189,6 +198,19 @@
      final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      if (result != null)
      {
        final Boolean dbWasCreated = result.getSecond();
        if (dbWasCreated)
        { // new replicaDB => update all cursors with it
          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
          if (cursors != null && !cursors.isEmpty())
          {
            for (DomainDBCursor cursor : cursors)
            {
              cursor.addReplicaDB(serverId, null);
            }
          }
        }
        return result;
      }
    }
@@ -214,20 +236,26 @@
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    {
      cursor.addDomain(baseDN, null);
    }
    return newValue;
  }
  private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
  {
    // happy path: the FileReplicaDB already exists
    // happy path: the replicaDB already exists
    FileReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
    // unlucky, the FileReplicaDB does not exist: take the hit and synchronize
    // unlucky, the replicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
@@ -242,7 +270,7 @@
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the FileReplicaDB
        // 1) shutdown properly or 2) lazily recreate the replicaDB
        return null;
      }
@@ -371,6 +399,7 @@
      {
        // do nothing: we are already shutting down
      }
      replicationEnv.shutdown();
    }
@@ -381,10 +410,10 @@
  }
  /**
   * Clears all records from the changelog (does not remove the log itself).
   * Clears all records from the changelog (does not remove the changelog itself).
   *
   * @throws ChangelogException
   *           If an error occurs when clearing the log.
   *           If an error occurs when clearing the changelog.
   */
  public void clearDB() throws ChangelogException
  {
@@ -629,40 +658,57 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
      throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
    return cursor;
  }
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  {
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      for (CSN offlineCSN : domainState)
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
        {
          return offlineCSN;
        }
        cursors = new ArrayList<DomainDBCursor>();
        registeredDomainCursors.put(baseDN, cursors);
      }
      cursors.add(cursor);
      return cursor;
    }
  }
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final MultiDomainServerState offlineReplicas =
        replicationEnv.getChangelogState().getOfflineReplicas();
    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
    if (offlineCSN != null
        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
    {
      return offlineCSN;
    }
    return null;
  }
@@ -675,28 +721,55 @@
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      return replicaDB.generateCursorFrom(startAfterCSN);
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
  /** {@inheritDoc} */
  @Override
  public void unregisterCursor(final DBCursor<?> cursor)
  {
    if (cursor instanceof MultiDomainDBCursor)
    {
      registeredMultiDomainCursors.remove(cursor);
    }
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      synchronized (registeredMultiDomainCursors)
      {
        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        updateMsg.getCSN().getServerId(), replicationServer);
        csn.getServerId(), replicationServer);
    final FileReplicaDB replicaDB = pair.getFirst();
    final boolean wasCreated = pair.getSecond();
    replicaDB.add(updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
    return pair.getSecond(); // replica DB was created
  }
  /** {@inheritDoc} */