mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.58.2013 473d0d45e6886c0ec3f8569cc934ff8a36d199ae
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -31,6 +31,7 @@
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.*;
import java.io.IOException;
import java.util.*;
@@ -123,7 +124,7 @@
           "] [searchPhase=" + searchPhase +
           "] [startCookie=" + startCookie +
           "] [previousCookie=" + previousCookie +
       "]]";
           "]]";
  }
  /**
@@ -303,8 +304,7 @@
  private String clDomCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append(msg);
    buffer.append("\n");
    buffer.append(msg).append("\n");
    for (DomainContext domainCtxt : domainCtxts) {
      domainCtxt.toString(buffer);
      buffer.append("\n");
@@ -412,10 +412,8 @@
   * @param startECLSessionMsg the start parameters.
   * @throws DirectoryException when an errors occurs.
   */
  public ECLServerHandler(
      ReplicationServer replicationServer,
      StartECLSessionMsg startECLSessionMsg)
  throws DirectoryException
  public ECLServerHandler(ReplicationServer replicationServer,
      StartECLSessionMsg startECLSessionMsg) throws DirectoryException
  {
    // queueSize is hard coded to 1 else super class hangs for some reason
    this(null, 1, replicationServer, 0);
@@ -534,7 +532,7 @@
  public void initializeCLSearchFromGenState(String crossDomainStartState)
  throws DirectoryException
  {
    initializeCLDomCtxts(crossDomainStartState, false);
    initializeChangelogDomainCtxts(crossDomainStartState, false);
  }
  /**
@@ -545,9 +543,9 @@
  public void initializeCLSearchFromDraftCN(int startDraftCN)
  throws DirectoryException
  {
    String crossDomainStartState;
    try
    {
      String crossDomainStartState;
      draftCompat = true;
      DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler();
@@ -650,7 +648,7 @@
      }
      this.draftCompat = true;
      initializeCLDomCtxts(crossDomainStartState, true);
      initializeChangelogDomainCtxts(crossDomainStartState, true);
    }
    catch(DirectoryException de)
    {
@@ -678,7 +676,7 @@
   *           in the provided cookie.
   * @throws DirectoryException When an error occurs.
   */
  public void initializeCLDomCtxts(String providedCookie,
  public void initializeChangelogDomainCtxts(String providedCookie,
      boolean allowUnknownDomains)
  throws DirectoryException
  {
@@ -739,8 +737,7 @@
          newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
          // Assign the start state for the domain
          if (isPersistent ==
            StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
          if (isPersistent == PERSISTENT_CHANGES_ONLY)
          {
            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
            startStatesFromProvidedCookie.remove(rsd.getBaseDn());
@@ -776,29 +773,8 @@
              }
              else if (!newDomainCtxt.startState.isEmpty())
              {
                /*
                when the provided startState is older than the replication
                changelogdb startState, it means that the replication
                changelog db has been trimmed and the cookie is not valid
                anymore.
                */
                boolean cookieTooOld = false;
                for (int aServerId : rsd.getStartState())
                if (hasCookieBeenTrimmedFromDB(rsd, newDomainCtxt.startState))
                {
                  ChangeNumber dbOldestChange =
                    rsd.getStartState().getMaxChangeNumber(aServerId);
                  ChangeNumber providedChange =
                    newDomainCtxt.startState.getMaxChangeNumber(aServerId);
                  if (providedChange != null
                      && providedChange.older(dbOldestChange))
                  {
                    cookieTooOld=true;
                  }
                }
                if (cookieTooOld)
                {
                  // the provided start
                  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                      ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                          newDomainCtxt.rsd.getBaseDn()));
@@ -869,7 +845,7 @@
        StringBuilder sb = new StringBuilder();
        for (DomainContext domainCtxt : domainCtxts) {
          sb.append(domainCtxt.rsd.getBaseDn()).append(":")
              .append(domainCtxt.startState).append(";");
            .append(domainCtxt.startState).append(";");
        }
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
            ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
@@ -907,12 +883,35 @@
        " initializeCLDomCtxts ends with " + " " + dumpState());
  }
  private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
      ServerState cookie)
  {
    /*
    when the provided startState is older than the replication
    changelogdb startState, it means that the replication
    changelog db has been trimmed and the cookie is not valid
    anymore.
    */
    for (int serverId : rsDomain.getStartState())
    {
      ChangeNumber dbOldestChange =
          rsDomain.getStartState().getMaxChangeNumber(serverId);
      ChangeNumber providedChange = cookie.getMaxChangeNumber(serverId);
      if (providedChange != null
          && providedChange.older(dbOldestChange))
      {
        return true;
      }
    }
    return false;
  }
  /**
   * Registers this handler into its related domain and notifies the domain.
   */
  private void registerIntoDomain()
  {
    if (replicationServerDomain!=null)
    if (replicationServerDomain != null)
      replicationServerDomain.registerHandler(this);
  }
@@ -961,10 +960,8 @@
  @Override
  public String getMonitorInstanceName()
  {
    String str = serverURL + " " + String.valueOf(serverId);
    return "Connected External Changelog Server " + str +
    ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
    return "Connected External Changelog Server " + serverURL + " " + serverId
        + ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
  }
  /**
@@ -987,21 +984,22 @@
    // TODO:ECL No monitoring exist for ECL.
    return attributes;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    String localString;
    localString = "External changelog Server ";
    final String eclServer = "External changelog Server ";
    if (this.serverId != 0)
      localString += serverId + " " + serverURL + " " + getBaseDN()
       + " " + this.getOperationId();
    else
      localString += this.getClass().getCanonicalName()+ " " + operationId;
    return localString;
    {
      return eclServer + serverId + " " + serverURL + " " + getBaseDN() + " "
          + operationId;
    }
    return eclServer + getClass().getCanonicalName() + " " + operationId;
  }
  /**
   * Gets the status of the connected DS.
   * @return The status of the connected DS.
@@ -1053,18 +1051,7 @@
    replicationServer.disableEligibility(excludedBaseDNs);
    eligibleCN = replicationServer.getEligibleCN();
    if (startECLSessionMsg.getECLRequestType()==
      StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromGenState(
          startECLSessionMsg.getCrossDomainServerState());
    }
    else if (startECLSessionMsg.getECLRequestType()==
      StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER)
    {
      initializeCLSearchFromDraftCN(
          startECLSessionMsg.getFirstDraftChangeNumber());
    }
    initializeChangelogSearch(startECLSessionMsg);
    if (session != null)
    {
@@ -1095,7 +1082,7 @@
      // TODO:ECL Potential race condition if writer not yet resumed here
    }
    if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
    if (isPersistent == PERSISTENT_CHANGES_ONLY)
    {
      closeInitPhase();
    }
@@ -1155,7 +1142,20 @@
          " initialized: " +
          " " + dumpState() + " " +
          " " + clDomCtxtsToString(""));
  }
  private void initializeChangelogSearch(StartECLSessionMsg msg)
      throws DirectoryException
  {
    short requestType = msg.getECLRequestType();
    if (requestType == REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromGenState(msg.getCrossDomainServerState());
    }
    else if (requestType == REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER)
    {
      initializeCLSearchFromDraftCN(msg.getFirstDraftChangeNumber());
    }
  }
  /**
@@ -1169,13 +1169,13 @@
  public ECLUpdateMsg takeECLUpdate()
  throws DirectoryException
  {
    boolean interrupted = true;
    ECLUpdateMsg msg = getNextECLUpdate();
    // TODO:ECL We should refactor so that a SH always have a session
    if (session == null)
      return msg;
    boolean interrupted = true;
    boolean acquired = false;
    do
    {
@@ -1264,8 +1264,8 @@
          // Default is not to loop, with one exception
          continueLooping = false;
          int iDom = getOldestChangeFromDomainCtxts();
          if (iDom == -1)
          DomainContext oldestContext = findOldestChangeFromDomainCtxts();
          if (oldestContext == null)
          { // there is no oldest change to process
            closeInitPhase();
@@ -1274,10 +1274,9 @@
          }
          // Build the ECLUpdateMsg to be returned
          DomainContext oldestContext = domainCtxts[iDom];
          String suffix = oldestContext.rsd.getBaseDn();
          oldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)oldestContext.nextMsg,
              (LDAPUpdateMsg) oldestContext.nextMsg,
              null, // cookie will be set later
              suffix,
              0); //  draftChangeNumber may be set later
@@ -1431,19 +1430,16 @@
          "looking for the generalized oldest change");
        for (DomainContext domainCtxt : domainCtxts) {
          // get next msg
          domainCtxt.getNextEligibleMessageForDomain(operationId);
        }
        // take the oldest one
        int iDom = getOldestChangeFromDomainCtxts();
        if (iDom != -1)
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        if (oldestContext != null)
        {
          DomainContext oldestContext = domainCtxts[iDom];
          String suffix = oldestContext.rsd.getBaseDn();
          oldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)oldestContext.nextMsg,
              (LDAPUpdateMsg) oldestContext.nextMsg,
              null, // set later
              suffix, 0);
          oldestContext.nextMsg = null; // clean
@@ -1528,7 +1524,7 @@
    // go to persistent phase if one
    for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true;
    if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
    if (this.isPersistent != NON_PERSISTENT)
    {
      // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
      searchPhase = PERSISTENT_PHASE;
@@ -1554,36 +1550,33 @@
  }
  /**
   * Get the index in the domainCtxt table of the domain with the oldest change.
   * @return the index of the domain with the oldest change, -1 when none.
   * Find the domainCtxt of the domain with the oldest change.
   *
   * @return the domainCtxt of the domain with the oldest change, null when
   *         none.
   */
  private int getOldestChangeFromDomainCtxts()
  private DomainContext findOldestChangeFromDomainCtxts()
  {
    int oldest = -1;
    for (int i=0; i<domainCtxts.length; i++)
    DomainContext oldestCtxt = null;
    for (DomainContext domainCtxt : domainCtxts)
    {
      if (domainCtxts[i].active)
      if (domainCtxt.active
          // .msg is null when the previous (non blocking) nextMessage did
          // not have any eligible msg to return
          && domainCtxt.nextMsg != null
          && (oldestCtxt == null
              || domainCtxt.nextMsg.compareTo(oldestCtxt.nextMsg) < 0))
      {
        // on the first loop, oldest==-1
        // .msg is null when the previous (non blocking) nextMessage did
        // not have any eligible msg to return
        if (domainCtxts[i].nextMsg != null)
        {
          if ((oldest==-1) ||
              (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0))
          {
            oldest = i;
          }
        }
        oldestCtxt = domainCtxt;
      }
    }
    if (debugEnabled())
      TRACER.debugInfo("In cn=changelog"
          + "," + this + " getOldestChangeFromDomainCtxts() returns " +
          ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
      TRACER.debugInfo("In cn=changelog," + this
          + " getOldestChangeFromDomainCtxts() returns "
          + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1"));
    return oldest;
    return oldestCtxt;
  }
  /**