mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.06.2014 ab3cac04319c920ba14be59ea874e6e35f730655
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
@@ -60,6 +61,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -122,7 +124,7 @@
  private final AtomicBoolean shutdown = new AtomicBoolean();
  static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null);
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
  /**
   * Creates a new changelog DB.
@@ -658,37 +660,38 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
    }
    return cursor;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
@@ -715,15 +718,14 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);