mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
25.26.2014 881c7d3d5c53debdd4a1bb5f63146f0f73d56957
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -27,6 +27,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.Map.Entry;
@@ -39,7 +40,6 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -47,7 +47,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -73,6 +72,7 @@
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
  private final ECLEnabledDomainPredicate predicate;
  /*
   * The following MultiDomainServerState fields must be thread safe, because
@@ -121,7 +121,7 @@
   *
   * @NonNull
   */
  private MultiDomainDBCursor nextChangeForInsertDBCursor;
  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
  /**
   * Builds a ChangeNumberIndexer object.
@@ -133,9 +133,26 @@
   */
  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
  {
    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
  }
  /**
   * Builds a ChangeNumberIndexer object.
   *
   * @param changelogDB
   *          the changelogDB
   * @param changelogState
   *          the changelog state used for initialization
   * @param predicate
   *          tells whether a domain is enabled for the external changelog
   */
  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
      ECLEnabledDomainPredicate predicate)
  {
    super("Change number indexer");
    this.changelogDB = changelogDB;
    this.changelogState = changelogState;
    this.predicate = predicate;
  }
  /**
@@ -148,7 +165,7 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    if (!isECLEnabledDomain(baseDN))
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -186,7 +203,7 @@
  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  {
    if (!isECLEnabledDomain(baseDN))
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -197,24 +214,6 @@
  }
  /**
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   * <p>
   * This method is a test seam that break the dependency on a static method.
   *
   * @param baseDN
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   * @see MultimasterReplication#isECLEnabledDomain(DN)
   */
  protected boolean isECLEnabledDomain(DN baseDN)
  {
    return MultimasterReplication.isECLEnabledDomain(baseDN);
  }
  /**
   * Signals a replica went offline.
   *
   * @param baseDN
@@ -224,7 +223,7 @@
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    if (!isECLEnabledDomain(baseDN))
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -337,7 +336,7 @@
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (isECLEnabledDomain(baseDN))
      if (predicate.isECLEnabledDomain(baseDN))
      {
        for (Integer serverId : entry.getValue())
        {
@@ -353,7 +352,8 @@
      }
    }
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
        domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
@@ -384,7 +384,7 @@
    {
      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
      {
        if (isECLEnabledDomain(baseDN))
        if (predicate.isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
          // a replica offline message could also be the very last time