mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
09.52.2014 cb1bb5d131addd27e2927ec90cc572a8c4d40f80
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
@@ -56,6 +57,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.common.StatusMachine.*;
/**
@@ -100,7 +102,7 @@
 *   implementation using methods {@link #initializeRemote(int)}
 *   or {@link #initializeFromRemote(int)}.
 * <p>
 *   At shutdown time, the {@link #stopDomain()} method should be called to
 *   At shutdown time, the {@link #disableService()} method should be called to
 *   cleanly stop the replication service.
 */
public abstract class ReplicationDomain
@@ -115,25 +117,21 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /** The configuration of the replication domain. */
  protected volatile ReplicationDomainCfg config;
  /**
   *  The baseDN for the Replication Service.
   *  All Replication Domain using this baseDN will be connected
   *  through the Replication Service.
   * The assured configuration of the replication domain. It is a duplicate of
   * {@link #config} because of its update model.
   *
   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
   */
  private final DN baseDN;
  /**
   * The identifier of this Replication Domain inside the
   * Replication Service.
   * Each Domain must use a unique ServerID.
   */
  private final int serverID;
  private volatile ReplicationDomainCfg assuredConfig;
  /**
   * The ReplicationBroker that is used by this ReplicationDomain to
   * connect to the ReplicationService.
   */
  protected ReplicationBroker broker = null;
  protected ReplicationBroker broker;
  /**
   * This Map is used to store all outgoing assured messages in order
@@ -158,33 +156,6 @@
  private volatile DirectoryThread listenerThread = null;
  /**
   * A Map used to store all the ReplicationDomains created on this server.
   */
  private static Map<DN, ReplicationDomain> domains =
      new HashMap<DN, ReplicationDomain>();
  /*
   * Assured mode properties
   */
  /** Whether assured mode is enabled for this domain. */
  private boolean assured = false;
  /** Assured sub mode (used when assured is true). */
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  /** Safe Data level (used when assuredMode is SAFE_DATA). */
  private byte assuredSdLevel = 1;
  /** The timeout in ms that should be used, when waiting for assured acks. */
  private long assuredTimeout = 2000;
  /** Group id. */
  private byte groupId = 1;
  /**
   * Referrals urls to be published to other servers of the topology.
   * <p>
   * TODO: fill that with all currently opened urls if no urls configured
   */
  private final List<String> refUrls = new ArrayList<String>();
  /**
   * A set of counters used for Monitoring.
   */
  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
@@ -265,15 +236,6 @@
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  /**
   * Window size used during initialization .. between
   * - the initializer/exporter DS that listens/waits acknowledges and that
   *   slows down data msg publishing based on the slowest server
   * - and each initialized/importer DS that publishes acknowledges each
   *   WINDOW/2 data msg received.
   */
  protected final int initWindow;
  /* Status related monitoring fields */
  /**
@@ -311,6 +273,12 @@
  private final Object sessionLock = new Object();
  /**
   * The generationId for this replication domain. It is made of a hash of the
   * 1000 first entries for this domain.
   */
  protected volatile long generationId;
  /**
   * Returns the {@link CSNGenerator} that will be used to
   * generate {@link CSN} for this domain.
   *
@@ -325,46 +293,39 @@
  /**
   * Creates a ReplicationDomain with the provided parameters.
   *
   * @param baseDN     The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param initWindow Window used during initialization.
   * @param config
   *          The configuration object for this ReplicationDomain
   * @param generationId
   *          the generation of this ReplicationDomain
   */
  public ReplicationDomain(DN baseDN, int serverID, int initWindow)
  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
  {
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.initWindow = initWindow;
    this.config = config;
    this.assuredConfig = config;
    this.generationId = generationId;
    this.state = new ServerState();
    this.generator = new CSNGenerator(serverID, state);
    domains.put(baseDN, this);
    this.generator = new CSNGenerator(getServerId(), state);
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   * (for unit test purpose only)
   * Creates a ReplicationDomain with the provided parameters. (for unit test
   * purpose only)
   *
   * @param baseDN     The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param serverState The serverState to use
   * @param config
   *          The configuration object for this ReplicationDomain
   * @param generationId
   *          the generation of this ReplicationDomain
   * @param serverState
   *          The serverState to use
   */
  public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
      ServerState serverState)
  {
    this.baseDN = baseDN;
    this.serverID = serverID;
    this.initWindow = 100;
    this.config = config;
    this.assuredConfig = config;
    this.generationId = generationId;
    this.state = serverState;
    this.generator = new CSNGenerator(serverID, state);
    domains.put(baseDN, this);
    this.generator = new CSNGenerator(getServerId(), state);
  }
  /**
@@ -387,7 +348,7 @@
    if (!isValidInitialStatus(initStatus))
    {
      logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
          getBaseDNString(), Integer.toString(serverID)));
          getBaseDNString(), Integer.toString(getServerId())));
    }
    else
    {
@@ -406,7 +367,7 @@
  private void receiveChangeStatus(ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDN +
      TRACER.debugInfo("Replication domain " + getBaseDN() +
        " received change status message:\n" + csMsg);
    ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -416,7 +377,7 @@
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
          getBaseDNString(), Integer.toString(serverID)));
          getBaseDNString(), Integer.toString(getServerId())));
      return;
    }
@@ -468,13 +429,24 @@
  }
  /**
   * Returns the base DN of this ReplicationDomain.
   * Returns the current config of this ReplicationDomain.
   *
   * @return the config
   */
  protected ReplicationDomainCfg getConfig()
  {
    return config;
  }
  /**
   * Returns the base DN of this ReplicationDomain. All Replication Domain using
   * this baseDN will be connected through the Replication Service.
   *
   * @return The base DN of this ReplicationDomain
   */
  public DN getBaseDN()
  {
    return baseDN;
    return config.getBaseDN();
  }
  /**
@@ -484,16 +456,32 @@
   */
  public String getBaseDNString()
  {
    return baseDN.toNormalizedString();
    return getBaseDN().toNormalizedString();
  }
  /**
   * Get the server ID.
   * Get the server ID. The identifier of this Replication Domain inside the
   * Replication Service. Each Domain must use a unique ServerID.
   *
   * @return The server ID.
   */
  public int getServerId()
  {
    return serverID;
    return config.getServerId();
  }
  /**
   * Window size used during initialization .. between - the
   * initializer/exporter DS that listens/waits acknowledges and that slows down
   * data msg publishing based on the slowest server - and each
   * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
   * received.
   *
   * @return the initWindow
   */
  protected int getInitWindow()
  {
    return config.getInitializationWindowSize();
  }
  /**
@@ -502,25 +490,38 @@
   */
  public boolean isAssured()
  {
    return assured;
    return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
        || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
  }
  /**
   * Gives the mode for the assured replication of the domain.
   * Gives the mode for the assured replication of the domain. Only used when
   * assured is true).
   *
   * @return The mode for the assured replication of the domain.
   */
  public AssuredMode getAssuredMode()
  {
    return assuredMode;
    switch (assuredConfig.getAssuredType())
    {
    case SAFE_DATA:
    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
      return AssuredMode.SAFE_DATA_MODE;
    case SAFE_READ:
      return AssuredMode.SAFE_READ_MODE;
    }
    return null; // should never happen
  }
  /**
   * Gives the assured level of the replication of the domain.
   * Gives the assured Safe Data level of the replication of the domain. (used
   * when assuredMode is SAFE_DATA).
   *
   * @return The assured level of the replication of the domain.
   */
  public byte getAssuredSdLevel()
  {
    return assuredSdLevel;
    return (byte) assuredConfig.getAssuredSdLevel();
  }
  /**
@@ -529,7 +530,7 @@
   */
  public long getAssuredTimeout()
  {
    return assuredTimeout;
    return assuredConfig.getAssuredTimeout();
  }
  /**
@@ -538,16 +539,20 @@
   */
  public byte getGroupId()
  {
    return groupId;
    return (byte) config.getGroupId();
  }
  /**
   * Gets the referrals URLs this domain publishes.
   * Gets the referrals URLs this domain publishes. Referrals urls to be
   * published to other servers of the topology.
   * <p>
   * TODO: fill that with all currently opened urls if no urls configured
   *
   * @return The referrals URLs this domain publishes.
   */
  public List<String> getRefUrls()
  public Set<String> getRefUrls()
  {
    return refUrls;
    return config.getReferralsUrl();
  }
  /**
@@ -673,67 +678,6 @@
  }
  /**
   * Set the list of Referrals that should be returned when an
   * operation needs to be redirected to this server.
   *
   * @param referralsUrl The list of referrals.
   */
  public void setURLs(Set<String> referralsUrl)
  {
      this.refUrls.addAll(referralsUrl);
  }
  /**
   * Set the timeout of the assured replication.
   *
   * @param assuredTimeout the timeout of the assured replication.
   */
  public void setAssuredTimeout(long assuredTimeout)
  {
    this.assuredTimeout = assuredTimeout;
  }
  /**
   * Sets the groupID.
   *
   * @param groupId The groupID.
   */
  public void setGroupId(byte groupId)
  {
    this.groupId = groupId;
  }
  /**
   * Sets the level of assured replication.
   *
   * @param assuredSdLevel The level of assured replication.
   */
  public void setAssuredSdLevel(byte assuredSdLevel)
  {
    this.assuredSdLevel = assuredSdLevel;
  }
  /**
   * Sets the assured replication mode.
   *
   * @param dataMode The assured replication mode.
   */
  public void setAssuredMode(AssuredMode dataMode)
  {
    this.assuredMode = dataMode;
  }
  /**
   * Sets assured replication.
   *
   * @param assured A boolean indicating if assured replication should be used.
   */
  public void setAssured(boolean assured)
  {
    this.assured = assured;
  }
  /**
   * Receives an update message from the replicationServer.
   * The other types of messages are processed in an opaque way for the caller.
   * Also responsible for updating the list of pending changes
@@ -802,8 +746,8 @@
            */
            if (debugEnabled())
              TRACER.debugInfo(
                  "[IE] processErrorMsg:" + this.serverID +
                  " baseDN: " + this.baseDN +
                  "[IE] processErrorMsg:" + getServerId() +
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -873,10 +817,9 @@
    }
    numRcvdUpdates.incrementAndGet();
     byte rsGroupId = broker.getRsGroupId();
    if (update.isAssured()
        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
        && rsGroupId == groupId)
        && broker.getRsGroupId() == getGroupId()
        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
    {
      assuredSrReceivedUpdates.incrementAndGet();
    }
@@ -949,7 +892,7 @@
        requested servers. Log problem
        */
        logError(NOTE_DS_RECEIVED_ACK_ERROR.get(
            getBaseDNString(), Integer.toString(serverID),
            getBaseDNString(), Integer.toString(getServerId()),
            update.toString(), ack.errorsToString()));
        List<Integer> failedServers = ack.getFailedServers();
@@ -1048,7 +991,7 @@
     */
    public ExportThread(int serverIdToInitialize, int initWindow)
    {
      super("Export thread from serverId=" + serverID + " to serverId="
      super("Export thread from serverId=" + getServerId() + " to serverId="
          + serverIdToInitialize);
      this.serverIdToInitialize = serverIdToInitialize;
      this.initWindow = initWindow;
@@ -1379,7 +1322,7 @@
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, this.serverID, initTask, this.initWindow);
    initializeRemote(target, getServerId(), initTask, getInitWindow());
  }
  /**
@@ -1418,7 +1361,7 @@
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
          countEntries(), getBaseDNString(), serverID));
          countEntries(), getBaseDNString(), getServerId()));
      for (DSInfo dsi : getReplicasList())
      {
@@ -1436,8 +1379,8 @@
    }
    else
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
          countEntries(), getBaseDNString(), serverID, serverToInitialize));
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
          getBaseDNString(), getServerId(), serverToInitialize));
      ieContext.startList.add(serverToInitialize);
@@ -1471,7 +1414,7 @@
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            getBaseDN(), serverID, serverToInitialize,
            getBaseDN(), getServerId(), serverToInitialize,
            serverRunningTheTask, ieContext.entryCount, initWindow);
        broker.publish(initTargetMsg);
@@ -1492,8 +1435,8 @@
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
        // Notify the peer of the success
        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
        broker.publish(doneMsg);
        broker.publish(
            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
      }
      catch(DirectoryException exportException)
      {
@@ -1595,12 +1538,12 @@
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
          getBaseDNString(), serverID, cause));
          getBaseDNString(), getServerId(), cause));
    }
    else
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
          getBaseDNString(), serverID, serverToInitialize, cause));
          getBaseDNString(), getServerId(), serverToInitialize, cause));
    }
@@ -1894,10 +1837,8 @@
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            {
              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  this.serverID,
                  entryMsg.getSenderID(),
                  ieContext.msgCnt);
              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
              {
@@ -1945,7 +1886,7 @@
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      getBaseDNString(),
                      Integer.toString(this.serverID),
                      Integer.toString(getServerId()),
                      Integer.toString(ieContext.importSource)));
            ieContext.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, errMsg));
@@ -2017,7 +1958,7 @@
    // build the message
    EntryMsg entryMessage = new EntryMsg(
        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
    // Waiting the slowest loop
@@ -2219,7 +2160,7 @@
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
          getBaseDN(), serverID, source, this.initWindow);
          getBaseDN(), getServerId(), source, getInitWindow());
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
@@ -2281,14 +2222,14 @@
    try
    {
      // Log starting
      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
          getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID));
      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(),
          initTargetMsgReceived.getSenderID(), getServerId()));
      // Go into full update status
      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
      // Acquire an import context if no already done (and initialize).
      if (initTargetMsgReceived.getInitiatorID() != this.serverID)
      if (initTargetMsgReceived.getInitiatorID() != getServerId())
      {
        /*
        The initTargetMsgReceived is for an import initiated by the remote
@@ -2418,7 +2359,8 @@
      finally
      {
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
            getBaseDNString(), initTargetMsgReceived.getSenderID(),
            getServerId(),
            (ieContext.getException() == null ? ""
                : ieContext.getException().getLocalizedMessage()));
        logError(msg);
@@ -2458,7 +2400,7 @@
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
          Integer.toString(serverID), status.toString(), event.toString()));
          String.valueOf(getServerId()), status.toString(), event.toString()));
      return;
    }
@@ -2472,13 +2414,11 @@
        resetMonitoringCounters();
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
      {
        TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
            + status);
        TRACER.debugInfo("Replication domain " + getBaseDN()
            + " new status is: " + status);
      }
      // Perform whatever actions are needed to apply properties for being
@@ -2560,10 +2500,8 @@
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(-1);
    // Reconnect to the Replication Server so that it adopt our
    // GenerationID.
    disableService();
    enableService();
    // Reconnect to the Replication Server so that it adopts our GenerationID.
    restartService();
    // wait for the domain to reconnect.
    int count = 0;
@@ -2597,8 +2535,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
          + " resetGenerationId " + generationIdNewValue);
      TRACER.debugInfo("Server id " + getServerId() + " and domain "
          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
    }
    ResetGenerationIdMsg genIdMessage =
@@ -2607,7 +2545,7 @@
    if (!isConnected())
    {
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
          Integer.toString(serverID),
          Integer.toString(getServerId()),
          Long.toString(genIdMessage.getGenerationId()));
      throw new DirectoryException(ResultCode.OTHER, message);
    }
@@ -3110,33 +3048,19 @@
  }
  /**
   * Definitively stops the Replication Service.
   */
  public void stopDomain()
  {
    disableService();
    domains.remove(baseDN);
  }
  /**
   * Change some ReplicationDomain parameters.
   *
   * @param config
   *          The new configuration that this domain should now use.
   */
  public void changeConfig(ReplicationDomainCfg config)
  protected void changeConfig(ReplicationDomainCfg config)
  {
    this.groupId = (byte) config.getGroupId();
    if (broker != null && broker.changeConfig(config))
    {
      disableService();
      enableService();
      restartService();
    }
  }
  /**
   * Applies a configuration change to the attributes which should be be
   * included in the ECL.
@@ -3149,15 +3073,19 @@
  public void changeConfig(Set<String> includeAttributes,
      Set<String> includeAttributesForDeletes)
  {
    if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
        && broker != null)
    final boolean attrsModified = setEclIncludes(
        getServerId(), includeAttributes, includeAttributesForDeletes);
    if (attrsModified && broker != null)
    {
      disableService();
      enableService();
      restartService();
    }
  }
  private void restartService()
  {
    disableService();
    enableService();
  }
  /**
   * This method should trigger an export of the replicated data.
@@ -3236,15 +3164,13 @@
    Send an ack if it was requested and the group id is the same of the RS
    one. Only Safe Read mode makes sense in DS for returning an ack.
    */
    byte rsGroupId = broker.getRsGroupId();
    // Assured feature is supported starting from replication protocol V2
    if (msg.isAssured()
      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
    {
      AssuredMode msgAssuredMode = msg.getAssuredMode();
      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
      {
        if (rsGroupId == groupId)
        if (broker.getRsGroupId() == getGroupId())
        {
          // Send the ack
          AckMsg ackMsg = new AckMsg(msg.getCSN());
@@ -3255,7 +3181,7 @@
            ackMsg.setHasReplayError(true);
            //   -> replay error occurred in our server
            List<Integer> idList = new ArrayList<Integer>();
            idList.add(serverID);
            idList.add(getServerId());
            ackMsg.setFailedServers(idList);
          }
          broker.publish(ackMsg);
@@ -3269,10 +3195,11 @@
          }
        }
      }
      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
      {
        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
            msgAssuredMode.toString(), getBaseDNString(), msg.toString()));
        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()),
            msg.getAssuredMode().toString(), getBaseDNString(),
            msg.toString()));
      }
        // Nothing to do in Assured safe data mode, only RS ack updates.
    }
@@ -3303,23 +3230,22 @@
   */
  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
  {
    byte rsGroupId = broker.getRsGroupId();
    /*
     * If assured configured, set message accordingly to request an ack in the
     * right assured mode.
     * No ack requested for a RS with a different group id. Assured
     * replication supported for the same locality, i.e: a topology working in
     * the same
     * geographical location). If we are connected to a RS which is not in our
     * locality, no need to ask for an ack.
     * No ack requested for a RS with a different group id.
     * Assured replication supported for the same locality,
     * i.e: a topology working in the same geographical location).
     * If we are connected to a RS which is not in our locality,
     * no need to ask for an ack.
     */
    if (assured && rsGroupId == groupId)
    if (needsAck())
    {
      msg.setAssured(true);
      msg.setAssuredMode(assuredMode);
      if (assuredMode == AssuredMode.SAFE_DATA_MODE)
      msg.setAssuredMode(getAssuredMode());
      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
      {
        msg.setSafeDataLevel(assuredSdLevel);
        msg.setSafeDataLevel(getAssuredSdLevel());
      }
      // Add the assured message to the list of update that are waiting for acks
@@ -3327,6 +3253,11 @@
    }
  }
  private boolean needsAck()
  {
    return isAssured() && broker.getRsGroupId() == getGroupId();
  }
  /**
   * Wait for the processing of an assured message after it has been sent, if
   * assured replication is configured, otherwise, do nothing.
@@ -3340,14 +3271,10 @@
  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
    throws TimeoutException
  {
    byte rsGroupId = broker.getRsGroupId();
    // If assured mode configured, wait for acknowledgment for the just sent
    // message
    if (assured && rsGroupId == groupId)
    if (needsAck())
    {
      // Increment assured replication monitoring counters
      switch (assuredMode)
      switch (getAssuredMode())
      {
        case SAFE_READ_MODE:
          assuredSrSentUpdates.incrementAndGet();
@@ -3383,12 +3310,12 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
              "baseDN: " + baseDN);
              "baseDN: " + getBaseDN());
          }
          break;
        }
        // Timeout ?
        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
        if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout())
        {
          /*
          Timeout occurred, be sure that ack is not being received and if so,
@@ -3424,8 +3351,8 @@
          }
          throw new TimeoutException("No ack received for message csn: " + csn
              + " and replication domain: " + baseDN + " after "
              + assuredTimeout + " ms.");
              + " and replication domain: " + getBaseDN() + " after "
              + getAssuredTimeout() + " ms.");
        }
      }
    }
@@ -3481,7 +3408,7 @@
    {
      // This exception may only be raised if assured replication is enabled
      logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
          Long.toString(assuredTimeout), update.toString()));
          Long.toString(getAssuredTimeout()), update.toString()));
    }
  }
@@ -3493,11 +3420,25 @@
   *
   * @return The GenerationID.
   */
  public abstract long getGenerationID();
  public long getGenerationID()
  {
    return generationId;
  }
  /**
   * Subclasses should use this method to add additional monitoring
   * information in the ReplicationDomain.
   * Sets the generationId for this replication domain.
   *
   * @param generationId
   *          the generationId to set
   */
  public void setGenerationID(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Subclasses should use this method to add additional monitoring information
   * in the ReplicationDomain.
   *
   * @return Additional monitoring attributes that will be added in the
   *         ReplicationDomain monitoring entry.
@@ -3711,13 +3652,69 @@
   */
  public CSN getLastLocalChange()
  {
    return state.getCSN(serverID);
    return state.getCSN(getServerId());
  }
  /**
   * Gets and stores the assured replication configuration parameters. Returns a
   * boolean indicating if the passed configuration has changed compared to
   * previous values and the changes require a reconnection.
   *
   * @param config
   *          The configuration object
   * @param allowReconnection
   *          Tells if one must reconnect if significant changes occurred
   */
  protected void readAssuredConfig(ReplicationDomainCfg config,
      boolean allowReconnection)
  {
    // Disconnect if required: changing configuration values before
    // disconnection would make assured replication used immediately and
    // disconnection could cause some timeouts error.
    if (needReconnection(config) && allowReconnection)
    {
      disableService();
      assuredConfig = config;
      enableService();
    }
  }
  private boolean needReconnection(ReplicationDomainCfg cfg)
  {
    final AssuredMode assuredMode = getAssuredMode();
    switch (cfg.getAssuredType())
    {
    case NOT_ASSURED:
      if (isAssured())
      {
        return true;
      }
      break;
    case SAFE_DATA:
      if (!isAssured() || assuredMode == SAFE_READ_MODE)
      {
        return true;
      }
      break;
    case SAFE_READ:
      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
      {
        return true;
      }
      break;
    }
    return isAssured()
        && assuredMode == SAFE_DATA_MODE
        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
  }
}