mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
07.40.2013 82f5228d84de25cd2ea7d99e9880a8c11971e743
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -50,6 +50,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a server handler, which handles all interaction with a
@@ -58,6 +59,11 @@
public final class ECLServerHandler extends ServerHandler
{
  private static int UNDEFINED_PHASE = 0;
  /** TODO JNR. */
  public static int INIT_PHASE = 1;
  private static int PERSISTENT_PHASE = 2;
  /**
   * This is a string identifying the operation, provided by the client part of
   * the ECL, used to help interpretation of messages logged.
@@ -108,6 +114,11 @@
  private CSN eligibleCSN;
  /**
   * The global list of contexts by domain for the search currently processed.
   */
  private DomainContext[] domainCtxts = new DomainContext[0];
  /**
   * Provides a string representation of this object.
   * @return the string representation.
   */
@@ -172,8 +183,7 @@
      buffer.append("[ [active=").append(active)
          .append("] [rsd=").append(rsd)
          .append("] [nextMsg=").append(nextMsg).append("(")
          .append(nextMsg != null ?
          new Date(nextMsg.getCSN().getTime()).toString():"")
          .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState)
@@ -183,16 +193,15 @@
    }
    /**
     * Get the next message eligible regarding
     * the crossDomain eligible CSN. Put it in the context table.
     * @param opid The operation id.
     * Computes the next message eligible regarding the crossDomain eligible
     * CSN.
     *
     * @param opId The operation id.
     */
    private void getNextEligibleMessageForDomain(String opid)
    private void computeNextEligibleMessageForDomain(String opId)
    {
      if (debugEnabled())
        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDNString() +
          " getNextEligibleMessageForDomain(" + opid+ ") "
          + "ctxt=" + toString());
        debugInfo(opId, "ctxt=" + this);
      assert(nextMsg == null);
      try
@@ -202,22 +211,15 @@
        // not eligible
        if (nextNonEligibleMsg != null)
        {
          boolean hasBecomeEligible =
            (nextNonEligibleMsg.getCSN().getTime()
                <= eligibleCSN.getTime());
          final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for "
              + mh.getBaseDNString()
              + " getNextEligibleMessageForDomain(" + opid + ") "
              + " stored nonEligibleMsg " + nextNonEligibleMsg
              + " has now become eligible regarding "
              + " the eligibleCSN ("+ eligibleCSN
              + " ):" + hasBecomeEligible);
            debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
                + " has now become eligible regarding the eligibleCSN ("
                + eligibleCSN + " ): " + hasBecomeEligible);
          if (hasBecomeEligible)
          {
            // it is now eligible
            nextMsg = nextNonEligibleMsg;
            nextNonEligibleMsg = null;
          }
@@ -226,50 +228,30 @@
        else
        {
          // Here comes a new message !!!
          // non blocking
          UpdateMsg newMsg;
          do {
            newMsg = mh.getNextMessage(false);
            // when the replication changelog is trimmed, the last (latest) chg
            // is left in the db (whatever its age), and we don't want this chg
            // to be returned in the external changelog.
            // So let's check if the chg time is older than the trim date
          } while ((newMsg!=null) &&
              (newMsg.getCSN().getTime() < domainLatestTrimDate));
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for "
                + mh.getBaseDNString()
                + " getNextEligibleMessageForDomain(" + opid + ") "
                + " got new message : "
                +  " baseDN=[" + mh.getBaseDNString()
                + "] [newMsg=" + newMsg + "]" + dumpState());
          // in non blocking mode, return null when no more msg
          if (newMsg != null)
          final UpdateMsg newMsg = getNextMessage();
          if (newMsg == null)
          {
            boolean isEligible = (newMsg.getCSN().getTime()
                <= eligibleCSN.getTime());
            return;
          }
          if (debugEnabled())
              TRACER.debugInfo(" In ECLServerHandler, for "
                + mh.getBaseDNString()
                + " getNextEligibleMessageForDomain(" + opid+ ") "
                + "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + newMsg.getCSN()
                + " " + new Date(newMsg.getCSN().getTime())
                + "] eligibleCSN=[" + eligibleCSN
                + " " + new Date(eligibleCSN.getTime())+"]"
            debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
                + dumpState());
            if (isEligible)
            {
              nextMsg = newMsg;
            }
            else
            {
              nextNonEligibleMsg = newMsg;
            }
          final boolean isEligible = isEligible(newMsg);
          if (debugEnabled())
            debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
                + toString(eligibleCSN) + "] " + dumpState());
          if (isEligible)
          {
            nextMsg = newMsg;
          }
          else
          {
            nextNonEligibleMsg = newMsg;
          }
        }
      }
@@ -279,6 +261,44 @@
      }
    }
    private boolean isEligible(UpdateMsg msg)
    {
      return msg.getCSN().getTime() <= eligibleCSN.getTime();
    }
    private UpdateMsg getNextMessage()
    {
      while (true)
      {
        final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
        if (newMsg == null)
        { // in non blocking mode, null means no more messages
          return null;
        }
        else if (newMsg.getCSN().getTime() < domainLatestTrimDate)
        {
          // when the replication changelog is trimmed, the last (latest) chg
          // is left in the db (whatever its age), and we don't want this chg
          // to be returned in the external changelog.
          // So let's check if the chg time is older than the trim date
          return newMsg;
        }
      }
    }
    private String toString(CSN csn)
    {
      return csn + " " + asDate(csn);
    }
    private void debugInfo(String opId, String message)
    {
      TRACER.debugInfo("In ECLServerHandler, for baseDN="
          + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
          + ") " + message);
    }
    /**
     * Unregister the handler from the DomainContext ReplicationDomain.
     * @return Whether the handler has been unregistered with success.
@@ -297,11 +317,6 @@
    }
  }
  /**
   * The global list of contexts by domain for the search currently processed.
   */
  private DomainContext[] domainCtxts = new DomainContext[0];
  private String clDomCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
@@ -313,10 +328,6 @@
    return buffer.toString();
  }
  private static int UNDEFINED_PHASE = 0;
  private static int INIT_PHASE = 1;
  private static int PERSISTENT_PHASE = 2;
  /**
   * Starts this handler based on a start message received from remote server.
   * @param inECLStartMsg The start msg provided by the remote server.
@@ -509,13 +520,18 @@
  /**
   * Initialize the handler from a provided cookie value.
   * @param crossDomainStartState The provided cookie value.
   * @throws DirectoryException When an error is raised.
   *
   * @param providedCookie
   *          The provided cookie value.
   * @throws DirectoryException
   *           When an error is raised.
   */
  private void initializeCLSearchFromGenState(String crossDomainStartState)
  private void initializeCLSearchFromCookie(String providedCookie)
      throws DirectoryException
  {
    initializeChangelogDomainCtxts(crossDomainStartState, false);
    this.draftCompat = false;
    initializeChangelogDomainCtxts(providedCookie, false);
  }
  /**
@@ -684,12 +700,9 @@
      // Creates the table that will contain the real-time info for each
      // and every domain.
      Set<DomainContext> tmpSet = new HashSet<DomainContext>();
      String missingDomains = "";
      for (Iterator<ReplicationServerDomain> iter = rs.getDomainIterator();
           iter.hasNext();)
      final StringBuilder missingDomains = new StringBuilder();
      for (ReplicationServerDomain rsd : toIterable(rs.getDomainIterator()))
      {
        ReplicationServerDomain rsd = iter.next();
        // skip the 'unreal' changelog domain
        if (rsd == this.replicationServerDomain)
          continue;
@@ -704,11 +717,12 @@
        }
        // skip unused domains
        if (rsd.getLatestServerState().isEmpty())
        final ServerState latestServerState = rsd.getLatestServerState();
        if (latestServerState.isEmpty())
          continue;
        // Creates the new domain context
        DomainContext newDomainCtxt = new DomainContext();
        final DomainContext newDomainCtxt = new DomainContext();
        newDomainCtxt.active = true;
        newDomainCtxt.rsd = rsd;
        newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
@@ -716,7 +730,7 @@
        // Assign the start state for the domain
        if (isPersistent == PERSISTENT_CHANGES_ONLY)
        {
          newDomainCtxt.startState = rsd.getLatestServerState().duplicate();
          newDomainCtxt.startState = latestServerState;
          startStatesFromProvidedCookie.remove(rsd.getBaseDN());
        }
        else
@@ -746,7 +760,7 @@
            // when there is a cookie provided in the request,
            if (newDomainCtxt.startState == null)
            {
              missingDomains += (rsd.getBaseDN() + ":;");
              missingDomains.append(rsd.getBaseDN()).append(":;");
              continue;
            }
            else if (!newDomainCtxt.startState.isEmpty())
@@ -760,7 +774,7 @@
            }
          }
          newDomainCtxt.stopState = rsd.getLatestServerState().duplicate();
          newDomainCtxt.stopState = latestServerState;
        }
        newDomainCtxt.currentState = new ServerState();
@@ -799,13 +813,12 @@
      - if the cookie contains a domain that is even not replicated
      then this case need to be considered here in another loop.
      */
      if (!startStatesFromProvidedCookie.isEmpty())
      if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
      {
        if (allowUnknownDomains)
          for (DN providedDomain : startStatesFromProvidedCookie.keySet())
            if (rs.getReplicationServerDomain(providedDomain) == null)
              // the domain provided in the cookie is not replicated
              startStatesFromProvidedCookie.remove(providedDomain);
        for (DN providedDomain : startStatesFromProvidedCookie.keySet())
          if (rs.getReplicationServerDomain(providedDomain) == null)
            // the domain provided in the cookie is not replicated
            startStatesFromProvidedCookie.remove(providedDomain);
      }
      // Now do the final checking
@@ -833,7 +846,7 @@
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.getNextEligibleMessageForDomain(operationId);
        domainCtxt.computeNextEligibleMessageForDomain(operationId);
        if (domainCtxt.nextMsg == null)
          domainCtxt.active = false;
@@ -1055,53 +1068,6 @@
      closeInitPhase();
    }
    /* TODO: From replication CSN
    //--
    if (startCLMsg.getStartMode()==2)
    {
      if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN()))
        return;
    }
    //--
    if (startCLMsg.getStartMode()==4)
    {
      // to get the CL first and last
      initializeCLDomCtxts(null); // from start
      CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN();
      try
      {
        // to get the CL first and last
        // last rely on the crossDomainEligibleCSN thus must have been
        // computed before
        int[] limits = computeCLLimits(crossDomainEligibleCSN);
        // Send the response
        CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
        session.publish(msg);
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        try
        {
          session.publish(
            new ErrorMsg(
             replicationServer.getServerId(),
             serverId,
             Message.raw(Category.SYNC, Severity.INFORMATION,
                 "Exception raised: " + e.getMessage())));
        }
        catch(IOException ioe)
        {
          // FIXME: close conn ?
        }
      }
      return;
    }
     */
    // Store into domain
    registerIntoDomain();
    if (debugEnabled())
@@ -1116,7 +1082,7 @@
    short requestType = msg.getECLRequestType();
    if (requestType == REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromGenState(msg.getCrossDomainServerState());
      initializeCLSearchFromCookie(msg.getCrossDomainServerState());
    }
    else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER)
    {
@@ -1231,12 +1197,7 @@
        }
        // Build the ECLUpdateMsg to be returned
        final ECLUpdateMsg change = new ECLUpdateMsg(
            (LDAPUpdateMsg) oldestContext.nextMsg,
            null, // cookie will be set later
            oldestContext.rsd.getBaseDN(),
            0); // changeNumber may be set later
        oldestContext.nextMsg = null;
        final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
        // Default is not to loop, with one exception
        continueLooping = false;
@@ -1250,8 +1211,7 @@
        // Set and test the domain of the oldestChange see if we reached
        // the end of the phase for this domain
        oldestContext.currentState.update(
            change.getUpdateMsg().getCSN());
        oldestContext.currentState.update(change.getUpdateMsg().getCSN());
        if (oldestContext.currentState.cover(oldestContext.stopState)
            || (draftCompat
@@ -1264,7 +1224,7 @@
        {
          // populates the table with the next eligible msg from iDom
          // in non blocking mode, return null when no more eligible msg
          oldestContext.getNextEligibleMessageForDomain(operationId);
          oldestContext.computeNextEligibleMessageForDomain(operationId);
        }
        oldestChange = change;
      }
@@ -1277,21 +1237,14 @@
                  + "looking for the generalized oldest change"));
        for (DomainContext domainCtxt : domainCtxts) {
          domainCtxt.getNextEligibleMessageForDomain(operationId);
          domainCtxt.computeNextEligibleMessageForDomain(operationId);
        }
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        if (oldestContext != null)
        {
          final ECLUpdateMsg change = new ECLUpdateMsg(
              (LDAPUpdateMsg) oldestContext.nextMsg,
              null, // set later
              oldestContext.rsd.getBaseDN(),
              0);
          oldestContext.nextMsg = null; // clean
          oldestContext.currentState.update(
              change.getUpdateMsg().getCSN());
          final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
          oldestContext.currentState.update(change.getUpdateMsg().getCSN());
          if (draftCompat)
          {
@@ -1330,6 +1283,15 @@
    return oldestChange;
  }
  private ECLUpdateMsg newECLUpdateMsg(DomainContext ctx)
  {
    // cookie will be set later AND changeNumber may be set later
    final ECLUpdateMsg change = new ECLUpdateMsg(
        (LDAPUpdateMsg) ctx.nextMsg, null, ctx.rsd.getBaseDN(), 0);
    ctx.nextMsg = null; // clean after use
    return change;
  }
  /**
   * Either retrieves a change number from the DB, or assign a new change number
   * and store in the DB.
@@ -1346,13 +1308,11 @@
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws ChangelogException
  {
    // We also need to check if the CNIndexDB is consistent with
    // the changelogdb.
    // if not, 2 potential reasons
    // a/ : changelog has been purged (trim)let's traverse the CNIndexDB
    // b/ : changelog is late .. let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn
    // in the 2 dbs
    // We also need to check if the CNIndexDB is consistent with the
    // changelogDB. If not, 2 potential reasons:
    // a/ changelog has been purged (trim) let's traverse the CNIndexDB
    // b/ changelog is late ... let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn in the 2 dbs
    // replogCSN : the oldest change from the changelog db
    CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
@@ -1374,20 +1334,17 @@
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("assignChangeNumber() generating change number "
            + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
            + csnFromChangelogDb + " timestamps:"
            + new Date(csnFromChangelogDb.getTime()) + " ?older"
            + new Date(csnFromCNIndexDB.getTime()));
        TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
            + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
            + asDate(csnFromChangelogDb) + " ?older"
            + asDate(csnFromCNIndexDB));
      if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
          csnFromCNIndexDB, dnFromCNIndexDB))
      {
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " assigning changeNumber=" + currentRecord.getChangeNumber()
              + " to change=" + oldestChange);
          TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
              + currentRecord.getChangeNumber() + " to change=" + oldestChange);
        oldestChange.setChangeNumber(currentRecord.getChangeNumber());
        return true;
@@ -1398,11 +1355,11 @@
      {
        // the change from the changelogDb is older
        // it should have been stored lately
        // let's continue to traverse the changelogdb
        // let's continue to traverse the changelogDB
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber(): will skip "
          TRACER.debugInfo("assignChangeNumber() will skip "
              + csnFromChangelogDb
              + " and read next from the regular changelog.");
              + " and read next change from the regular changelog.");
        return false; // TO BE CHECKED
      }
@@ -1413,18 +1370,17 @@
      try
      {
        // let's traverse the CNIndexDB searching for the change
        // found in the changelogDb.
        // found in the changelogDB
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " will skip " + csnFromCNIndexDB
          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
              + " and read next change from the CNIndexDB.");
        isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number has"
              + "skipped to  changeNumber=" + currentRecord.getChangeNumber()
              + " csn=" + currentRecord.getCSN() + " End of CNIndexDB ?"
          TRACER.debugInfo("assignChangeNumber() has skipped to changeNumber="
              + currentRecord.getChangeNumber() + " csn="
              + currentRecord.getCSN() + " End of CNIndexDB ?"
              + isEndOfCNIndexDBReached);
      }
      catch (ChangelogException e)
@@ -1439,6 +1395,11 @@
    }
  }
  private Date asDate(CSN csn)
  {
    return new Date(csn.getTime());
  }
  private boolean areSameChange(CSN csn1, DN dn1, CSN csn2, DN dn2)
  {
    boolean sameDN = dn1.compareTo(dn2) == 0;