From 55065c7531e93a725b02dc619f6c526228e768ce Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Oct 2013 14:19:46 +0000
Subject: [PATCH] LDAPReplicationDomain.java: Replaced instance fields with directly storing and using the ReplicationDomainCfg object.

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |  313 ++++++++++++++++++++++-----------------------------
 1 files changed, 135 insertions(+), 178 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4cb07d3..18d1730 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -41,6 +41,7 @@
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
+import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
@@ -68,8 +69,7 @@
  *   The startup phase of the ReplicationDomain subclass,
  *   should read the list of replication servers from the configuration,
  *   instantiate a {@link ServerState} then start the publish service
- *   by calling
- *   {@link #startPublishService(Set, int, long, long)}.
+ *   by calling {@link #startPublishService(ReplicationDomainCfg)}.
  *   At this point it can start calling the {@link #publish(UpdateMsg)}
  *   method if needed.
  * <p>
@@ -274,7 +274,7 @@
    * - and each initialized/importer DS that publishes acknowledges each
    *   WINDOW/2 data msg received.
    */
-  protected int initWindow = 100;
+  protected final int initWindow;
 
   /* Status related monitoring fields */
 
@@ -304,8 +304,7 @@
 
   private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
     new HashMap<Integer, Set<String>>();
-  private Set<String> eclIncludesForDeletesAllServers = Collections
-      .emptySet();
+  private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
 
   /**
    * An object used to protect the initialization of the underlying broker
@@ -363,6 +362,7 @@
   {
     this.baseDN = baseDN;
     this.serverID = serverID;
+    this.initWindow = 100;
     this.state = serverState;
     this.generator = new CSNGenerator(serverID, state);
 
@@ -1060,7 +1060,7 @@
     public void run()
     {
       if (debugEnabled())
-        TRACER.debugInfo("[IE] starting " + this.getName());
+        TRACER.debugInfo("[IE] starting " + getName());
       try
       {
         initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
@@ -1075,7 +1075,7 @@
       }
 
       if (debugEnabled())
-        TRACER.debugInfo("[IE] ending " + this.getName());
+        TRACER.debugInfo("[IE] ending " + getName());
     }
   }
 
@@ -1313,7 +1313,7 @@
    */
   public int decodeTarget(String targetString) throws DirectoryException
   {
-    if (targetString.equalsIgnoreCase("all"))
+    if ("all".equalsIgnoreCase(targetString))
     {
       return RoutableMsg.ALL_SERVERS;
     }
@@ -1612,7 +1612,7 @@
             "[IE] wait for start dsid " + dsi.getDsId()
             + " " + dsi.getStatus()
             + " " + dsi.getGenerationId()
-            + " " + this.getGenerationID());
+            + " " + getGenerationID());
         if (ieContext.startList.contains(dsi.getDsId()))
         {
           if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
@@ -1711,7 +1711,7 @@
           }
           else
           {
-            if (dsInfo.getGenerationId() == this.getGenerationID())
+            if (dsInfo.getGenerationId() == getGenerationID())
             { // and with the expected generationId
               // We're done with this server
               it.remove();
@@ -1757,8 +1757,7 @@
     {
       // Rejects 2 simultaneous exports
       Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
-      throw new DirectoryException(ResultCode.OTHER,
-          message);
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
 
     ieContext = new IEContext(importInProgress);
@@ -1777,34 +1776,30 @@
    */
   private void processErrorMsg(ErrorMsg errorMsg)
   {
-    if (ieContext != null)
+    //Exporting must not be stopped on the first error, if we run initialize-all
+    if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
     {
-      /*
-        Exporting must not be stopped on the first error, if we
-        run initialize-all.
-      */
-      if (ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
+      // The ErrorMsg is received while we have started an initialization
+      if (ieContext.getException() == null)
       {
-        // The ErrorMsg is received while we have started an initialization
-        if (ieContext.getException() == null)
-          ieContext.setException(new DirectoryException(ResultCode.OTHER,
-              errorMsg.getDetails()));
+        ieContext.setException(
+            new DirectoryException(ResultCode.OTHER, errorMsg.getDetails()));
+      }
 
-        /*
-         * This can happen :
-         * - on the first InitReqMsg sent when source in not known for example
-         * - on the next attempt when source crashed and did not reconnect
-         *   even after the nextInitAttemptDelay
-         * During the import, the ErrorMsg will be received by receiveEntryBytes
-         */
-        if (ieContext.initializeTask instanceof InitializeTask)
-        {
-          // Update the task that initiated the import
-          ((InitializeTask)ieContext.initializeTask).
-          updateTaskCompletionState(ieContext.getException());
+      /*
+       * This can happen :
+       * - on the first InitReqMsg sent when source in not known for example
+       * - on the next attempt when source crashed and did not reconnect
+       *   even after the nextInitAttemptDelay
+       * During the import, the ErrorMsg will be received by receiveEntryBytes
+       */
+      if (ieContext.initializeTask instanceof InitializeTask)
+      {
+        // Update the task that initiated the import
+        ((InitializeTask) ieContext.initializeTask)
+            .updateTaskCompletionState(ieContext.getException());
 
-          releaseIEContext();
-        }
+        releaseIEContext();
       }
     }
   }
@@ -1894,8 +1889,7 @@
         {
           /*
           This is the normal termination of the import
-          No error is stored and the import is ended
-          by returning null
+          No error is stored and the import is ended by returning null
           */
           return null;
         }
@@ -1903,8 +1897,7 @@
         {
           /*
           This is an error termination during the import
-          The error is stored and the import is ended
-          by returning null
+          The error is stored and the import is ended by returning null
           */
           if (ieContext.getException() == null)
           {
@@ -1921,8 +1914,8 @@
         {
           // Other messages received during an import are trashed except
           // the topologyMsg.
-          if ((msg instanceof TopologyMsg) &&
-              (isRemoteDSConnected(ieContext.importSource)==null))
+          if (msg instanceof TopologyMsg
+              && isRemoteDSConnected(ieContext.importSource) == null)
           {
             Message errMsg =
               Message.raw(Category.SYNC, Severity.NOTICE,
@@ -2043,8 +2036,8 @@
         catch(Exception e) { /* do nothing */ }
 
         // process any connection error
-        if ((broker.hasConnectionError())||
-            (broker.getNumLostConnections()!= ieContext.initNumLostConnections))
+        if (broker.hasConnectionError()
+          || broker.getNumLostConnections() != ieContext.initNumLostConnections)
         {
           // publish failed - store the error in the ieContext ...
           DirectoryException de = new DirectoryException(ResultCode.OTHER,
@@ -2485,8 +2478,7 @@
    * @throws DirectoryException When the generation ID of the Replication
    *                            Servers is not the expected value.
    */
-  private void checkGenerationID(long generationID)
-  throws DirectoryException
+  private void checkGenerationID(long generationID) throws DirectoryException
   {
     boolean allSet = true;
 
@@ -2535,7 +2527,7 @@
   public void resetReplicationLog() throws DirectoryException
   {
     // Reset the Generation ID to -1 to clean the ReplicationServers.
-    resetGenerationId((long)-1);
+    resetGenerationId(-1L);
 
     // check that at least one ReplicationServer did change its generation-id
     checkGenerationID(-1);
@@ -2573,43 +2565,35 @@
    * @throws DirectoryException   When an error occurs
    */
   public void resetGenerationId(Long generationIdNewValue)
-  throws DirectoryException
+      throws DirectoryException
   {
     if (debugEnabled())
       TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
           + " resetGenerationId " + generationIdNewValue);
 
-    ResetGenerationIdMsg genIdMessage;
-
-    if (generationIdNewValue == null)
-    {
-      genIdMessage = new ResetGenerationIdMsg(this.getGenerationID());
-    }
-    else
-    {
-      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
-    }
+    ResetGenerationIdMsg genIdMessage =
+        new ResetGenerationIdMsg(getGenId(generationIdNewValue));
 
     if (!isConnected())
     {
-      ResultCode resultCode = ResultCode.OTHER;
       Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
           Integer.toString(serverID),
           Long.toString(genIdMessage.getGenerationId()));
-      throw new DirectoryException(
-         resultCode, message);
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
     broker.publish(genIdMessage);
 
     // check that at least one ReplicationServer did change its generation-id
-    if (generationIdNewValue == null)
+    checkGenerationID(getGenId(generationIdNewValue));
+  }
+
+  private long getGenId(Long generationIdNewValue)
+  {
+    if (generationIdNewValue != null)
     {
-      checkGenerationID(this.getGenerationID());
+      return generationIdNewValue;
     }
-    else
-    {
-      checkGenerationID(generationIdNewValue);
-    }
+    return getGenerationID();
   }
 
 
@@ -2945,24 +2929,17 @@
    */
 
   /**
-   * Start the publish mechanism of the Replication Service.
-   * After this method has been called, the publish service can be used
-   * by calling the {@link #publish(UpdateMsg)} method.
+   * Start the publish mechanism of the Replication Service. After this method
+   * has been called, the publish service can be used by calling the
+   * {@link #publish(UpdateMsg)} method.
    *
-   * @param replicationServers   The replication servers that should be used.
-   * @param window               The window size of this replication domain.
-   * @param heartbeatInterval    The heartbeatInterval that should be used
-   *                             to check the availability of the replication
-   *                             servers.
-   * @param changetimeHeartbeatInterval  The interval used to send change
-   *                             time heartbeat to the replication server.
-   *
-   * @throws ConfigException     If the DirectoryServer configuration was
-   *                             incorrect.
+   * @param config
+   *          The configuration that should be used.
+   * @throws ConfigException
+   *           If the DirectoryServer configuration was incorrect.
    */
-  public void startPublishService(Set<String> replicationServers, int window,
-      long heartbeatInterval, long changetimeHeartbeatInterval)
-  throws ConfigException
+  public void startPublishService(ReplicationDomainCfg config)
+      throws ConfigException
   {
     synchronized (sessionLock)
     {
@@ -2970,15 +2947,8 @@
       {
         // create the broker object used to publish and receive changes
         broker = new ReplicationBroker(
-            this, state, baseDN,
-            serverID, window,
-            getGenerationID(),
-            heartbeatInterval,
-            new ReplSessionSecurity(),
-            getGroupId(),
-            changetimeHeartbeatInterval);
-
-        broker.start(replicationServers);
+            this, state, config, getGenerationID(), new ReplSessionSecurity());
+        broker.start();
       }
     }
   }
@@ -2990,7 +2960,7 @@
    * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
    * <p>
    * This method must be called once and must be called after the
-   * {@link #startPublishService(Collection, int, long, long)}.
+   * {@link #startPublishService(ReplicationDomainCfg)}.
    */
   public void startListenService()
   {
@@ -3040,12 +3010,12 @@
    * <p>
    * The Replication Service will restart from the point indicated by the
    * {@link ServerState} that was given as a parameter to the
-   * {@link #startPublishService(Collection, int, long, long)}
-   * at startup time.
+   * {@link #startPublishService(ReplicationDomainCfg)} at startup time.
+   * <p>
    * If some data have changed in the repository during the period of time when
    * the Replication Service was disabled, this {@link ServerState} should
-   * therefore be updated by the Replication Domain subclass before calling
-   * this method. This method is not MT safe.
+   * therefore be updated by the Replication Domain subclass before calling this
+   * method. This method is not MT safe.
    */
   public void enableService()
   {
@@ -3071,21 +3041,14 @@
   /**
    * Change some ReplicationDomain parameters.
    *
-   * @param replicationServers  The new set of Replication Servers that this
-   *                           domain should now use.
-   * @param windowSize         The window size that this domain should use.
-   * @param heartbeatInterval  The heartbeatInterval that this domain should
-   *                           use.
-   * @param groupId            The new group id to use
+   * @param config
+   *          The new configuration that this domain should now use.
    */
-  public void changeConfig(Set<String> replicationServers, int windowSize,
-      long heartbeatInterval, byte groupId)
+  public void changeConfig(ReplicationDomainCfg config)
   {
-    this.groupId = groupId;
+    this.groupId = (byte) config.getGroupId();
 
-    if (broker != null
-        && broker.changeConfig(replicationServers, windowSize,
-            heartbeatInterval, groupId))
+    if (broker != null && broker.changeConfig(config))
     {
       disableService();
       enableService();
@@ -3195,47 +3158,46 @@
     one. Only Safe Read mode makes sense in DS for returning an ack.
     */
     byte rsGroupId = broker.getRsGroupId();
-    if (msg.isAssured())
+    // Assured feature is supported starting from replication protocol V2
+    if (msg.isAssured()
+      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
     {
-      // Assured feature is supported starting from replication protocol V2
-      if (broker.getProtocolVersion() >=
-        ProtocolVersion.REPLICATION_PROTOCOL_V2)
+      AssuredMode msgAssuredMode = msg.getAssuredMode();
+      if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
       {
-        AssuredMode msgAssuredMode = msg.getAssuredMode();
-        if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+        if (rsGroupId == groupId)
         {
-          if (rsGroupId == groupId)
+          // Send the ack
+          AckMsg ackMsg = new AckMsg(msg.getCSN());
+          if (replayErrorMsg != null)
           {
-            // Send the ack
-            AckMsg ackMsg = new AckMsg(msg.getCSN());
-            if (replayErrorMsg != null)
-            {
-              // Mark the error in the ack
-              //   -> replay error occurred
-              ackMsg.setHasReplayError(true);
-              //   -> replay error occurred in our server
-              List<Integer> idList = new ArrayList<Integer>();
-              idList.add(serverID);
-              ackMsg.setFailedServers(idList);
-            }
-            broker.publish(ackMsg);
-            if (replayErrorMsg != null)
-            {
-              assuredSrReceivedUpdatesNotAcked.incrementAndGet();
-            } else
-            {
-              assuredSrReceivedUpdatesAcked.incrementAndGet();
-            }
+            // Mark the error in the ack
+            //   -> replay error occurred
+            ackMsg.setHasReplayError(true);
+            //   -> replay error occurred in our server
+            List<Integer> idList = new ArrayList<Integer>();
+            idList.add(serverID);
+            ackMsg.setFailedServers(idList);
           }
-        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
-        {
-          Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
-              Integer.toString(serverID), msgAssuredMode.toString(),
-              getBaseDNString(), msg.toString());
-          logError(errorMsg);
+          broker.publish(ackMsg);
+          if (replayErrorMsg != null)
+          {
+            assuredSrReceivedUpdatesNotAcked.incrementAndGet();
+          }
+          else
+          {
+            assuredSrReceivedUpdatesAcked.incrementAndGet();
+          }
         }
-        // Nothing to do in Assured safe data mode, only RS ack updates.
       }
+      else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+      {
+        Message errorMsg =
+            ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
+                msgAssuredMode.toString(), getBaseDNString(), msg.toString());
+        logError(errorMsg);
+      }
+        // Nothing to do in Assured safe data mode, only RS ack updates.
     }
 
     incProcessedUpdates();
@@ -3301,7 +3263,7 @@
   {
     byte rsGroupId = broker.getRsGroupId();
 
-    // If assured mode configured, wait for acknowledgement for the just sent
+    // If assured mode configured, wait for acknowledgment for the just sent
     // message
     if (assured && rsGroupId == groupId)
     {
@@ -3354,40 +3316,37 @@
           remove the update from the wait list, log the timeout error and
           also update assured monitoring counters
           */
-          UpdateMsg update = waitingAckMsgs.remove(csn);
-
-          if (update != null)
-          {
-            // No luck, this is a real timeout
-            // Increment assured replication monitoring counters
-            switch (msg.getAssuredMode())
-            {
-              case SAFE_READ_MODE:
-                assuredSrNotAcknowledgedUpdates.incrementAndGet();
-                assuredSrTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(
-                  assuredSrServerNotAcknowledgedUpdates,
-                  broker.getRsServerId());
-                break;
-              case SAFE_DATA_MODE:
-                assuredSdTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
-                  broker.getRsServerId());
-                break;
-              default:
-              // Should not happen
-            }
-
-            throw new TimeoutException("No ack received for message csn: "
-                + csn + " and replication servceID: " + baseDN + " after "
-                + assuredTimeout + " ms.");
-          } else
+          final UpdateMsg update = waitingAckMsgs.remove(csn);
+          if (update == null)
           {
             // Ack received just before timeout limit: we can exit
             break;
           }
+
+          // No luck, this is a real timeout
+          // Increment assured replication monitoring counters
+          switch (msg.getAssuredMode())
+          {
+          case SAFE_READ_MODE:
+            assuredSrNotAcknowledgedUpdates.incrementAndGet();
+            assuredSrTimeoutUpdates.incrementAndGet();
+            // Increment number of errors for our RS
+            updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
+                broker.getRsServerId());
+            break;
+          case SAFE_DATA_MODE:
+            assuredSdTimeoutUpdates.incrementAndGet();
+            // Increment number of errors for our RS
+            updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+                broker.getRsServerId());
+            break;
+          default:
+            // Should not happen
+          }
+
+          throw new TimeoutException("No ack received for message csn: " + csn
+              + " and replication domain: " + baseDN + " after "
+              + assuredTimeout + " ms.");
         }
       }
     }
@@ -3425,8 +3384,7 @@
       update = new UpdateMsg(generator.newCSN(), msg);
       /*
       If assured replication is configured, this will prepare blocking
-      mechanism. If assured replication is disabled, this returns
-      immediately
+      mechanism. If assured replication is disabled, this returns immediately
       */
       prepareWaitForAckIfAssuredEnabled(update);
 
@@ -3443,8 +3401,7 @@
       waitForAckIfAssuredEnabled(update);
     } catch (TimeoutException ex)
     {
-      // This exception may only be raised if assured replication is
-      // enabled
+      // This exception may only be raised if assured replication is enabled
       Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
           Long.toString(assuredTimeout), update.toString());
       logError(errorMsg);

--
Gitblit v1.10.0