mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.19.2013 8d567305382a771caffae063adcd7a42af2c7b3e
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -41,6 +41,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
@@ -68,8 +69,7 @@
 *   The startup phase of the ReplicationDomain subclass,
 *   should read the list of replication servers from the configuration,
 *   instantiate a {@link ServerState} then start the publish service
 *   by calling
 *   {@link #startPublishService(Set, int, long, long)}.
 *   by calling {@link #startPublishService(ReplicationDomainCfg)}.
 *   At this point it can start calling the {@link #publish(UpdateMsg)}
 *   method if needed.
 * <p>
@@ -274,7 +274,7 @@
   * - and each initialized/importer DS that publishes acknowledges each
   *   WINDOW/2 data msg received.
   */
  protected int initWindow = 100;
  protected final int initWindow;
  /* Status related monitoring fields */
@@ -304,8 +304,7 @@
  private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
    new HashMap<Integer, Set<String>>();
  private Set<String> eclIncludesForDeletesAllServers = Collections
      .emptySet();
  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
  /**
   * An object used to protect the initialization of the underlying broker
@@ -363,6 +362,7 @@
  {
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.initWindow = 100;
    this.state = serverState;
    this.generator = new CSNGenerator(serverID, state);
@@ -1060,7 +1060,7 @@
    public void run()
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] starting " + this.getName());
        TRACER.debugInfo("[IE] starting " + getName());
      try
      {
        initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
@@ -1075,7 +1075,7 @@
      }
      if (debugEnabled())
        TRACER.debugInfo("[IE] ending " + this.getName());
        TRACER.debugInfo("[IE] ending " + getName());
    }
  }
@@ -1313,7 +1313,7 @@
   */
  public int decodeTarget(String targetString) throws DirectoryException
  {
    if (targetString.equalsIgnoreCase("all"))
    if ("all".equalsIgnoreCase(targetString))
    {
      return RoutableMsg.ALL_SERVERS;
    }
@@ -1612,7 +1612,7 @@
            "[IE] wait for start dsid " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + this.getGenerationID());
            + " " + getGenerationID());
        if (ieContext.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
@@ -1711,7 +1711,7 @@
          }
          else
          {
            if (dsInfo.getGenerationId() == this.getGenerationID())
            if (dsInfo.getGenerationId() == getGenerationID())
            { // and with the expected generationId
              // We're done with this server
              it.remove();
@@ -1757,8 +1757,7 @@
    {
      // Rejects 2 simultaneous exports
      Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
      throw new DirectoryException(ResultCode.OTHER,
          message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    ieContext = new IEContext(importInProgress);
@@ -1777,34 +1776,30 @@
   */
  private void processErrorMsg(ErrorMsg errorMsg)
  {
    if (ieContext != null)
    //Exporting must not be stopped on the first error, if we run initialize-all
    if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
    {
      /*
        Exporting must not be stopped on the first error, if we
        run initialize-all.
      */
      if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
      // The ErrorMsg is received while we have started an initialization
      if (ieContext.getException() == null)
      {
        // The ErrorMsg is received while we have started an initialization
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
              errorMsg.getDetails()));
        ieContext.setException(
            new DirectoryException(ResultCode.OTHER, errorMsg.getDetails()));
      }
        /*
         * This can happen :
         * - on the first InitReqMsg sent when source in not known for example
         * - on the next attempt when source crashed and did not reconnect
         *   even after the nextInitAttemptDelay
         * During the import, the ErrorMsg will be received by receiveEntryBytes
         */
        if (ieContext.initializeTask instanceof InitializeTask)
        {
          // Update the task that initiated the import
          ((InitializeTask)ieContext.initializeTask).
          updateTaskCompletionState(ieContext.getException());
      /*
       * This can happen :
       * - on the first InitReqMsg sent when source in not known for example
       * - on the next attempt when source crashed and did not reconnect
       *   even after the nextInitAttemptDelay
       * During the import, the ErrorMsg will be received by receiveEntryBytes
       */
      if (ieContext.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask) ieContext.initializeTask)
            .updateTaskCompletionState(ieContext.getException());
          releaseIEContext();
        }
        releaseIEContext();
      }
    }
  }
@@ -1894,8 +1889,7 @@
        {
          /*
          This is the normal termination of the import
          No error is stored and the import is ended
          by returning null
          No error is stored and the import is ended by returning null
          */
          return null;
        }
@@ -1903,8 +1897,7 @@
        {
          /*
          This is an error termination during the import
          The error is stored and the import is ended
          by returning null
          The error is stored and the import is ended by returning null
          */
          if (ieContext.getException() == null)
          {
@@ -1921,8 +1914,8 @@
        {
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if ((msg instanceof TopologyMsg) &&
              (isRemoteDSConnected(ieContext.importSource)==null))
          if (msg instanceof TopologyMsg
              && isRemoteDSConnected(ieContext.importSource) == null)
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
@@ -2043,8 +2036,8 @@
        catch(Exception e) { /* do nothing */ }
        // process any connection error
        if ((broker.hasConnectionError())||
            (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
        if (broker.hasConnectionError()
          || broker.getNumLostConnections() != ieContext.initNumLostConnections)
        {
          // publish failed - store the error in the ieContext ...
          DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2485,8 +2478,7 @@
   * @throws DirectoryException When the generation ID of the Replication
   *                            Servers is not the expected value.
   */
  private void checkGenerationID(long generationID)
  throws DirectoryException
  private void checkGenerationID(long generationID) throws DirectoryException
  {
    boolean allSet = true;
@@ -2535,7 +2527,7 @@
  public void resetReplicationLog() throws DirectoryException
  {
    // Reset the Generation ID to -1 to clean the ReplicationServers.
    resetGenerationId((long)-1);
    resetGenerationId(-1L);
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(-1);
@@ -2573,43 +2565,35 @@
   * @throws DirectoryException   When an error occurs
   */
  public void resetGenerationId(Long generationIdNewValue)
  throws DirectoryException
      throws DirectoryException
  {
    if (debugEnabled())
      TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
          + " resetGenerationId " + generationIdNewValue);
    ResetGenerationIdMsg genIdMessage;
    if (generationIdNewValue == null)
    {
      genIdMessage = new ResetGenerationIdMsg(this.getGenerationID());
    }
    else
    {
      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
    }
    ResetGenerationIdMsg genIdMessage =
        new ResetGenerationIdMsg(getGenId(generationIdNewValue));
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
          Integer.toString(serverID),
          Long.toString(genIdMessage.getGenerationId()));
      throw new DirectoryException(
         resultCode, message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    broker.publish(genIdMessage);
    // check that at least one ReplicationServer did change its generation-id
    if (generationIdNewValue == null)
    checkGenerationID(getGenId(generationIdNewValue));
  }
  private long getGenId(Long generationIdNewValue)
  {
    if (generationIdNewValue != null)
    {
      checkGenerationID(this.getGenerationID());
      return generationIdNewValue;
    }
    else
    {
      checkGenerationID(generationIdNewValue);
    }
    return getGenerationID();
  }
@@ -2945,24 +2929,17 @@
   */
  /**
   * Start the publish mechanism of the Replication Service.
   * After this method has been called, the publish service can be used
   * by calling the {@link #publish(UpdateMsg)} method.
   * Start the publish mechanism of the Replication Service. After this method
   * has been called, the publish service can be used by calling the
   * {@link #publish(UpdateMsg)} method.
   *
   * @param replicationServers   The replication servers that should be used.
   * @param window               The window size of this replication domain.
   * @param heartbeatInterval    The heartbeatInterval that should be used
   *                             to check the availability of the replication
   *                             servers.
   * @param changetimeHeartbeatInterval  The interval used to send change
   *                             time heartbeat to the replication server.
   *
   * @throws ConfigException     If the DirectoryServer configuration was
   *                             incorrect.
   * @param config
   *          The configuration that should be used.
   * @throws ConfigException
   *           If the DirectoryServer configuration was incorrect.
   */
  public void startPublishService(Set<String> replicationServers, int window,
      long heartbeatInterval, long changetimeHeartbeatInterval)
  throws ConfigException
  public void startPublishService(ReplicationDomainCfg config)
      throws ConfigException
  {
    synchronized (sessionLock)
    {
@@ -2970,15 +2947,8 @@
      {
        // create the broker object used to publish and receive changes
        broker = new ReplicationBroker(
            this, state, baseDN,
            serverID, window,
            getGenerationID(),
            heartbeatInterval,
            new ReplSessionSecurity(),
            getGroupId(),
            changetimeHeartbeatInterval);
        broker.start(replicationServers);
            this, state, config, getGenerationID(), new ReplSessionSecurity());
        broker.start();
      }
    }
  }
@@ -2990,7 +2960,7 @@
   * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(Collection, int, long, long)}.
   * {@link #startPublishService(ReplicationDomainCfg)}.
   */
  public void startListenService()
  {
@@ -3040,12 +3010,12 @@
   * <p>
   * The Replication Service will restart from the point indicated by the
   * {@link ServerState} that was given as a parameter to the
   * {@link #startPublishService(Collection, int, long, long)}
   * at startup time.
   * {@link #startPublishService(ReplicationDomainCfg)} at startup time.
   * <p>
   * If some data have changed in the repository during the period of time when
   * the Replication Service was disabled, this {@link ServerState} should
   * therefore be updated by the Replication Domain subclass before calling
   * this method. This method is not MT safe.
   * therefore be updated by the Replication Domain subclass before calling this
   * method. This method is not MT safe.
   */
  public void enableService()
  {
@@ -3071,21 +3041,14 @@
  /**
   * Change some ReplicationDomain parameters.
   *
   * @param replicationServers  The new set of Replication Servers that this
   *                           domain should now use.
   * @param windowSize         The window size that this domain should use.
   * @param heartbeatInterval  The heartbeatInterval that this domain should
   *                           use.
   * @param groupId            The new group id to use
   * @param config
   *          The new configuration that this domain should now use.
   */
  public void changeConfig(Set<String> replicationServers, int windowSize,
      long heartbeatInterval, byte groupId)
  public void changeConfig(ReplicationDomainCfg config)
  {
    this.groupId = groupId;
    this.groupId = (byte) config.getGroupId();
    if (broker != null
        && broker.changeConfig(replicationServers, windowSize,
            heartbeatInterval, groupId))
    if (broker != null && broker.changeConfig(config))
    {
      disableService();
      enableService();
@@ -3195,47 +3158,46 @@
    one. Only Safe Read mode makes sense in DS for returning an ack.
    */
    byte rsGroupId = broker.getRsGroupId();
    if (msg.isAssured())
    // Assured feature is supported starting from replication protocol V2
    if (msg.isAssured()
      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
    {
      // Assured feature is supported starting from replication protocol V2
      if (broker.getProtocolVersion() >=
        ProtocolVersion.REPLICATION_PROTOCOL_V2)
      AssuredMode msgAssuredMode = msg.getAssuredMode();
      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
      {
        AssuredMode msgAssuredMode = msg.getAssuredMode();
        if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
        if (rsGroupId == groupId)
        {
          if (rsGroupId == groupId)
          // Send the ack
          AckMsg ackMsg = new AckMsg(msg.getCSN());
          if (replayErrorMsg != null)
          {
            // Send the ack
            AckMsg ackMsg = new AckMsg(msg.getCSN());
            if (replayErrorMsg != null)
            {
              // Mark the error in the ack
              //   -> replay error occurred
              ackMsg.setHasReplayError(true);
              //   -> replay error occurred in our server
              List<Integer> idList = new ArrayList<Integer>();
              idList.add(serverID);
              ackMsg.setFailedServers(idList);
            }
            broker.publish(ackMsg);
            if (replayErrorMsg != null)
            {
              assuredSrReceivedUpdatesNotAcked.incrementAndGet();
            } else
            {
              assuredSrReceivedUpdatesAcked.incrementAndGet();
            }
            // Mark the error in the ack
            //   -> replay error occurred
            ackMsg.setHasReplayError(true);
            //   -> replay error occurred in our server
            List<Integer> idList = new ArrayList<Integer>();
            idList.add(serverID);
            ackMsg.setFailedServers(idList);
          }
        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        {
          Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
              Integer.toString(serverID), msgAssuredMode.toString(),
              getBaseDNString(), msg.toString());
          logError(errorMsg);
          broker.publish(ackMsg);
          if (replayErrorMsg != null)
          {
            assuredSrReceivedUpdatesNotAcked.incrementAndGet();
          }
          else
          {
            assuredSrReceivedUpdatesAcked.incrementAndGet();
          }
        }
        // Nothing to do in Assured safe data mode, only RS ack updates.
      }
      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
      {
        Message errorMsg =
            ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
                msgAssuredMode.toString(), getBaseDNString(), msg.toString());
        logError(errorMsg);
      }
        // Nothing to do in Assured safe data mode, only RS ack updates.
    }
    incProcessedUpdates();
@@ -3301,7 +3263,7 @@
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured mode configured, wait for acknowledgement for the just sent
    // If assured mode configured, wait for acknowledgment for the just sent
    // message
    if (assured && rsGroupId == groupId)
    {
@@ -3354,40 +3316,37 @@
          remove the update from the wait list, log the timeout error and
          also update assured monitoring counters
          */
          UpdateMsg update = waitingAckMsgs.remove(csn);
          if (update != null)
          {
            // No luck, this is a real timeout
            // Increment assured replication monitoring counters
            switch (msg.getAssuredMode())
            {
              case SAFE_READ_MODE:
                assuredSrNotAcknowledgedUpdates.incrementAndGet();
                assuredSrTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(
                  assuredSrServerNotAcknowledgedUpdates,
                  broker.getRsServerId());
                break;
              case SAFE_DATA_MODE:
                assuredSdTimeoutUpdates.incrementAndGet();
                // Increment number of errors for our RS
                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
                  broker.getRsServerId());
                break;
              default:
              // Should not happen
            }
            throw new TimeoutException("No ack received for message csn: "
                + csn + " and replication servceID: " + baseDN + " after "
                + assuredTimeout + " ms.");
          } else
          final UpdateMsg update = waitingAckMsgs.remove(csn);
          if (update == null)
          {
            // Ack received just before timeout limit: we can exit
            break;
          }
          // No luck, this is a real timeout
          // Increment assured replication monitoring counters
          switch (msg.getAssuredMode())
          {
          case SAFE_READ_MODE:
            assuredSrNotAcknowledgedUpdates.incrementAndGet();
            assuredSrTimeoutUpdates.incrementAndGet();
            // Increment number of errors for our RS
            updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
                broker.getRsServerId());
            break;
          case SAFE_DATA_MODE:
            assuredSdTimeoutUpdates.incrementAndGet();
            // Increment number of errors for our RS
            updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
                broker.getRsServerId());
            break;
          default:
            // Should not happen
          }
          throw new TimeoutException("No ack received for message csn: " + csn
              + " and replication domain: " + baseDN + " after "
              + assuredTimeout + " ms.");
        }
      }
    }
@@ -3425,8 +3384,7 @@
      update = new UpdateMsg(generator.newCSN(), msg);
      /*
      If assured replication is configured, this will prepare blocking
      mechanism. If assured replication is disabled, this returns
      immediately
      mechanism. If assured replication is disabled, this returns immediately
      */
      prepareWaitForAckIfAssuredEnabled(update);
@@ -3443,8 +3401,7 @@
      waitForAckIfAssuredEnabled(update);
    } catch (TimeoutException ex)
    {
      // This exception may only be raised if assured replication is
      // enabled
      // This exception may only be raised if assured replication is enabled
      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
          Long.toString(assuredTimeout), update.toString());
      logError(errorMsg);