mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.25.2014 e95d2b4182b468a4e7d206fb11104098beef3b46
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -47,7 +47,10 @@
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg
.ECLRequestType.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg
.Persistent.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -78,7 +81,7 @@
   *
   * @see #getSearchPhase()
   */
  public static int INIT_PHASE = 1;
  private static int INIT_PHASE = 1;
  /**
   * The persistent phase is only used for persistent searches on the External
   * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the
@@ -86,30 +89,17 @@
   */
  private static int PERSISTENT_PHASE = 2;
  /**
   * This is a string identifying the operation, provided by the client part of
   * the ECL, used to help interpretation of messages logged.
   */
  private String operationId;
  private StartECLSessionMsg startECLSessionMsg;
  /** Cursor on the {@link ChangeNumberIndexDB}. */
  private DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor;
  private boolean draftCompat = false;
  /**
   * Specifies the last changer number requested.
   */
  private long lastChangeNumber = 0;
  /**
   * Specifies whether the change number db has been read until its end.
   */
  private boolean isEndOfCNIndexDBReached = false;
  /**
   * Specifies whether the current search has been requested to be persistent
   * or not.
   */
  private short isPersistent;
  /**
   * Specifies the current search phase : INIT or PERSISTENT.
   */
  private int searchPhase = INIT_PHASE;
@@ -124,10 +114,6 @@
   * (thus becoming the "current" cookie just before the change is returned.
   */
  private MultiDomainServerState previousCookie = new MultiDomainServerState();
  /**
   * Specifies the excluded DNs (like cn=admin, ...).
   */
  private Set<String> excludedBaseDNs = new HashSet<String>();
  /**
   * Eligible CSN - only changes older or equal to eligibleCSN are published in
@@ -144,13 +130,13 @@
   * Provides a string representation of this object.
   * @return the string representation.
   */
  public String dumpState()
  private String dumpState()
  {
    return getClass().getCanonicalName() +
           "[" +
           "[draftCompat=" + draftCompat +
           "] [persistent=" + isPersistent +
           "] [startChangeNumber=" + lastChangeNumber +
           "] [persistent=" + startECLSessionMsg.getPersistent() +
           "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
           "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
           "] [searchPhase=" + searchPhase +
           "] [startCookie=" + startCookie +
@@ -716,7 +702,7 @@
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.computeNextEligibleMessageForDomain(operationId);
        domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
        if (domainCtxt.nextMsg == null)
          domainCtxt.active = false;
@@ -766,6 +752,7 @@
        continue;
      // skip the excluded domains
      Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
      if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
      {
        // this is an excluded domain
@@ -786,7 +773,7 @@
      newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
      // Assign the start state for the domain
      if (isPersistent == PERSISTENT_CHANGES_ONLY)
      if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
      {
        newDomainCtxt.startState = latestState;
        startStatesFromProvidedCookie.remove(domain.getBaseDN());
@@ -999,9 +986,9 @@
    if (this.serverId != 0)
    {
      return eclServer + serverId + " " + serverURL + " "
          + getBaseDNString() + " " + operationId;
          + getBaseDNString() + " " + getOperationId();
    }
    return eclServer + getClass().getCanonicalName() + " " + operationId;
    return eclServer + getClass().getCanonicalName() + " " + getOperationId();
  }
  /**
@@ -1032,10 +1019,8 @@
  private void initialize(StartECLSessionMsg startECLSessionMsg)
      throws DirectoryException
  {
    this.operationId = startECLSessionMsg.getOperationId();
    this.startECLSessionMsg = startECLSessionMsg;
    isPersistent  = startECLSessionMsg.isPersistent();
    lastChangeNumber = startECLSessionMsg.getLastChangeNumber();
    searchPhase   = INIT_PHASE;
    final String cookie = startECLSessionMsg.getCrossDomainServerState();
    try
@@ -1049,9 +1034,6 @@
          ERR_INVALID_COOKIE_SYNTAX.get(cookie));
    }
    excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
    refreshEligibleCSN();
    initializeChangelogSearch(startECLSessionMsg);
    if (session != null)
@@ -1081,7 +1063,7 @@
      // TODO:ECL Potential race condition if writer not yet resumed here
    }
    if (isPersistent == PERSISTENT_CHANGES_ONLY)
    if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
    {
      closeInitPhase();
    }
@@ -1089,7 +1071,7 @@
    registerIntoDomain();
    if (debugEnabled())
      TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
      TRACER.debugInfo(getClass().getCanonicalName() + " " + getOperationId()
          + " initialized: " + " " + dumpState() + " " + " "
          + domaimCtxtsToString(""));
  }
@@ -1097,12 +1079,13 @@
  private void initializeChangelogSearch(StartECLSessionMsg msg)
      throws DirectoryException
  {
    short requestType = msg.getECLRequestType();
    if (requestType == REQUEST_TYPE_FROM_COOKIE)
    refreshEligibleCSN();
    if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromCookie(msg.getCrossDomainServerState());
    }
    else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER)
    else if (msg.getECLRequestType() == REQUEST_TYPE_FROM_CHANGE_NUMBER)
    {
      initializeCLSearchFromChangeNumber(msg.getFirstChangeNumber());
    }
@@ -1240,7 +1223,7 @@
        }
        if (oldestContext.active)
        {
          oldestContext.computeNextEligibleMessageForDomain(operationId);
          oldestContext.computeNextEligibleMessageForDomain(getOperationId());
        }
        oldestChange = change;
      }
@@ -1253,7 +1236,7 @@
                  + "looking for the generalized oldest change"));
        for (DomainContext domainCtxt : domainCtxts) {
          domainCtxt.computeNextEligibleMessageForDomain(operationId);
          domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
        }
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
@@ -1292,6 +1275,7 @@
  private boolean isBeyondLastRequestedChangeNumber(final ECLUpdateMsg change)
  {
    final long lastChangeNumber = startECLSessionMsg.getLastChangeNumber();
    return draftCompat
        && 0 < lastChangeNumber
        && lastChangeNumber < change.getChangeNumber();
@@ -1431,7 +1415,7 @@
    // go to persistent phase if one
    for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true;
    if (this.isPersistent != NON_PERSISTENT)
    if (startECLSessionMsg.getPersistent() != NON_PERSISTENT)
    {
      // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
      searchPhase = PERSISTENT_PHASE;
@@ -1488,23 +1472,27 @@
   */
  public String getOperationId()
  {
    return operationId;
    return startECLSessionMsg.getOperationId();
  }
  /**
   * Getter for the persistent property of the current search.
   * @return Whether the current search is persistent or not.
   * Returns whether the current search is a persistent search .
   *
   * @return true if the current search is a persistent search, false otherwise
   */
  public short isPersistent() {
    return this.isPersistent;
  boolean isNonPersistent()
  {
    return startECLSessionMsg.getPersistent() == NON_PERSISTENT;
  }
  /**
   * Getter for the current search phase (INIT or PERSISTENT).
   * @return Whether the current search is persistent or not.
   * Returns whether the initialization phase has completed.
   *
   * @return true the initialization phase has completed, false otherwise
   */
  public int getSearchPhase() {
    return this.searchPhase;
  boolean isInitPhaseDone()
  {
    return this.searchPhase != INIT_PHASE;
  }
  /**
@@ -1512,6 +1500,7 @@
   */
  private void refreshEligibleCSN()
  {
    Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
    eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs);
  }