mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
15.13.2013 c2e1e9fc0480a96bc816afea17866f0cd55e8ee5
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2010-2012 ForgeRock AS
 *      Portions Copyright 2010-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -217,14 +217,11 @@
          if (hasBecomeEligible)
          {
            // it is now elligible
            // it is now eligible
            nextMsg = nextNonEligibleMsg;
            nextNonEligibleMsg = null;
          }
          else
          {
            // the oldest is still not elligible - let's wait next
          }
          // else the oldest is still not eligible - let's wait next
        }
        else
        {
@@ -279,6 +276,23 @@
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    /**
     * Unregister the handler from the DomainContext ReplicationDomain.
     * @return Whether the handler has been unregistered with success.
     */
    private boolean unRegisterHandler()
    {
      return rsd.unRegisterHandler(mh);
    }
    /**
     * Stops the DomainContext handler.
     */
    private void stopServer()
    {
      rsd.stopServer(mh);
    }
  }
  // The global list of contexts by domain for the search currently processed.
@@ -289,9 +303,8 @@
    StringBuilder buffer = new StringBuilder();
    buffer.append(msg);
    buffer.append("\n");
    for (int i=0;i<domainCtxts.length;i++)
    {
      domainCtxts[i].toString(buffer);
    for (DomainContext domainCtxt : domainCtxts) {
      domainCtxt.toString(buffer);
      buffer.append("\n");
    }
    return buffer.toString();
@@ -475,7 +488,7 @@
      // log
      logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
      // until here session is encrypted then it depends on the negociation
      // until here session is encrypted then it depends on the negotiation
      // The session initiator decides whether to use SSL.
      if (!sessionInitiatorSSLEncryption)
        session.stopEncryption();
@@ -578,7 +591,7 @@
      DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler();
      // Any (optimizable) condition on draft CN in the request filter ?
      // Any possible optimization on draft CN in the request filter ?
      if (startDraftCN <= 1)
      {
        // Request filter DOES NOT contain any firstDraftCN
@@ -708,12 +721,14 @@
      boolean allowUnknownDomains)
  throws DirectoryException
  {
    // This map is initialized from the providedCookie.
    // Below, it will be traversed and each domain configured with ECL will be
    // checked and removed from the map.
    // At the end, normally the map should be empty.
    // Depending on allowUnknownDomains provided flag, a non empty map will
    // be considered as an error when allowUnknownDomains is false.
    /*
    This map is initialized from the providedCookie.
    Below, it will be traversed and each domain configured with ECL will be
    checked and removed from the map.
    At the end, normally the map should be empty.
    Depending on allowUnknownDomains provided flag, a non empty map will
    be considered as an error when allowUnknownDomains is false.
    */
    Map<String,ServerState> startStatesFromProvidedCookie =
      new HashMap<String,ServerState>();
@@ -732,7 +747,6 @@
      // and every domain.
      HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
      String missingDomains = "";
      int i =0;
      if (rsdi != null)
      {
        while (rsdi.hasNext())
@@ -801,10 +815,12 @@
              }
              else if (!newDomainCtxt.startState.isEmpty())
              {
                // when the provided startState is older than the replication
                // changelogdb start state, it means that the replication
                // changelog db has been trimed and the cookie is not valid
                // anymore.
                /*
                when the provided startState is older than the replication
                changelogdb startState, it means that the replication
                changelog db has been trimmed and the cookie is not valid
                anymore.
                */
                boolean cookieTooOld = false;
                for (int aServerId : rsd.getStartState())
                {
@@ -850,7 +866,6 @@
          // store the new context
          tmpSet.add(newDomainCtxt);
          i++;
        }
      }
@@ -864,14 +879,16 @@
              "<" + (providedCookie + missingDomains)+ ">"));
      }
      domainCtxts = tmpSet.toArray(new DomainContext[0]);
      domainCtxts = tmpSet.toArray(new DomainContext[tmpSet.size()]);
      // When it is valid to have the provided cookie containing unknown domains
      // (allowUnknownDomains is true), 2 cases must be considered :
      // - if the cookie contains a domain that is replicated but where
      //   ECL is disabled, then this case is considered above
      // - if the cookie contains a domain that is even not replicated
      //   then this case need to be considered here in another loop.
      /*
      When it is valid to have the provided cookie containing unknown domains
      (allowUnknownDomains is true), 2 cases must be considered :
      - if the cookie contains a domain that is replicated but where
      ECL is disabled, then this case is considered above
      - if the cookie contains a domain that is even not replicated
      then this case need to be considered here in another loop.
      */
      if (!startStatesFromProvidedCookie.isEmpty())
      {
        if (allowUnknownDomains)
@@ -884,21 +901,20 @@
      // Now do the final checking
      if (!startStatesFromProvidedCookie.isEmpty())
      {
        // After reading all the known domains from the provided cookie, there
        // is one (or several) domain that are not currently configured.
        // This domain has probably been removed or replication disabled on it.
        // The request is rejected and full resync is required.
        String unknownDomains = startStatesFromProvidedCookie.toString();
        String possibleCookie = "";
        for (int j=0; j<domainCtxts.length; j++)
        {
          possibleCookie += (domainCtxts[j].rsd.getBaseDn() + ":"
                           + domainCtxts[j].startState + ";");
        /*
        After reading all the known domains from the provided cookie, there
        is one (or several) domain that are not currently configured.
        This domain has probably been removed or replication disabled on it.
        The request is rejected and full resync is required.
        */
        StringBuilder sb = new StringBuilder();
        for (DomainContext domainCtxt : domainCtxts) {
          sb.append(domainCtxt.rsd.getBaseDn()).append(":")
              .append(domainCtxt.startState).append(";");
        }
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
            ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
                unknownDomains +
                ". Possible cookie: <" + possibleCookie+ ">"));
                startStatesFromProvidedCookie.toString() ,sb.toString()));
      }
      // the next record from the DraftCNdb should be the one
@@ -906,12 +922,11 @@
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (int j=0; j<domainCtxts.length; j++)
      {
        domainCtxts[j].getNextEligibleMessageForDomain(operationId);
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.getNextEligibleMessageForDomain(operationId);
        if (domainCtxts[j].nextMsg == null)
          domainCtxts[j].active = false;
        if (domainCtxt.nextMsg == null)
          domainCtxt.active = false;
      }
    }
    catch(DirectoryException de)
@@ -955,15 +970,13 @@
      draftCNDbIter.releaseCursor();
      draftCNDbIter = null;
    }
    for (int i=0;i<domainCtxts.length;i++)
    {
      if (!domainCtxts[i].rsd.unRegisterHandler(domainCtxts[i].mh))
      {
    for (DomainContext domainCtxt : domainCtxts) {
      if (!domainCtxt.unRegisterHandler()) {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            this +" shutdown() - error when unregistering handler "
            + domainCtxts[i].mh));
            this + " shutdown() - error when unregistering handler "
                + domainCtxt.mh));
      }
      domainCtxts[i].rsd.stopServer(domainCtxts[i].mh);
      domainCtxt.stopServer();
    }
    super.shutdown();
    domainCtxts = null;
@@ -1102,7 +1115,7 @@
        // Disable timeout for next communications
        session.setSoTimeout(0);
      }
      catch(Exception e) {}
      catch(Exception e) { /* do nothing */ }
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
@@ -1147,7 +1160,7 @@
      try
      {
        // to get the CL first and last
        // last rely on the crossDomainEligibleCN thhus must have been
        // last rely on the crossDomainEligibleCN thus must have been
        // computed before
        int[] limits = computeCLLimits(crossDomainEligibleCN);
        // Send the response
@@ -1291,7 +1304,7 @@
        // Step 1 & 2
        if (searchPhase == INIT_PHASE)
        {
          // Normally we whould not loop .. except ...
          // Default is not to loop, with one exception
          continueLooping = false;
          iDom = getOldestChangeFromDomainCtxts();
@@ -1376,7 +1389,7 @@
                  {
                    // the change from the DraftCNDb is older
                    // that means that the change has been purged from the
                    // changelogDb (and DraftCNdb not yet been trimed)
                    // changelogDb (and DraftCNdb not yet been trimmed)
                    try
                    {
@@ -1387,7 +1400,7 @@
                          + " will skip " + cnFromDraftCNDb
                          + " and read next change from the DraftCNDb.");
                      isEndOfDraftCNReached = (draftCNDbIter.next()==false);
                      isEndOfDraftCNReached = !draftCNDbIter.next();
                      if (debugEnabled())
                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
@@ -1414,14 +1427,10 @@
                        break;
                      }
                      else
                      {
                        // let's go to test this new change fro the DraftCNdb
                        continue;
                      }
                    }
                    catch(Exception e)
                    {
                      // TODO: At least log a warning
                    }
                  }
                  else
@@ -1476,7 +1485,7 @@
          }
          if (domainCtxts[iDom].active)
          {
            // populates the table with the next eligible msg from idomain
            // populates the table with the next eligible msg from iDom
            // in non blocking mode, return null when no more eligible msg
            domainCtxts[iDom].getNextEligibleMessageForDomain(operationId);
          }
@@ -1489,10 +1498,9 @@
          clDomCtxtsToString("In getNextECLUpdate (persistent): " +
          "looking for the generalized oldest change");
        for (int ido=0; ido<domainCtxts.length; ido++)
        {
        for (DomainContext domainCtxt : domainCtxts) {
          // get next msg
          domainCtxts[ido].getNextEligibleMessageForDomain(operationId);
          domainCtxt.getNextEligibleMessageForDomain(operationId);
        }
        // take the oldest one
@@ -1573,8 +1581,7 @@
          + dumpState());
    // go to persistent phase if one
    for (int i=0; i<domainCtxts.length; i++)
      domainCtxts[i].active = true;
    for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true;
    if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
    {
@@ -1590,7 +1597,7 @@
    }
    else
    {
      // INIT_PHASE is done AND search is not persistent => reinit
      // INIT_PHASE is done AND search is not persistent => re-init
      searchPhase = UNDEFINED_PHASE;
    }