mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.43.2014 f2279a32d3187f3903437f1d6fdd6639a0d13b2b
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,28 +27,32 @@
package org.opends.server.replication.server;
import java.io.IOException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg
.ECLRequestType.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg
.Persistent.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.Persistent.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -117,12 +121,6 @@
  private MultiDomainServerState previousCookie = new MultiDomainServerState();
  /**
   * Eligible CSN - only changes older or equal to eligibleCSN are published in
   * the ECL.
   */
  private CSN eligibleCSN;
  /**
   * The global list of contexts by domain for the search currently processed.
   */
  private Set<DomainContext> domainCtxts = Collections.emptySet();
@@ -133,16 +131,15 @@
   */
  private String dumpState()
  {
    return getClass().getCanonicalName() +
           "[" +
           "[draftCompat=" + draftCompat +
           "] [persistent=" + startECLSessionMsg.getPersistent() +
           "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
           "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
           "] [searchPhase=" + searchPhase +
           "] [startCookie=" + startCookie +
           "] [previousCookie=" + previousCookie +
           "]]";
    return getClass().getSimpleName() +
           " [draftCompat=" + draftCompat +
           ", persistent=" + startECLSessionMsg.getPersistent() +
           ", startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
           ", endOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
           ", searchPhase=" + searchPhase +
           ", startCookie=" + startCookie +
           ", previousCookie=" + previousCookie +
           "]";
  }
  /**
@@ -155,27 +152,35 @@
   */
  private class DomainContext
  {
    private ReplicationServerDomain rsDomain;
    private final ReplicationServerDomain rsDomain;
    /**
     * active when there are still changes supposed eligible for the ECL.
     * Active when there are still changes supposed eligible for the ECL. It is
     * active by default.
     */
    private boolean active;
    private boolean active = true;
    private UpdateMsg nextMsg;
    /**
     * the message handler from which are reading the changes for this domain.
     */
    private MessageHandler mh;
    private UpdateMsg nextMsg;
    private UpdateMsg nextNonEligibleMsg;
    private ServerState startState;
    private ServerState currentState;
    private ServerState stopState;
    private long domainLatestTrimDate;
    private final MessageHandler mh;
    private final ServerState startState;
    private final ServerState currentState = new ServerState();
    private final ServerState stopState;
    private final long domainLatestTrimDate;
    /**
     * {@inheritDoc}
     */
    public DomainContext(ReplicationServerDomain domain,
        ServerState startState, ServerState stopState, MessageHandler mh)
    {
      this.rsDomain = domain;
      this.startState = startState;
      this.stopState = stopState;
      this.mh = mh;
      this.domainLatestTrimDate = domain.getLatestDomainTrimDate();
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
@@ -183,96 +188,38 @@
      toString(buffer);
      return buffer.toString();
    }
    /**
     * Provide a string representation of this object for debug purpose..
     * @param buffer Append to this buffer.
     */
    public void toString(StringBuilder buffer)
    private StringBuilder toString(StringBuilder buffer)
    {
      buffer.append("[ [active=").append(active)
          .append("] [rsDomain=").append(rsDomain)
          .append("] [nextMsg=").append(nextMsg).append("(")
          .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState)
          .append("] [currentState=").append(currentState)
          .append("] [stopState=").append(stopState)
          .append("]]");
      buffer.append(getClass().getSimpleName());
      buffer.append(" [");
      buffer.append(active ? "active" : "inactive");
      buffer.append(", baseDN=\"").append(rsDomain.getBaseDN()).append("\"");
      if (nextMsg != null)
      {
        buffer.append(", csn=").append(nextMsg.getCSN().toStringUI());
      }
      buffer.append(", nextMsg=[").append(nextMsg);
      buffer.append("]")
          .append(", startState=").append(startState)
          .append(", currentState=").append(currentState)
          .append(", stopState=").append(stopState)
          .append("]");
      return buffer;
    }
    /**
     * Computes the next message eligible regarding the crossDomain eligible
     * CSN.
     *
     * @param opId The operation id.
     * Computes the next available message for this domain context.
     */
    private void computeNextEligibleMessageForDomain(String opId)
    private void computeNextAvailableMessage()
    {
      nextMsg = getNextMessage();
      if (logger.isTraceEnabled())
        debugInfo(opId, "ctxt=" + this);
      assert(nextMsg == null);
      try
      {
        // Before get a new message from the domain, evaluate in priority
        // a message that has not been published to the ECL because it was
        // not eligible
        if (nextNonEligibleMsg != null)
        {
          final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
          if (logger.isTraceEnabled())
            debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
                + " has now become eligible regarding the eligibleCSN ("
                + eligibleCSN + " ): " + hasBecomeEligible);
          if (hasBecomeEligible)
          {
            nextMsg = nextNonEligibleMsg;
            nextNonEligibleMsg = null;
          }
          // else the oldest is still not eligible - let's wait next
        }
        else
        {
          // Here comes a new message !!!
          final UpdateMsg newMsg = getNextMessage();
          if (newMsg == null)
          {
            return;
          }
          if (logger.isTraceEnabled())
            debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
                + dumpState());
          final boolean isEligible = isEligible(newMsg);
          if (logger.isTraceEnabled())
            debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
                + toString(eligibleCSN) + "] " + dumpState());
          if (isEligible)
          {
            nextMsg = newMsg;
          }
          else
          {
            nextNonEligibleMsg = newMsg;
          }
        }
        logger.trace("In ECLServerHandler, for baseDN="
            + mh.getBaseDNString() + " computeNextAvailableMessage("
            + getOperationId() + ") : newMsg=[" + nextMsg + "] " + dumpState());
      }
      catch(Exception e)
      {
        logger.traceException(e);
      }
    }
    private boolean isEligible(UpdateMsg msg)
    {
      return msg.getCSN().getTime() <= eligibleCSN.getTime();
    }
    private UpdateMsg getNextMessage()
@@ -296,18 +243,6 @@
      }
    }
    private String toString(CSN csn)
    {
      return csn + " " + asDate(csn);
    }
    private void debugInfo(String opId, String message)
    {
      logger.trace("In ECLServerHandler, for baseDN="
          + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
          + ") " + message);
    }
    /**
     * Unregister the handler from the DomainContext ReplicationDomain.
     * @return Whether the handler has been unregistered with success.
@@ -328,11 +263,10 @@
  private String domaimCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
    final StringBuilder buffer = new StringBuilder();
    buffer.append(msg).append("\n");
    for (DomainContext domainCtxt : domainCtxts) {
      domainCtxt.toString(buffer);
      buffer.append("\n");
      domainCtxt.toString(buffer).append("\n");
    }
    return buffer.toString();
  }
@@ -702,10 +636,12 @@
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
        domainCtxt.computeNextAvailableMessage();
        if (domainCtxt.nextMsg == null)
        {
          domainCtxt.active = false;
        }
      }
    }
    catch(DirectoryException de)
@@ -767,64 +703,48 @@
        continue;
      // Creates the new domain context
      final DomainContext newDomainCtxt = new DomainContext();
      newDomainCtxt.active = true;
      newDomainCtxt.rsDomain = domain;
      newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
      // Assign the start state for the domain
      final DomainContext newDomainCtxt;
      final ServerState domainStartState =
          startStatesFromProvidedCookie.remove(domain.getBaseDN());
      if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
      {
        newDomainCtxt.startState = latestState;
        startStatesFromProvidedCookie.remove(domain.getBaseDN());
        newDomainCtxt = newDomainContext(domain, null, latestState);
      }
      else
      {
        // let's take the start state for this domain from the provided cookie
        newDomainCtxt.startState =
            startStatesFromProvidedCookie.remove(domain.getBaseDN());
        ServerState startState = domainStartState;
        if (providedCookie == null || providedCookie.length() == 0
            || allowUnknownDomains)
        {
          // when there is no cookie provided in the request,
          // let's start traversing this domain from the beginning of
          // what we have in the replication changelog
          if (newDomainCtxt.startState == null)
          if (startState == null)
          {
            newDomainCtxt.startState =
            startState =
                domain.getOldestState().duplicateOnlyOlderThan(
                    newDomainCtxt.domainLatestTrimDate);
                    domain.getLatestDomainTrimDate());
          }
        }
        else
        {
          // when there is a cookie provided in the request,
          if (newDomainCtxt.startState == null)
          if (startState == null)
          {
            missingDomains.append(domain.getBaseDN()).append(":;");
            continue;
          }
          else if (!newDomainCtxt.startState.isEmpty()
              && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
          else if (!startState.isEmpty()
              && hasCookieBeenTrimmedFromDB(domain, startState))
          {
            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                    newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
                    domain.getBaseDN().toNormalizedString()));
          }
        }
        newDomainCtxt.stopState = latestState;
        newDomainCtxt = newDomainContext(domain, startState, latestState);
      }
      newDomainCtxt.currentState = new ServerState();
      // Creates an unconnected SH for the domain
      MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
      mh.setInitialServerState(newDomainCtxt.startState);
      mh.setBaseDNAndDomain(domain.getBaseDN(), false);
      // register the unconnected into the domain
      domain.registerHandler(mh);
      newDomainCtxt.mh = mh;
      previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(),
                             newDomainCtxt.startState.duplicate());
@@ -832,7 +752,7 @@
      results.add(newDomainCtxt);
    }
    if (missingDomains.length()>0)
    if (missingDomains.length() > 0)
    {
      // If there are domain missing in the provided cookie,
      // the request is rejected and a full resync is required.
@@ -851,11 +771,16 @@
    */
    if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
    {
      // JNR: Will the following code trigger a ConcurrentModificationException?
      for (DN providedDomain : startStatesFromProvidedCookie.keySet())
      final Set<DN> providedDomains = startStatesFromProvidedCookie.keySet();
      for (Iterator<DN> iter = providedDomains.iterator(); iter.hasNext();)
      {
        DN providedDomain = iter.next();
        if (rs.getReplicationServerDomain(providedDomain) == null)
        {
          // the domain provided in the cookie is not replicated
          startStatesFromProvidedCookie.remove(providedDomain);
          iter.remove();
        }
      }
    }
    // Now do the final checking
@@ -880,6 +805,19 @@
    return results;
  }
  private DomainContext newDomainContext(ReplicationServerDomain domain,
      ServerState startState, ServerState stopState) throws DirectoryException
  {
    // Create an unconnected MessageHandler for the domain
    MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
    mh.setInitialServerState(startState);
    mh.setBaseDNAndDomain(domain.getBaseDN(), false);
    // register the unconnected into the domain
    domain.registerHandler(mh);
    return new DomainContext(domain, startState, stopState, mh);
  }
  private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
      ServerState cookie)
  {
@@ -1070,15 +1008,12 @@
    if (logger.isTraceEnabled())
      logger.trace(getClass().getCanonicalName() + " " + getOperationId()
          + " initialized: " + " " + dumpState() + " " + " "
          + domaimCtxtsToString(""));
          + " initialized: " + " " + dumpState() + domaimCtxtsToString(""));
  }
  private void initializeChangelogSearch(StartECLSessionMsg msg)
      throws DirectoryException
  {
    refreshEligibleCSN();
    if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromCookie(msg.getCrossDomainServerState());
@@ -1099,7 +1034,6 @@
   */
  public ECLUpdateMsg takeECLUpdate() throws DirectoryException
  {
    refreshEligibleCSN();
    ECLUpdateMsg msg = getNextECLUpdate();
    // TODO:ECL We should refactor so that a SH always have a session
@@ -1157,8 +1091,10 @@
  public ECLUpdateMsg getNextECLUpdate() throws DirectoryException
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("In cn=changelog" + this +
          " getNextECLUpdate starts: " + dumpState());
    }
    ECLUpdateMsg oldestChange = null;
    try
@@ -1221,7 +1157,7 @@
        }
        if (oldestContext.active)
        {
          oldestContext.computeNextEligibleMessageForDomain(getOperationId());
          oldestContext.computeNextAvailableMessage();
        }
        oldestChange = change;
      }
@@ -1233,8 +1169,9 @@
              "In getNextECLUpdate (persistent): "
                  + "looking for the generalized oldest change"));
        for (DomainContext domainCtxt : domainCtxts) {
          domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
        for (DomainContext domainCtxt : domainCtxts)
        {
          domainCtxt.computeNextAvailableMessage();
        }
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
@@ -1494,13 +1431,4 @@
    return this.searchPhase != INIT_PHASE;
  }
  /**
   * Refresh the eligibleCSN by requesting the replication server.
   */
  private void refreshEligibleCSN()
  {
    Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
    eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs);
  }
}