mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.45.2013 c56828ecb3b656bb531d313b722bd572eeb10905
OPENDJ-1116 Introduce abstraction for the changelog DB

ECLServerHandler.java:
Renamed clDomCtxtsToString() to domaimCtxtsToString().
In waitAndProcessStartSessionECLFromRemoteServer(), removed the many declared exceptions.
Extracted method buildDomainContexts().
Renamed findOldestChangeFromDomainCtxts() to findDomainCtxtWithOldestChange().
Removed useless comments
1 files modified
380 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 380 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -31,7 +31,6 @@
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -317,7 +316,7 @@
    }
  }
  private String clDomCtxtsToString(String msg)
  private String domaimCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append(msg).append("\n");
@@ -483,17 +482,12 @@
  /**
   * Wait receiving the StartSessionMsg from the remote DS and process it.
   *
   * @return the startSessionMsg received
   * @throws DirectoryException
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws DataFormatException
   * @throws NotSupportedOldVersionPDUException
   * @throws Exception
   */
  private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer()
  throws DirectoryException, IOException, ClassNotFoundException,
  DataFormatException,
  NotSupportedOldVersionPDUException
      throws Exception
  {
    ReplicationMsg msg = session.receive();
@@ -505,9 +499,8 @@
    }
    else if (!(msg instanceof StartECLSessionMsg))
    {
      Message message = Message
          .raw("Protocol error: StartECLSessionMsg required." + msg
              + " received.");
      Message message = Message.raw(
          "Protocol error: StartECLSessionMsg required." + msg + " received.");
      abortStart(message);
      return null;
    }
@@ -576,7 +569,7 @@
   * @param startChangeNumber
   *          the start change number coming from the request filter.
   * @return the cookie corresponding to the passed in startChangeNumber.
   * @throws Exception
   * @throws ChangelogException
   *           if a database problem occurred
   * @throws DirectoryException
   *           if a database problem occurred
@@ -677,170 +670,9 @@
  private void initializeChangelogDomainCtxts(String providedCookie,
      boolean allowUnknownDomains) throws DirectoryException
  {
    /*
    This map is initialized from the providedCookie.
    Below, it will be traversed and each domain configured with ECL will be
    checked and removed from the map.
    At the end, normally the map should be empty.
    Depending on allowUnknownDomains provided flag, a non empty map will
    be considered as an error when allowUnknownDomains is false.
    */
    Map<DN, ServerState> startStatesFromProvidedCookie =
        new HashMap<DN, ServerState>();
    ReplicationServer rs = this.replicationServer;
    // Parse the provided cookie and overwrite startState from it.
    if ((providedCookie != null) && (providedCookie.length()!=0))
      startStatesFromProvidedCookie =
        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
    try
    {
      // Creates the table that will contain the real-time info for each
      // and every domain.
      final Set<DomainContext> tmpSet = new HashSet<DomainContext>();
      final StringBuilder missingDomains = new StringBuilder();
      for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
      {
        // skip the 'unreal' changelog domain
        if (domain == this.replicationServerDomain)
          continue;
        // skip the excluded domains
        if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
        {
          // this is an excluded domain
          if (allowUnknownDomains)
            startStatesFromProvidedCookie.remove(domain.getBaseDN());
          continue;
        }
        // skip unused domains
        final ServerState latestServerState = domain.getLatestServerState();
        if (latestServerState.isEmpty())
          continue;
        // Creates the new domain context
        final DomainContext newDomainCtxt = new DomainContext();
        newDomainCtxt.active = true;
        newDomainCtxt.rsDomain = domain;
        newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
        // Assign the start state for the domain
        if (isPersistent == PERSISTENT_CHANGES_ONLY)
        {
          newDomainCtxt.startState = latestServerState;
          startStatesFromProvidedCookie.remove(domain.getBaseDN());
        }
        else
        {
          // let's take the start state for this domain from the provided
          // cookie
          newDomainCtxt.startState =
              startStatesFromProvidedCookie.remove(domain.getBaseDN());
          if (providedCookie == null
              || providedCookie.length() == 0
              || allowUnknownDomains)
          {
            // when there is no cookie provided in the request,
            // let's start traversing this domain from the beginning of
            // what we have in the replication changelog
            if (newDomainCtxt.startState == null)
            {
              CSN latestTrimCSN =
                  new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
              newDomainCtxt.startState =
                  domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
            }
          }
          else
          {
            // when there is a cookie provided in the request,
            if (newDomainCtxt.startState == null)
            {
              missingDomains.append(domain.getBaseDN()).append(":;");
              continue;
            }
            else if (!newDomainCtxt.startState.isEmpty())
            {
              if (hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
              {
                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                    ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                      newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
              }
            }
          }
          newDomainCtxt.stopState = latestServerState;
        }
        newDomainCtxt.currentState = new ServerState();
        // Creates an unconnected SH for the domain
        MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
        mh.setInitialServerState(newDomainCtxt.startState);
        mh.setBaseDNAndDomain(domain.getBaseDN(), false);
        // register the unconnected into the domain
        domain.registerHandler(mh);
        newDomainCtxt.mh = mh;
        previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
                              newDomainCtxt.startState);
        // store the new context
        tmpSet.add(newDomainCtxt);
      }
      if (missingDomains.length()>0)
      {
        // If there are domain missing in the provided cookie,
        // the request is rejected and a full resync is required.
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
              missingDomains,
              "<" + providedCookie + missingDomains + ">"));
      }
      domainCtxts = tmpSet;
      /*
      When it is valid to have the provided cookie containing unknown domains
      (allowUnknownDomains is true), 2 cases must be considered :
      - if the cookie contains a domain that is replicated but where
      ECL is disabled, then this case is considered above
      - if the cookie contains a domain that is even not replicated
      then this case need to be considered here in another loop.
      */
      if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
      {
        for (DN providedDomain : startStatesFromProvidedCookie.keySet())
          if (rs.getReplicationServerDomain(providedDomain) == null)
            // the domain provided in the cookie is not replicated
            startStatesFromProvidedCookie.remove(providedDomain);
      }
      // Now do the final checking
      if (!startStatesFromProvidedCookie.isEmpty())
      {
        /*
        After reading all the known domains from the provided cookie, there
        is one (or several) domain that are not currently configured.
        This domain has probably been removed or replication disabled on it.
        The request is rejected and full resync is required.
        */
        StringBuilder sb = new StringBuilder();
        for (DomainContext domainCtxt : domainCtxts) {
          sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
            .append(domainCtxt.startState).append(";");
        }
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
            ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
                startStatesFromProvidedCookie.toString() ,sb.toString()));
      }
      // the next record from the CNIndexDB should be the one
      domainCtxts = buildDomainContexts(providedCookie, allowUnknownDomains);
      startCookie = providedCookie;
      // Initializes each and every domain with the next(first) eligible message
@@ -867,8 +699,163 @@
              e);
    }
    if (debugEnabled())
      TRACER.debugInfo(
        " initializeCLDomCtxts ends with " + " " + dumpState());
      TRACER.debugInfo("initializeChangelogDomainCtxts() ends with "
          + dumpState());
  }
  private Set<DomainContext> buildDomainContexts(String providedCookie,
      boolean allowUnknownDomains) throws DirectoryException
  {
    final Set<DomainContext> results = new HashSet<DomainContext>();
    final ReplicationServer rs = this.replicationServer;
    /*
    This map is initialized from the providedCookie.
    Below, it will be traversed and each domain configured with ECL will be
    checked and removed from the map.
    At the end, normally the map should be empty.
    Depending on allowUnknownDomains provided flag, a non empty map will
    be considered as an error when allowUnknownDomains is false.
    */
    final Map<DN, ServerState> startStatesFromProvidedCookie =
        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
    final StringBuilder missingDomains = new StringBuilder();
    for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
    {
      // skip the 'unreal' changelog domain
      if (domain == this.replicationServerDomain)
        continue;
      // skip the excluded domains
      if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
      {
        // this is an excluded domain
        if (allowUnknownDomains)
          startStatesFromProvidedCookie.remove(domain.getBaseDN());
        continue;
      }
      // skip unused domains
      final ServerState latestServerState = domain.getLatestServerState();
      if (latestServerState.isEmpty())
        continue;
      // Creates the new domain context
      final DomainContext newDomainCtxt = new DomainContext();
      newDomainCtxt.active = true;
      newDomainCtxt.rsDomain = domain;
      newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
      // Assign the start state for the domain
      if (isPersistent == PERSISTENT_CHANGES_ONLY)
      {
        newDomainCtxt.startState = latestServerState;
        startStatesFromProvidedCookie.remove(domain.getBaseDN());
      }
      else
      {
        // let's take the start state for this domain from the provided
        // cookie
        newDomainCtxt.startState =
            startStatesFromProvidedCookie.remove(domain.getBaseDN());
        if (providedCookie == null || providedCookie.length() == 0
            || allowUnknownDomains)
        {
          // when there is no cookie provided in the request,
          // let's start traversing this domain from the beginning of
          // what we have in the replication changelog
          if (newDomainCtxt.startState == null)
          {
            CSN latestTrimCSN =
                new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
            newDomainCtxt.startState =
                domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
          }
        }
        else
        {
          // when there is a cookie provided in the request,
          if (newDomainCtxt.startState == null)
          {
            missingDomains.append(domain.getBaseDN()).append(":;");
            continue;
          }
          else if (!newDomainCtxt.startState.isEmpty()
              && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
          {
            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                    newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
          }
        }
        newDomainCtxt.stopState = latestServerState;
      }
      newDomainCtxt.currentState = new ServerState();
      // Creates an unconnected SH for the domain
      MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
      mh.setInitialServerState(newDomainCtxt.startState);
      mh.setBaseDNAndDomain(domain.getBaseDN(), false);
      // register the unconnected into the domain
      domain.registerHandler(mh);
      newDomainCtxt.mh = mh;
      previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
                            newDomainCtxt.startState);
      results.add(newDomainCtxt);
    }
    if (missingDomains.length()>0)
    {
      // If there are domain missing in the provided cookie,
      // the request is rejected and a full resync is required.
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
        ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
            missingDomains,
            "<" + providedCookie + missingDomains + ">"));
    }
    /*
    When it is valid to have the provided cookie containing unknown domains
    (allowUnknownDomains is true), 2 cases must be considered :
    - if the cookie contains a domain that is replicated but where
    ECL is disabled, then this case is considered above
    - if the cookie contains a domain that is even not replicated
    then this case need to be considered here in another loop.
    */
    if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
    {
      for (DN providedDomain : startStatesFromProvidedCookie.keySet())
        if (rs.getReplicationServerDomain(providedDomain) == null)
          // the domain provided in the cookie is not replicated
          startStatesFromProvidedCookie.remove(providedDomain);
    }
    // Now do the final checking
    if (!startStatesFromProvidedCookie.isEmpty())
    {
      /*
      After reading all the known domains from the provided cookie, there
      is one (or several) domain that are not currently configured.
      This domain has probably been removed or replication disabled on it.
      The request is rejected and full resync is required.
      */
      StringBuilder sb = new StringBuilder();
      for (DomainContext domainCtxt : domainCtxts) {
        sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
          .append(domainCtxt.startState).append(";");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
              startStatesFromProvidedCookie.toString() ,sb.toString()));
    }
    return results;
  }
  private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
@@ -1039,6 +1026,7 @@
      try
      {
        // Disable timeout for next communications
        // FIXME: why? and where is it reset?
        session.setSoTimeout(0);
      }
      catch(Exception e) { /* do nothing */ }
@@ -1046,18 +1034,15 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      // create reader
      reader = new ServerReader(session, this);
      reader.start();
      if (writer == null)
      {
        // create writer
        writer = new ECLServerWriter(session,this,replicationServerDomain);
        writer.start();
      }
      // Resume the writer
      ((ECLServerWriter)writer).resumeWriter();
      // TODO:ECL Potential race condition if writer not yet resumed here
@@ -1073,7 +1058,7 @@
    if (debugEnabled())
      TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
          + " initialized: " + " " + dumpState() + " " + " "
          + clDomCtxtsToString(""));
          + domaimCtxtsToString(""));
  }
  private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1136,18 +1121,17 @@
  @Override
  protected UpdateMsg getNextMessage(boolean synchronous)
  {
    UpdateMsg msg = null;
    try
    {
      ECLUpdateMsg eclMsg = getNextECLUpdate();
      if (eclMsg!=null)
        msg = eclMsg.getUpdateMsg();
      if (eclMsg != null)
        return eclMsg.getUpdateMsg();
    }
    catch(DirectoryException de)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, de);
    }
    return msg;
    return null;
  }
  /**
@@ -1187,7 +1171,7 @@
      while (continueLooping && searchPhase == INIT_PHASE)
      {
        // Step 1 & 2
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
        if (oldestContext == null)
        { // there is no oldest change to process
          closeInitPhase();
@@ -1196,7 +1180,6 @@
          return null;
        }
        // Build the ECLUpdateMsg to be returned
        final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
        // Default is not to loop, with one exception
@@ -1222,8 +1205,6 @@
        }
        if (oldestContext.active)
        {
          // populates the table with the next eligible msg from iDom
          // in non blocking mode, return null when no more eligible msg
          oldestContext.computeNextEligibleMessageForDomain(operationId);
        }
        oldestChange = change;
@@ -1232,7 +1213,7 @@
      if (searchPhase == PERSISTENT_PHASE)
      {
        if (debugEnabled())
          TRACER.debugInfo(clDomCtxtsToString(
          TRACER.debugInfo(domaimCtxtsToString(
              "In getNextECLUpdate (persistent): "
                  + "looking for the generalized oldest change"));
@@ -1240,12 +1221,11 @@
          domainCtxt.computeNextEligibleMessageForDomain(operationId);
        }
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
        if (oldestContext != null)
        {
          final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
          oldestContext.currentState.update(change.getUpdateMsg().getCSN());
          if (draftCompat)
          {
            assignNewChangeNumberAndStore(change);
@@ -1269,16 +1249,12 @@
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
      // Update the current state
      previousCookie.update(oldestChange.getBaseDN(), csn);
      // Set the current value of global state in the returned message
      oldestChange.setCookie(previousCookie);
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
            + oldestChange);
    }
    return oldestChange;
  }
@@ -1327,8 +1303,6 @@
        return true;
      }
      // the next change from the CNIndexDB
      final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord();
      final CSN csnFromCNIndexDB = currentRecord.getCSN();
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
@@ -1454,7 +1428,7 @@
      searchPhase = UNDEFINED_PHASE;
    }
    // End of INIT_PHASE => always release the iterator
    // End of INIT_PHASE => always release the cursor
    releaseCursor();
  }
@@ -1464,13 +1438,13 @@
   * @return the domainCtxt of the domain with the oldest change, null when
   *         none.
   */
  private DomainContext findOldestChangeFromDomainCtxts()
  private DomainContext findDomainCtxtWithOldestChange()
  {
    DomainContext oldestCtxt = null;
    for (DomainContext domainCtxt : domainCtxts)
    {
      if (domainCtxt.active
          // .msg is null when the previous (non blocking) nextMessage did
          // .nextMsg is null when the previous (non blocking) nextMessage did
          // not have any eligible msg to return
          && domainCtxt.nextMsg != null
          && (oldestCtxt == null
@@ -1482,7 +1456,7 @@
    if (debugEnabled())
      TRACER.debugInfo("In cn=changelog," + this
          + " getOldestChangeFromDomainCtxts() returns "
          + " findDomainCtxtWithOldestChange() returns "
          + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1"));
    return oldestCtxt;