From cb1bb5d131addd27e2927ec90cc572a8c4d40f80 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 09 Jan 2014 09:52:23 +0000
Subject: [PATCH] Front-port of r10098.

---
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java |  503 +++++++++++++++++++++++++++----------------------------
 1 files changed, 250 insertions(+), 253 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index b01feaa..7f41ec9 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -39,6 +39,7 @@
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.task.Task;
@@ -56,6 +57,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.AssuredMode.*;
 import static org.opends.server.replication.common.StatusMachine.*;
 
 /**
@@ -100,7 +102,7 @@
  *   implementation using methods {@link #initializeRemote(int)}
  *   or {@link #initializeFromRemote(int)}.
  * <p>
- *   At shutdown time, the {@link #stopDomain()} method should be called to
+ *   At shutdown time, the {@link #disableService()} method should be called to
  *   cleanly stop the replication service.
  */
 public abstract class ReplicationDomain
@@ -115,25 +117,21 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /** The configuration of the replication domain. */
+  protected volatile ReplicationDomainCfg config;
   /**
-   *  The baseDN for the Replication Service.
-   *  All Replication Domain using this baseDN will be connected
-   *  through the Replication Service.
+   * The assured configuration of the replication domain. It is a duplicate of
+   * {@link #config} because of its update model.
+   *
+   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
    */
-  private final DN baseDN;
-
-  /**
-   * The identifier of this Replication Domain inside the
-   * Replication Service.
-   * Each Domain must use a unique ServerID.
-   */
-  private final int serverID;
+  private volatile ReplicationDomainCfg assuredConfig;
 
   /**
    * The ReplicationBroker that is used by this ReplicationDomain to
    * connect to the ReplicationService.
    */
-  protected ReplicationBroker broker = null;
+  protected ReplicationBroker broker;
 
   /**
    * This Map is used to store all outgoing assured messages in order
@@ -158,33 +156,6 @@
   private volatile DirectoryThread listenerThread = null;
 
   /**
-   * A Map used to store all the ReplicationDomains created on this server.
-   */
-  private static Map<DN, ReplicationDomain> domains =
-      new HashMap<DN, ReplicationDomain>();
-
-  /*
-   * Assured mode properties
-   */
-  /** Whether assured mode is enabled for this domain. */
-  private boolean assured = false;
-  /** Assured sub mode (used when assured is true). */
-  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
-  /** Safe Data level (used when assuredMode is SAFE_DATA). */
-  private byte assuredSdLevel = 1;
-  /** The timeout in ms that should be used, when waiting for assured acks. */
-  private long assuredTimeout = 2000;
-
-  /** Group id. */
-  private byte groupId = 1;
-  /**
-   * Referrals urls to be published to other servers of the topology.
-   * <p>
-   * TODO: fill that with all currently opened urls if no urls configured
-   */
-  private final List<String> refUrls = new ArrayList<String>();
-
-  /**
    * A set of counters used for Monitoring.
    */
   private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
@@ -265,15 +236,6 @@
   private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
     new HashMap<Integer,Integer>();
 
-  /**
-   * Window size used during initialization .. between
-   * - the initializer/exporter DS that listens/waits acknowledges and that
-   *   slows down data msg publishing based on the slowest server
-   * - and each initialized/importer DS that publishes acknowledges each
-   *   WINDOW/2 data msg received.
-   */
-  protected final int initWindow;
-
   /* Status related monitoring fields */
 
   /**
@@ -311,6 +273,12 @@
   private final Object sessionLock = new Object();
 
   /**
+   * The generationId for this replication domain. It is made of a hash of the
+   * 1000 first entries for this domain.
+   */
+  protected volatile long generationId;
+
+  /**
    * Returns the {@link CSNGenerator} that will be used to
    * generate {@link CSN} for this domain.
    *
@@ -325,46 +293,39 @@
   /**
    * Creates a ReplicationDomain with the provided parameters.
    *
-   * @param baseDN     The identifier of the Replication Domain to which
-   *                   this object is participating.
-   * @param serverID   The identifier of the server that is participating
-   *                   to the Replication Domain.
-   *                   This identifier should be different for each server that
-   *                   is participating to a given Replication Domain.
-   * @param initWindow Window used during initialization.
+   * @param config
+   *          The configuration object for this ReplicationDomain
+   * @param generationId
+   *          the generation of this ReplicationDomain
    */
-  public ReplicationDomain(DN baseDN, int serverID, int initWindow)
+  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
   {
-    this.baseDN = baseDN;
-    this.serverID = serverID;
-    this.initWindow = initWindow;
+    this.config = config;
+    this.assuredConfig = config;
+    this.generationId = generationId;
     this.state = new ServerState();
-    this.generator = new CSNGenerator(serverID, state);
-
-    domains.put(baseDN, this);
+    this.generator = new CSNGenerator(getServerId(), state);
   }
 
   /**
-   * Creates a ReplicationDomain with the provided parameters.
-   * (for unit test purpose only)
+   * Creates a ReplicationDomain with the provided parameters. (for unit test
+   * purpose only)
    *
-   * @param baseDN     The identifier of the Replication Domain to which
-   *                   this object is participating.
-   * @param serverID   The identifier of the server that is participating
-   *                   to the Replication Domain.
-   *                   This identifier should be different for each server that
-   *                   is participating to a given Replication Domain.
-   * @param serverState The serverState to use
+   * @param config
+   *          The configuration object for this ReplicationDomain
+   * @param generationId
+   *          the generation of this ReplicationDomain
+   * @param serverState
+   *          The serverState to use
    */
-  public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
+  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
+      ServerState serverState)
   {
-    this.baseDN = baseDN;
-    this.serverID = serverID;
-    this.initWindow = 100;
+    this.config = config;
+    this.assuredConfig = config;
+    this.generationId = generationId;
     this.state = serverState;
-    this.generator = new CSNGenerator(serverID, state);
-
-    domains.put(baseDN, this);
+    this.generator = new CSNGenerator(getServerId(), state);
   }
 
   /**
@@ -387,7 +348,7 @@
     if (!isValidInitialStatus(initStatus))
     {
       logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
-          getBaseDNString(), Integer.toString(serverID)));
+          getBaseDNString(), Integer.toString(getServerId())));
     }
     else
     {
@@ -406,7 +367,7 @@
   private void receiveChangeStatus(ChangeStatusMsg csMsg)
   {
     if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDN +
+      TRACER.debugInfo("Replication domain " + getBaseDN() +
         " received change status message:\n" + csMsg);
 
     ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -416,7 +377,7 @@
     if (event == StatusMachineEvent.INVALID_EVENT)
     {
       logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
-          getBaseDNString(), Integer.toString(serverID)));
+          getBaseDNString(), Integer.toString(getServerId())));
       return;
     }
 
@@ -468,13 +429,24 @@
   }
 
   /**
-   * Returns the base DN of this ReplicationDomain.
+   * Returns the current config of this ReplicationDomain.
+   *
+   * @return the config
+   */
+  protected ReplicationDomainCfg getConfig()
+  {
+    return config;
+  }
+
+  /**
+   * Returns the base DN of this ReplicationDomain. All Replication Domain using
+   * this baseDN will be connected through the Replication Service.
    *
    * @return The base DN of this ReplicationDomain
    */
   public DN getBaseDN()
   {
-    return baseDN;
+    return config.getBaseDN();
   }
 
   /**
@@ -484,16 +456,32 @@
    */
   public String getBaseDNString()
   {
-    return baseDN.toNormalizedString();
+    return getBaseDN().toNormalizedString();
   }
 
   /**
-   * Get the server ID.
+   * Get the server ID. The identifier of this Replication Domain inside the
+   * Replication Service. Each Domain must use a unique ServerID.
+   *
    * @return The server ID.
    */
   public int getServerId()
   {
-    return serverID;
+    return config.getServerId();
+  }
+
+  /**
+   * Window size used during initialization .. between - the
+   * initializer/exporter DS that listens/waits acknowledges and that slows down
+   * data msg publishing based on the slowest server - and each
+   * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
+   * received.
+   *
+   * @return the initWindow
+   */
+  protected int getInitWindow()
+  {
+    return config.getInitializationWindowSize();
   }
 
   /**
@@ -502,25 +490,38 @@
    */
   public boolean isAssured()
   {
-    return assured;
+    return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
+        || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
   }
 
   /**
-   * Gives the mode for the assured replication of the domain.
+   * Gives the mode for the assured replication of the domain. Only used when
+   * assured is true).
+   *
    * @return The mode for the assured replication of the domain.
    */
   public AssuredMode getAssuredMode()
   {
-    return assuredMode;
+    switch (assuredConfig.getAssuredType())
+    {
+    case SAFE_DATA:
+    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
+      return AssuredMode.SAFE_DATA_MODE;
+    case SAFE_READ:
+      return AssuredMode.SAFE_READ_MODE;
+    }
+    return null; // should never happen
   }
 
   /**
-   * Gives the assured level of the replication of the domain.
+   * Gives the assured Safe Data level of the replication of the domain. (used
+   * when assuredMode is SAFE_DATA).
+   *
    * @return The assured level of the replication of the domain.
    */
   public byte getAssuredSdLevel()
   {
-    return assuredSdLevel;
+    return (byte) assuredConfig.getAssuredSdLevel();
   }
 
   /**
@@ -529,7 +530,7 @@
    */
   public long getAssuredTimeout()
   {
-    return assuredTimeout;
+    return assuredConfig.getAssuredTimeout();
   }
 
   /**
@@ -538,16 +539,20 @@
    */
   public byte getGroupId()
   {
-    return groupId;
+    return (byte) config.getGroupId();
   }
 
   /**
-   * Gets the referrals URLs this domain publishes.
+   * Gets the referrals URLs this domain publishes. Referrals urls to be
+   * published to other servers of the topology.
+   * <p>
+   * TODO: fill that with all currently opened urls if no urls configured
+   *
    * @return The referrals URLs this domain publishes.
    */
-  public List<String> getRefUrls()
+  public Set<String> getRefUrls()
   {
-    return refUrls;
+    return config.getReferralsUrl();
   }
 
   /**
@@ -673,67 +678,6 @@
   }
 
   /**
-   * Set the list of Referrals that should be returned when an
-   * operation needs to be redirected to this server.
-   *
-   * @param referralsUrl The list of referrals.
-   */
-  public void setURLs(Set<String> referralsUrl)
-  {
-      this.refUrls.addAll(referralsUrl);
-  }
-
-  /**
-   * Set the timeout of the assured replication.
-   *
-   * @param assuredTimeout the timeout of the assured replication.
-   */
-  public void setAssuredTimeout(long assuredTimeout)
-  {
-    this.assuredTimeout = assuredTimeout;
-  }
-
-  /**
-   * Sets the groupID.
-   *
-   * @param groupId The groupID.
-   */
-  public void setGroupId(byte groupId)
-  {
-    this.groupId = groupId;
-  }
-
-  /**
-   * Sets the level of assured replication.
-   *
-   * @param assuredSdLevel The level of assured replication.
-   */
-  public void setAssuredSdLevel(byte assuredSdLevel)
-  {
-    this.assuredSdLevel = assuredSdLevel;
-  }
-
-  /**
-   * Sets the assured replication mode.
-   *
-   * @param dataMode The assured replication mode.
-   */
-  public void setAssuredMode(AssuredMode dataMode)
-  {
-    this.assuredMode = dataMode;
-  }
-
-  /**
-   * Sets assured replication.
-   *
-   * @param assured A boolean indicating if assured replication should be used.
-   */
-  public void setAssured(boolean assured)
-  {
-    this.assured = assured;
-  }
-
-  /**
    * Receives an update message from the replicationServer.
    * The other types of messages are processed in an opaque way for the caller.
    * Also responsible for updating the list of pending changes
@@ -802,8 +746,8 @@
             */
             if (debugEnabled())
               TRACER.debugInfo(
-                  "[IE] processErrorMsg:" + this.serverID +
-                  " baseDN: " + this.baseDN +
+                  "[IE] processErrorMsg:" + getServerId() +
+                  " baseDN: " + getBaseDN() +
                   " Error Msg received: " + errorMsg);
 
             if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -873,10 +817,9 @@
     }
 
     numRcvdUpdates.incrementAndGet();
-     byte rsGroupId = broker.getRsGroupId();
     if (update.isAssured()
-        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
-        && rsGroupId == groupId)
+        && broker.getRsGroupId() == getGroupId()
+        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
     {
       assuredSrReceivedUpdates.incrementAndGet();
     }
@@ -949,7 +892,7 @@
         requested servers. Log problem
         */
         logError(NOTE_DS_RECEIVED_ACK_ERROR.get(
-            getBaseDNString(), Integer.toString(serverID),
+            getBaseDNString(), Integer.toString(getServerId()),
             update.toString(), ack.errorsToString()));
 
         List<Integer> failedServers = ack.getFailedServers();
@@ -1048,7 +991,7 @@
      */
     public ExportThread(int serverIdToInitialize, int initWindow)
     {
-      super("Export thread from serverId=" + serverID + " to serverId="
+      super("Export thread from serverId=" + getServerId() + " to serverId="
           + serverIdToInitialize);
       this.serverIdToInitialize = serverIdToInitialize;
       this.initWindow = initWindow;
@@ -1379,7 +1322,7 @@
   public void initializeRemote(int target, Task initTask)
   throws DirectoryException
   {
-    initializeRemote(target, this.serverID, initTask, this.initWindow);
+    initializeRemote(target, getServerId(), initTask, getInitWindow());
   }
 
   /**
@@ -1418,7 +1361,7 @@
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
       logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
-          countEntries(), getBaseDNString(), serverID));
+          countEntries(), getBaseDNString(), getServerId()));
 
       for (DSInfo dsi : getReplicasList())
       {
@@ -1436,8 +1379,8 @@
     }
     else
     {
-      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
-          countEntries(), getBaseDNString(), serverID, serverToInitialize));
+      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
+          getBaseDNString(), getServerId(), serverToInitialize));
 
       ieContext.startList.add(serverToInitialize);
 
@@ -1471,7 +1414,7 @@
 
         // Send start message to the peer
         InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
-            getBaseDN(), serverID, serverToInitialize,
+            getBaseDN(), getServerId(), serverToInitialize,
             serverRunningTheTask, ieContext.entryCount, initWindow);
 
         broker.publish(initTargetMsg);
@@ -1492,8 +1435,8 @@
         exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
 
         // Notify the peer of the success
-        DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
-        broker.publish(doneMsg);
+        broker.publish(
+            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
       }
       catch(DirectoryException exportException)
       {
@@ -1595,12 +1538,12 @@
     if (serverToInitialize == RoutableMsg.ALL_SERVERS)
     {
       logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
-          getBaseDNString(), serverID, cause));
+          getBaseDNString(), getServerId(), cause));
     }
     else
     {
       logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
-          getBaseDNString(), serverID, serverToInitialize, cause));
+          getBaseDNString(), getServerId(), serverToInitialize, cause));
     }
 
 
@@ -1894,10 +1837,8 @@
             // send the ack of flow control mgmt
             if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
             {
-              InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
-                  this.serverID,
-                  entryMsg.getSenderID(),
-                  ieContext.msgCnt);
+              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
+                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
               broker.publish(amsg, false);
               if (debugEnabled())
               {
@@ -1945,7 +1886,7 @@
               Message.raw(Category.SYNC, Severity.NOTICE,
                   ERR_INIT_EXPORTER_DISCONNECTION.get(
                       getBaseDNString(),
-                      Integer.toString(this.serverID),
+                      Integer.toString(getServerId()),
                       Integer.toString(ieContext.importSource)));
             ieContext.setExceptionIfNoneSet(new DirectoryException(
                 ResultCode.OTHER, errMsg));
@@ -2017,7 +1958,7 @@
 
     // build the message
     EntryMsg entryMessage = new EntryMsg(
-        serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
+        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
         ++ieContext.msgCnt);
 
     // Waiting the slowest loop
@@ -2219,7 +2160,7 @@
       ieContext.initializeTask = initTask;
       ieContext.attemptCnt = 0;
       ieContext.initReqMsgSent = new InitializeRequestMsg(
-          getBaseDN(), serverID, source, this.initWindow);
+          getBaseDN(), getServerId(), source, getInitWindow());
 
       // Publish Init request msg
       broker.publish(ieContext.initReqMsgSent);
@@ -2281,14 +2222,14 @@
     try
     {
       // Log starting
-      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
-          getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID));
+      logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(),
+          initTargetMsgReceived.getSenderID(), getServerId()));
 
       // Go into full update status
       setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
 
       // Acquire an import context if no already done (and initialize).
-      if (initTargetMsgReceived.getInitiatorID() != this.serverID)
+      if (initTargetMsgReceived.getInitiatorID() != getServerId())
       {
         /*
         The initTargetMsgReceived is for an import initiated by the remote
@@ -2418,7 +2359,8 @@
       finally
       {
         Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
-            getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
+            getBaseDNString(), initTargetMsgReceived.getSenderID(),
+            getServerId(),
             (ieContext.getException() == null ? ""
                 : ieContext.getException().getLocalizedMessage()));
         logError(msg);
@@ -2458,7 +2400,7 @@
     if (newStatus == ServerStatus.INVALID_STATUS)
     {
       logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
-          Integer.toString(serverID), status.toString(), event.toString()));
+          String.valueOf(getServerId()), status.toString(), event.toString()));
       return;
     }
 
@@ -2472,13 +2414,11 @@
         resetMonitoringCounters();
       }
 
-      // Store new status
       status = newStatus;
-
       if (debugEnabled())
       {
-        TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
-            + status);
+        TRACER.debugInfo("Replication domain " + getBaseDN()
+            + " new status is: " + status);
       }
 
       // Perform whatever actions are needed to apply properties for being
@@ -2560,10 +2500,8 @@
     // check that at least one ReplicationServer did change its generation-id
     checkGenerationID(-1);
 
-    // Reconnect to the Replication Server so that it adopt our
-    // GenerationID.
-    disableService();
-    enableService();
+    // Reconnect to the Replication Server so that it adopts our GenerationID.
+    restartService();
 
     // wait for the domain to reconnect.
     int count = 0;
@@ -2597,8 +2535,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
-          + " resetGenerationId " + generationIdNewValue);
+      TRACER.debugInfo("Server id " + getServerId() + " and domain "
+          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
     }
 
     ResetGenerationIdMsg genIdMessage =
@@ -2607,7 +2545,7 @@
     if (!isConnected())
     {
       Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
-          Integer.toString(serverID),
+          Integer.toString(getServerId()),
           Long.toString(genIdMessage.getGenerationId()));
       throw new DirectoryException(ResultCode.OTHER, message);
     }
@@ -3110,33 +3048,19 @@
   }
 
   /**
-   * Definitively stops the Replication Service.
-   */
-  public void stopDomain()
-  {
-    disableService();
-    domains.remove(baseDN);
-  }
-
-  /**
    * Change some ReplicationDomain parameters.
    *
    * @param config
    *          The new configuration that this domain should now use.
    */
-  public void changeConfig(ReplicationDomainCfg config)
+  protected void changeConfig(ReplicationDomainCfg config)
   {
-    this.groupId = (byte) config.getGroupId();
-
     if (broker != null && broker.changeConfig(config))
     {
-      disableService();
-      enableService();
+      restartService();
     }
   }
 
-
-
   /**
    * Applies a configuration change to the attributes which should be be
    * included in the ECL.
@@ -3149,15 +3073,19 @@
   public void changeConfig(Set<String> includeAttributes,
       Set<String> includeAttributesForDeletes)
   {
-    if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
-        && broker != null)
+    final boolean attrsModified = setEclIncludes(
+        getServerId(), includeAttributes, includeAttributesForDeletes);
+    if (attrsModified && broker != null)
     {
-      disableService();
-      enableService();
+      restartService();
     }
   }
 
-
+  private void restartService()
+  {
+    disableService();
+    enableService();
+  }
 
   /**
    * This method should trigger an export of the replicated data.
@@ -3236,15 +3164,13 @@
     Send an ack if it was requested and the group id is the same of the RS
     one. Only Safe Read mode makes sense in DS for returning an ack.
     */
-    byte rsGroupId = broker.getRsGroupId();
     // Assured feature is supported starting from replication protocol V2
     if (msg.isAssured()
       && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
     {
-      AssuredMode msgAssuredMode = msg.getAssuredMode();
-      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
       {
-        if (rsGroupId == groupId)
+        if (broker.getRsGroupId() == getGroupId())
         {
           // Send the ack
           AckMsg ackMsg = new AckMsg(msg.getCSN());
@@ -3255,7 +3181,7 @@
             ackMsg.setHasReplayError(true);
             //   -> replay error occurred in our server
             List<Integer> idList = new ArrayList<Integer>();
-            idList.add(serverID);
+            idList.add(getServerId());
             ackMsg.setFailedServers(idList);
           }
           broker.publish(ackMsg);
@@ -3269,10 +3195,11 @@
           }
         }
       }
-      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
       {
-        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
-            msgAssuredMode.toString(), getBaseDNString(), msg.toString()));
+        logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()),
+            msg.getAssuredMode().toString(), getBaseDNString(),
+            msg.toString()));
       }
         // Nothing to do in Assured safe data mode, only RS ack updates.
     }
@@ -3303,23 +3230,22 @@
    */
   protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
   {
-    byte rsGroupId = broker.getRsGroupId();
     /*
      * If assured configured, set message accordingly to request an ack in the
      * right assured mode.
-     * No ack requested for a RS with a different group id. Assured
-     * replication supported for the same locality, i.e: a topology working in
-     * the same
-     * geographical location). If we are connected to a RS which is not in our
-     * locality, no need to ask for an ack.
+     * No ack requested for a RS with a different group id.
+     * Assured replication supported for the same locality,
+     * i.e: a topology working in the same geographical location).
+     * If we are connected to a RS which is not in our locality,
+     * no need to ask for an ack.
      */
-    if (assured && rsGroupId == groupId)
+    if (needsAck())
     {
       msg.setAssured(true);
-      msg.setAssuredMode(assuredMode);
-      if (assuredMode == AssuredMode.SAFE_DATA_MODE)
+      msg.setAssuredMode(getAssuredMode());
+      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
       {
-        msg.setSafeDataLevel(assuredSdLevel);
+        msg.setSafeDataLevel(getAssuredSdLevel());
       }
 
       // Add the assured message to the list of update that are waiting for acks
@@ -3327,6 +3253,11 @@
     }
   }
 
+  private boolean needsAck()
+  {
+    return isAssured() && broker.getRsGroupId() == getGroupId();
+  }
+
   /**
    * Wait for the processing of an assured message after it has been sent, if
    * assured replication is configured, otherwise, do nothing.
@@ -3340,14 +3271,10 @@
   protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
     throws TimeoutException
   {
-    byte rsGroupId = broker.getRsGroupId();
-
-    // If assured mode configured, wait for acknowledgment for the just sent
-    // message
-    if (assured && rsGroupId == groupId)
+    if (needsAck())
     {
       // Increment assured replication monitoring counters
-      switch (assuredMode)
+      switch (getAssuredMode())
       {
         case SAFE_READ_MODE:
           assuredSrSentUpdates.incrementAndGet();
@@ -3383,12 +3310,12 @@
           if (debugEnabled())
           {
             TRACER.debugInfo("waitForAck method interrupted for replication " +
-              "baseDN: " + baseDN);
+              "baseDN: " + getBaseDN());
           }
           break;
         }
         // Timeout ?
-        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+        if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout())
         {
           /*
           Timeout occurred, be sure that ack is not being received and if so,
@@ -3424,8 +3351,8 @@
           }
 
           throw new TimeoutException("No ack received for message csn: " + csn
-              + " and replication domain: " + baseDN + " after "
-              + assuredTimeout + " ms.");
+              + " and replication domain: " + getBaseDN() + " after "
+              + getAssuredTimeout() + " ms.");
         }
       }
     }
@@ -3481,7 +3408,7 @@
     {
       // This exception may only be raised if assured replication is enabled
       logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
-          Long.toString(assuredTimeout), update.toString()));
+          Long.toString(getAssuredTimeout()), update.toString()));
     }
   }
 
@@ -3493,11 +3420,25 @@
    *
    * @return The GenerationID.
    */
-  public abstract long getGenerationID();
+  public long getGenerationID()
+  {
+    return generationId;
+  }
 
   /**
-   * Subclasses should use this method to add additional monitoring
-   * information in the ReplicationDomain.
+   * Sets the generationId for this replication domain.
+   *
+   * @param generationId
+   *          the generationId to set
+   */
+  public void setGenerationID(long generationId)
+  {
+    this.generationId = generationId;
+  }
+
+  /**
+   * Subclasses should use this method to add additional monitoring information
+   * in the ReplicationDomain.
    *
    * @return Additional monitoring attributes that will be added in the
    *         ReplicationDomain monitoring entry.
@@ -3711,13 +3652,69 @@
    */
   public CSN getLastLocalChange()
   {
-    return state.getCSN(serverID);
+    return state.getCSN(getServerId());
+  }
+
+  /**
+   * Gets and stores the assured replication configuration parameters. Returns a
+   * boolean indicating if the passed configuration has changed compared to
+   * previous values and the changes require a reconnection.
+   *
+   * @param config
+   *          The configuration object
+   * @param allowReconnection
+   *          Tells if one must reconnect if significant changes occurred
+   */
+  protected void readAssuredConfig(ReplicationDomainCfg config,
+      boolean allowReconnection)
+  {
+    // Disconnect if required: changing configuration values before
+    // disconnection would make assured replication used immediately and
+    // disconnection could cause some timeouts error.
+    if (needReconnection(config) && allowReconnection)
+    {
+      disableService();
+
+      assuredConfig = config;
+
+      enableService();
+    }
+  }
+
+  private boolean needReconnection(ReplicationDomainCfg cfg)
+  {
+    final AssuredMode assuredMode = getAssuredMode();
+    switch (cfg.getAssuredType())
+    {
+    case NOT_ASSURED:
+      if (isAssured())
+      {
+        return true;
+      }
+      break;
+    case SAFE_DATA:
+      if (!isAssured() || assuredMode == SAFE_READ_MODE)
+      {
+        return true;
+      }
+      break;
+    case SAFE_READ:
+      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
+      {
+        return true;
+      }
+      break;
+    }
+
+    return isAssured()
+        && assuredMode == SAFE_DATA_MODE
+        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
   }
 
   /** {@inheritDoc} */
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
+    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
   }
 }

--
Gitblit v1.10.0