mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.04.2014 938dda347b7223b73a1c5d5c47c8674ecdd90102
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -87,6 +88,8 @@
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
  private ReplicationDbEnv replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
@@ -720,7 +723,7 @@
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
@@ -751,21 +754,31 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
        + " is not supported for the JE implementation fo changelog");
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startCSN);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaID, cursors);
        }
        cursors.add(replicaCursor);
      }
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
@@ -790,6 +803,15 @@
        }
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      final List<ReplicaCursor> cursors =  replicaCursors.get(replicaCursor.getReplicaID());
      if (cursors != null)
      {
        cursors.remove(cursor);
      }
    }
  }
  /** {@inheritDoc} */
@@ -831,6 +853,19 @@
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
    if (cursors != null && !cursors.isEmpty())
    {
      for (ReplicaCursor cursor : cursors)
      {
        cursor.setOfflineCSN(offlineCSN);
      }
    }
  }
  /**