mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.45.2013 c56828ecb3b656bb531d313b722bd572eeb10905
OPENDJ-1116 Introduce abstraction for the changelog DB

ECLServerHandler.java:
Renamed clDomCtxtsToString() to domaimCtxtsToString().
In waitAndProcessStartSessionECLFromRemoteServer(), removed the many declared exceptions.
Extracted method buildDomainContexts().
Renamed findOldestChangeFromDomainCtxts() to findDomainCtxtWithOldestChange().
Removed useless comments
1 files modified
154 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 154 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -31,7 +31,6 @@
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -317,7 +316,7 @@
    }
  }
  private String clDomCtxtsToString(String msg)
  private String domaimCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append(msg).append("\n");
@@ -483,17 +482,12 @@
  /**
   * Wait receiving the StartSessionMsg from the remote DS and process it.
   *
   * @return the startSessionMsg received
   * @throws DirectoryException
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws DataFormatException
   * @throws NotSupportedOldVersionPDUException
   * @throws Exception
   */
  private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer()
  throws DirectoryException, IOException, ClassNotFoundException,
  DataFormatException,
  NotSupportedOldVersionPDUException
      throws Exception
  {
    ReplicationMsg msg = session.receive();
@@ -505,9 +499,8 @@
    }
    else if (!(msg instanceof StartECLSessionMsg))
    {
      Message message = Message
          .raw("Protocol error: StartECLSessionMsg required." + msg
              + " received.");
      Message message = Message.raw(
          "Protocol error: StartECLSessionMsg required." + msg + " received.");
      abortStart(message);
      return null;
    }
@@ -576,7 +569,7 @@
   * @param startChangeNumber
   *          the start change number coming from the request filter.
   * @return the cookie corresponding to the passed in startChangeNumber.
   * @throws Exception
   * @throws ChangelogException
   *           if a database problem occurred
   * @throws DirectoryException
   *           if a database problem occurred
@@ -677,6 +670,45 @@
  private void initializeChangelogDomainCtxts(String providedCookie,
      boolean allowUnknownDomains) throws DirectoryException
  {
    try
    {
      domainCtxts = buildDomainContexts(providedCookie, allowUnknownDomains);
      startCookie = providedCookie;
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.computeNextEligibleMessageForDomain(operationId);
        if (domainCtxt.nextMsg == null)
          domainCtxt.active = false;
      }
    }
    catch(DirectoryException de)
    {
      throw de;
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // FIXME:ECL do not publish internal exception plumb to the client
      throw new DirectoryException(
          ResultCode.OPERATIONS_ERROR,
          Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " +
              e),
              e);
    }
    if (debugEnabled())
      TRACER.debugInfo("initializeChangelogDomainCtxts() ends with "
          + dumpState());
  }
  private Set<DomainContext> buildDomainContexts(String providedCookie,
      boolean allowUnknownDomains) throws DirectoryException
  {
    final Set<DomainContext> results = new HashSet<DomainContext>();
    final ReplicationServer rs = this.replicationServer;
    /*
    This map is initialized from the providedCookie.
    Below, it will be traversed and each domain configured with ECL will be
@@ -685,21 +717,9 @@
    Depending on allowUnknownDomains provided flag, a non empty map will
    be considered as an error when allowUnknownDomains is false.
    */
    Map<DN, ServerState> startStatesFromProvidedCookie =
        new HashMap<DN, ServerState>();
    ReplicationServer rs = this.replicationServer;
    // Parse the provided cookie and overwrite startState from it.
    if ((providedCookie != null) && (providedCookie.length()!=0))
      startStatesFromProvidedCookie =
    final Map<DN, ServerState> startStatesFromProvidedCookie =
        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
    try
    {
      // Creates the table that will contain the real-time info for each
      // and every domain.
      final Set<DomainContext> tmpSet = new HashSet<DomainContext>();
      final StringBuilder missingDomains = new StringBuilder();
      for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
      {
@@ -721,6 +741,7 @@
        if (latestServerState.isEmpty())
          continue;
        // Creates the new domain context
        final DomainContext newDomainCtxt = new DomainContext();
        newDomainCtxt.active = true;
@@ -740,8 +761,7 @@
          newDomainCtxt.startState =
              startStatesFromProvidedCookie.remove(domain.getBaseDN());
          if (providedCookie == null
              || providedCookie.length() == 0
        if (providedCookie == null || providedCookie.length() == 0
              || allowUnknownDomains)
          {
            // when there is no cookie provided in the request,
@@ -763,16 +783,14 @@
              missingDomains.append(domain.getBaseDN()).append(":;");
              continue;
            }
            else if (!newDomainCtxt.startState.isEmpty())
            {
              if (hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
          else if (!newDomainCtxt.startState.isEmpty()
              && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
              {
                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                    ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                      newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
              }
            }
          }
          newDomainCtxt.stopState = latestServerState;
        }
@@ -789,8 +807,7 @@
        previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
                              newDomainCtxt.startState);
        // store the new context
        tmpSet.add(newDomainCtxt);
      results.add(newDomainCtxt);
      }
      if (missingDomains.length()>0)
@@ -803,8 +820,6 @@
              "<" + providedCookie + missingDomains + ">"));
      }
      domainCtxts = tmpSet;
      /*
      When it is valid to have the provided cookie containing unknown domains
      (allowUnknownDomains is true), 2 cases must be considered :
@@ -840,35 +855,7 @@
                startStatesFromProvidedCookie.toString() ,sb.toString()));
      }
      // the next record from the CNIndexDB should be the one
      startCookie = providedCookie;
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (DomainContext domainCtxt : domainCtxts) {
        domainCtxt.computeNextEligibleMessageForDomain(operationId);
        if (domainCtxt.nextMsg == null)
          domainCtxt.active = false;
      }
    }
    catch(DirectoryException de)
    {
      throw de;
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // FIXME:ECL do not publish internal exception plumb to the client
      throw new DirectoryException(
          ResultCode.OPERATIONS_ERROR,
          Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " +
              e),
              e);
    }
    if (debugEnabled())
      TRACER.debugInfo(
        " initializeCLDomCtxts ends with " + " " + dumpState());
    return results;
  }
  private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
@@ -1039,6 +1026,7 @@
      try
      {
        // Disable timeout for next communications
        // FIXME: why? and where is it reset?
        session.setSoTimeout(0);
      }
      catch(Exception e) { /* do nothing */ }
@@ -1046,18 +1034,15 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      // create reader
      reader = new ServerReader(session, this);
      reader.start();
      if (writer == null)
      {
        // create writer
        writer = new ECLServerWriter(session,this,replicationServerDomain);
        writer.start();
      }
      // Resume the writer
      ((ECLServerWriter)writer).resumeWriter();
      // TODO:ECL Potential race condition if writer not yet resumed here
@@ -1073,7 +1058,7 @@
    if (debugEnabled())
      TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
          + " initialized: " + " " + dumpState() + " " + " "
          + clDomCtxtsToString(""));
          + domaimCtxtsToString(""));
  }
  private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1136,18 +1121,17 @@
  @Override
  protected UpdateMsg getNextMessage(boolean synchronous)
  {
    UpdateMsg msg = null;
    try
    {
      ECLUpdateMsg eclMsg = getNextECLUpdate();
      if (eclMsg!=null)
        msg = eclMsg.getUpdateMsg();
        return eclMsg.getUpdateMsg();
    }
    catch(DirectoryException de)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, de);
    }
    return msg;
    return null;
  }
  /**
@@ -1187,7 +1171,7 @@
      while (continueLooping && searchPhase == INIT_PHASE)
      {
        // Step 1 & 2
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
        if (oldestContext == null)
        { // there is no oldest change to process
          closeInitPhase();
@@ -1196,7 +1180,6 @@
          return null;
        }
        // Build the ECLUpdateMsg to be returned
        final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
        // Default is not to loop, with one exception
@@ -1222,8 +1205,6 @@
        }
        if (oldestContext.active)
        {
          // populates the table with the next eligible msg from iDom
          // in non blocking mode, return null when no more eligible msg
          oldestContext.computeNextEligibleMessageForDomain(operationId);
        }
        oldestChange = change;
@@ -1232,7 +1213,7 @@
      if (searchPhase == PERSISTENT_PHASE)
      {
        if (debugEnabled())
          TRACER.debugInfo(clDomCtxtsToString(
          TRACER.debugInfo(domaimCtxtsToString(
              "In getNextECLUpdate (persistent): "
                  + "looking for the generalized oldest change"));
@@ -1240,12 +1221,11 @@
          domainCtxt.computeNextEligibleMessageForDomain(operationId);
        }
        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
        if (oldestContext != null)
        {
          final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
          oldestContext.currentState.update(change.getUpdateMsg().getCSN());
          if (draftCompat)
          {
            assignNewChangeNumberAndStore(change);
@@ -1269,16 +1249,12 @@
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
      // Update the current state
      previousCookie.update(oldestChange.getBaseDN(), csn);
      // Set the current value of global state in the returned message
      oldestChange.setCookie(previousCookie);
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
            + oldestChange);
    }
    return oldestChange;
  }
@@ -1327,8 +1303,6 @@
        return true;
      }
      // the next change from the CNIndexDB
      final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord();
      final CSN csnFromCNIndexDB = currentRecord.getCSN();
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
@@ -1454,7 +1428,7 @@
      searchPhase = UNDEFINED_PHASE;
    }
    // End of INIT_PHASE => always release the iterator
    // End of INIT_PHASE => always release the cursor
    releaseCursor();
  }
@@ -1464,13 +1438,13 @@
   * @return the domainCtxt of the domain with the oldest change, null when
   *         none.
   */
  private DomainContext findOldestChangeFromDomainCtxts()
  private DomainContext findDomainCtxtWithOldestChange()
  {
    DomainContext oldestCtxt = null;
    for (DomainContext domainCtxt : domainCtxts)
    {
      if (domainCtxt.active
          // .msg is null when the previous (non blocking) nextMessage did
          // .nextMsg is null when the previous (non blocking) nextMessage did
          // not have any eligible msg to return
          && domainCtxt.nextMsg != null
          && (oldestCtxt == null
@@ -1482,7 +1456,7 @@
    if (debugEnabled())
      TRACER.debugInfo("In cn=changelog," + this
          + " getOldestChangeFromDomainCtxts() returns "
          + " findDomainCtxtWithOldestChange() returns "
          + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1"));
    return oldestCtxt;