mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.26.2013 07347aac048a14dc5fa1a5c1d02426230622051d
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -69,44 +69,44 @@
  /**
   * Specifies the last draft changer number (seqnum) requested.
   */
  public int lastDraftCN = 0;
  private int lastDraftCN = 0;
  /**
   * Specifies whether the draft change number (seqnum) db has been read until
   * its end.
   */
  public boolean isEndOfDraftCNReached = false;
  private boolean isEndOfDraftCNReached = false;
  /**
   * Specifies whether the current search has been requested to be persistent
   * or not.
   */
  public short isPersistent;
  private short isPersistent;
  /**
   * Specifies the current search phase : INIT or PERSISTENT.
   */
  public int searchPhase = INIT_PHASE;
  private int searchPhase = INIT_PHASE;
  /**
   * Specifies the cookie contained in the request, specifying where
   * to start serving the ECL.
   */
  public String startCookie;
  private String startCookie;
  /**
   * Specifies the value of the cookie before the change currently processed
   * is returned. It is updated with the change number of the change
   * currently processed (thus becoming the "current" cookie just
   * before the change is returned.
   */
  public MultiDomainServerState previousCookie =
  private MultiDomainServerState previousCookie =
    new MultiDomainServerState();
  /**
   * Specifies the excluded DNs (like cn=admin, ...).
   */
  public Set<String> excludedServiceIDs = new HashSet<String>();
  private Set<String> excludedBaseDNs = new HashSet<String>();
  /**
   * Eligible changeNumber - only changes older or equal to eligibleCN
   * are published in the ECL.
   */
  public ChangeNumber eligibleCN = null;
  private ChangeNumber eligibleCN = null;
  /**
   * Provides a string representation of this object.
@@ -136,19 +136,23 @@
   */
  private class DomainContext
  {
    ReplicationServerDomain rsd;
    private ReplicationServerDomain rsd;
    boolean active;              // active when there are still changes
    // supposed eligible for the ECL.
    /**
     * active when there are still changes supposed eligible for the ECL.
     */
    private boolean active;
    MessageHandler mh;           // the message handler from which are read
    // the changes for this domain
    /**
     * the message handler from which are reading the changes for this domain.
     */
    private MessageHandler mh;
    private UpdateMsg nextMsg;
    private UpdateMsg nextNonEligibleMsg;
    ServerState startState;
    ServerState currentState;
    ServerState stopState;
    long domainLatestTrimDate;
    private ServerState startState;
    private ServerState currentState;
    private ServerState stopState;
    private long domainLatestTrimDate;
    /**
     * {@inheritDoc}
@@ -166,14 +170,16 @@
     */
    public void toString(StringBuilder buffer)
    {
      buffer.append("[ [active=").append(active).append("] [rsd=")
          .append(rsd).append("] [nextMsg=").append(nextMsg).append("(")
      buffer.append("[ [active=").append(active)
          .append("] [rsd=").append(rsd)
          .append("] [nextMsg=").append(nextMsg).append("(")
          .append(nextMsg != null ?
          new Date(nextMsg.getChangeNumber().getTime()).toString():"")
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState).append("] [stopState=")
          .append(stopState).append("] [currentState=").append(currentState)
          .append("] [startState=").append(startState)
          .append("] [stopState=").append(stopState)
          .append("] [currentState=").append(currentState)
          .append("]]");
    }
@@ -185,7 +191,7 @@
    private void getNextEligibleMessageForDomain(String opid)
    {
      if (debugEnabled())
        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() +
          " getNextEligibleMessageForDomain(" + opid+ ") "
          + "ctxt=" + toString());
@@ -202,7 +208,7 @@
                <= eligibleCN.getTime());
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() +
                " getNextEligibleMessageForDomain(" + opid+ ") "
              + " stored nonEligibleMsg " + nextNonEligibleMsg
              + " has now become eligible regarding "
@@ -232,10 +238,10 @@
              (newMsg.getChangeNumber().getTime() < domainLatestTrimDate));
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() +
                " getNextEligibleMessageForDomain(" + opid+ ") "
                + " got new message : "
                +  " serviceId=[" + mh.getServiceId()
                +  " serviceId=[" + mh.getBaseDN()
                + "] [newMsg=" + newMsg + "]" + dumpState());
          // in non blocking mode, return null when no more msg
@@ -245,7 +251,7 @@
                <= eligibleCN.getTime());
          if (debugEnabled())
              TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId()
              TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN()
                + " getNextEligibleMessageForDomain(" + opid+ ") "
                + "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + newMsg.getChangeNumber()
@@ -289,8 +295,10 @@
    }
  }
  // The global list of contexts by domain for the search currently processed.
  DomainContext[] domainCtxts = new DomainContext[0];
  /**
   * The global list of contexts by domain for the search currently processed.
   */
  private DomainContext[] domainCtxts = new DomainContext[0];
  private String clDomCtxtsToString(String msg)
  {
@@ -304,9 +312,9 @@
    return buffer.toString();
  }
  static int UNDEFINED_PHASE = 0;
  static int INIT_PHASE = 1;
  static int PERSISTENT_PHASE = 2;
  private static int UNDEFINED_PHASE = 0;
  private static int INIT_PHASE = 1;
  private static int PERSISTENT_PHASE = 2;
  /**
   * Starts this handler based on a start message received from remote server.
@@ -355,7 +363,7 @@
    {
      // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
      startMsg = new ReplServerStartMsg(replicationServerId,
          replicationServerURL, getServiceId(), maxRcvWindow,
          replicationServerURL, getBaseDN(), maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServerDomain.getReplicationServer()
@@ -365,7 +373,7 @@
    {
      // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
      startMsg = new ReplServerStartDSMsg(replicationServerId,
          replicationServerURL, getServiceId(), maxRcvWindow,
          replicationServerURL, getBaseDN(), maxRcvWindow,
          new ServerState(), localGenerationId, sslEncryption,
          getLocalGroupId(), 0, replicationServer.getWeight(), 0);
    }
@@ -395,14 +403,12 @@
        replicationServer, rcvWindowSize);
    try
    {
      setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true);
      setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true);
    }
    catch(DirectoryException de)
    {
      // no chance to have a bad domain set here
    }
  }
  /**
@@ -425,7 +431,7 @@
        replicationServer, 0);
    try
    {
      setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true);
      setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true);
    }
    catch(DirectoryException de)
    {
@@ -610,7 +616,7 @@
          // the operation) in order to have the first and possible last
          // DraftCN.
          int[] limits = replicationServer.getECLDraftCNLimits(
              eligibleCN, excludedServiceIDs);
              eligibleCN, excludedBaseDNs);
          // If the startDraftCN provided is lower than the first Draft CN in
          // the DB, let's use the lower limit.
@@ -732,7 +738,7 @@
            continue;
          // skip the excluded domains
          if (excludedServiceIDs.contains(rsd.getBaseDn()))
          if (excludedBaseDNs.contains(rsd.getBaseDn()))
          {
            // this is an excluded domain
            if (allowUnknownDomains)
@@ -826,10 +832,8 @@
          // Creates an unconnected SH for the domain
          MessageHandler mh = new MessageHandler(maxQueueSize,
              replicationServerURL, replicationServerId, replicationServer);
          // set initial state
          mh.setInitialServerState(newDomainCtxt.startState);
          // set serviceID and domain
          mh.setServiceIdAndDomain(rsd.getBaseDn(), false);
          mh.setBaseDNAndDomain(rsd.getBaseDn(), false);
          // register the unconnected into the domain
          rsd.registerHandler(mh);
          newDomainCtxt.mh = mh;
@@ -1011,7 +1015,7 @@
    String localString;
    localString = "External changelog Server ";
    if (this.serverId != 0)
      localString += serverId + " " + serverURL + " " + getServiceId()
      localString += serverId + " " + serverURL + " " + getBaseDN()
       + " " + this.getOperationId();
    else
      localString += this.getClass().getCanonicalName()+ " " + operationId;
@@ -1064,8 +1068,8 @@
          ERR_INVALID_COOKIE_SYNTAX.get());
    }
    excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
    replicationServer.disableEligibility(excludedServiceIDs);
    excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
    replicationServer.disableEligibility(excludedBaseDNs);
    eligibleCN = replicationServer.getEligibleCN();
    if (startECLSessionMsg.getECLRequestType()==
@@ -1327,7 +1331,7 @@
                // the next change from the DraftCN db
                ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber();
                String dnFromDraftCNDb = draftCNDbIter.getServiceID();
                String dnFromDraftCNDb = draftCNDbIter.getBaseDN();
                // are replogcn and DraftCNcn should be the same change ?
                int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb);
@@ -1395,7 +1399,7 @@
                        draftCNDb.add(
                            oldestChange.getDraftChangeNumber(),
                            previousCookie.toString(),
                            oldestChange.getServiceId(),
                            oldestChange.getBaseDN(),
                            oldestChange.getUpdateMsg().getChangeNumber());
                        break;
@@ -1528,7 +1532,7 @@
      // Update the current state
      previousCookie.update(
          oldestChange.getServiceId(),
          oldestChange.getBaseDN(),
          oldestChange.getUpdateMsg().getChangeNumber());
      // Set the current value of global state in the returned message
@@ -1566,7 +1570,6 @@
        writer = new ECLServerWriter(session,this,replicationServerDomain);
        writer.start();  // start suspended
      }
    }
    else
    {