From a719d21181a3b1c98c16bc677e892cf67fed4e7f Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 18 Dec 2008 17:13:46 +0000
Subject: [PATCH] Assured Replication: - support for dynamic reconfiguration (domain and replication server) - performance improvement in domain (less lock time between sending threads) - performance improvement in server (safe data ack before DB push) - more monitoring info for safe read mode Misc: - support for dynamic domain group id reconfiguration

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |  356 ++++++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 242 insertions(+), 114 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index fa95572..7c4df2b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -215,7 +215,7 @@
   // Safe Data level (used when assuredMode is SAFE_DATA)
   private byte assuredSdLevel = (byte)1;
   // The timeout in ms that should be used, when waiting for assured acks
-  private long assuredTimeout = 1000;
+  private long assuredTimeout = 2000;
 
   // Group id
   private byte groupId = (byte)1;
@@ -257,6 +257,14 @@
   // format: <server id>:<number of failed updates>
   private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
     new HashMap<Short,Integer>();
+  // Number of updates received in Assured Mode, Safe Read request
+  private AtomicInteger receivedAssuredSrUpdates = new AtomicInteger(0);
+  // Number of updates received in Assured Mode, Safe Read request that we have
+  // acked without errors
+  private AtomicInteger receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+  // Number of updates received in Assured Mode, Safe Read request that we have
+  // acked with errors
+  private AtomicInteger receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
   // Number of updates sent in Assured Mode, Safe Data
   private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
   // Number of updates sent in Assured Mode, Safe Data, that have been
@@ -765,86 +773,12 @@
     }
 
     numRcvdUpdates.incrementAndGet();
-    return update;
-  }
-
-  /**
-   * Wait for the processing of an assured message.
-   *
-   * @param msg The UpdateMsg for which we are waiting for an ack.
-   * @throws TimeoutException When the configured timeout occurs waiting for the
-   * ack.
-   */
-  private void waitForAck(UpdateMsg msg) throws TimeoutException
-  {
-    // Wait for the ack to be received, timing out if necessary
-    long startTime = System.currentTimeMillis();
-    synchronized (msg)
+    if (update.isAssured() && (update.getAssuredMode() ==
+      AssuredMode.SAFE_READ_MODE))
     {
-      ChangeNumber cn = msg.getChangeNumber();
-      while (waitingAckMsgs.containsKey(cn))
-      {
-        try
-        {
-          // WARNING: this timeout may be difficult to optimize: too low, it
-          // may use too much CPU, too high, it may penalize performance...
-          msg.wait(10);
-        } catch (InterruptedException e)
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugInfo("waitForAck method interrupted for replication " +
-              "serviceID: " + serviceID);
-          }
-          break;
-        }
-        // Timeout ?
-        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
-        {
-          // Timeout occured, be sure that ack is not being received and if so,
-          // remove the update from the wait list, log the timeout error and
-          // also update assured monitoring counters
-          UpdateMsg update;
-          synchronized (waitingAckMsgs)
-          {
-            update = waitingAckMsgs.remove(cn);
-          }
-
-          if (update != null)
-          {
-            // No luck, this is a real timeout
-            // Increment assured replication monitoring counters
-            switch (msg.getAssuredMode())
-            {
-              case SAFE_READ_MODE:
-                assuredSrNotAcknowledgedUpdates.incrementAndGet();
-                assuredSrTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(
-                  assuredSrServerNotAcknowledgedUpdates,
-                  broker.getRsServerId());
-                break;
-              case SAFE_DATA_MODE:
-                assuredSdTimeoutUpdates.incrementAndGet();
-                // Increment number of errors for our RS
-                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
-                  broker.getRsServerId());
-                break;
-              default:
-              // Should not happen
-            }
-
-            throw new TimeoutException("No ack received for message cn: " + cn +
-              " and replication servceID: " + serviceID + " after " +
-              assuredTimeout + " ms.");
-          } else
-          {
-            // Ack received just before timeout limit: we can exit
-            break;
-          }
-        }
-      }
+      receivedAssuredSrUpdates.incrementAndGet();
     }
+    return update;
   }
 
   /**
@@ -2002,7 +1936,50 @@
    */
   public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
   {
-    return assuredSrServerNotAcknowledgedUpdates;
+    // Clone a snapshot with synchronized section to have a consistent view in
+    // monitoring
+    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+    synchronized(assuredSrServerNotAcknowledgedUpdates)
+    {
+      Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
+      for (Short serverId : keySet)
+      {
+        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
+        snapshot.put(serverId, i);
+      }
+    }
+    return snapshot;
+  }
+
+  /**
+   * Gets the number of updates received in assured safe read mode request.
+   * @return The number of updates received in assured safe read mode request.
+   */
+  public int getReceivedAssuredSrUpdates()
+  {
+    return receivedAssuredSrUpdates.get();
+  }
+
+  /**
+   * Gets the number of updates received in assured safe read mode that we acked
+   * without error (no replay error).
+   * @return The number of updates received in assured safe read mode that we
+   * acked without error (no replay error).
+   */
+  public int getReceivedAssuredSrUpdatesAcked()
+  {
+    return this.receivedAssuredSrUpdatesAcked.get();
+  }
+
+  /**
+   * Gets the number of updates received in assured safe read mode that we
+   * acked with an error (replay error).
+   * @return The number of updates received in assured safe read mode that we
+   * acked with an error (replay error).
+   */
+  public int getReceivedAssuredSrUpdatesNotAcked()
+  {
+    return this.receivedAssuredSrUpdatesNotAcked.get();
   }
 
   /**
@@ -2044,7 +2021,19 @@
    */
   public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
   {
-    return assuredSdServerTimeoutUpdates;
+    // Clone a snapshot with synchronized section to have a consistent view in
+    // monitoring
+    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
+    synchronized(assuredSdServerTimeoutUpdates)
+    {
+      Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
+      for (Short serverId : keySet)
+      {
+        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
+        snapshot.put(serverId, i);
+      }
+    }
+    return snapshot;
   }
 
   /**
@@ -2068,6 +2057,9 @@
     assuredSrWrongStatusUpdates = new AtomicInteger(0);
     assuredSrReplayErrorUpdates = new AtomicInteger(0);
     assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
+    receivedAssuredSrUpdates = new AtomicInteger(0);
+    receivedAssuredSrUpdatesAcked = new AtomicInteger(0);
+    receivedAssuredSrUpdatesNotAcked = new AtomicInteger(0);
     assuredSdSentUpdates = new AtomicInteger(0);
     assuredSdAcknowledgedUpdates = new AtomicInteger(0);
     assuredSdTimeoutUpdates = new AtomicInteger(0);
@@ -2199,16 +2191,20 @@
    * @param windowSize         The window size that this domain should use.
    * @param heartbeatInterval  The heartbeatInterval that this domain should
    *                           use.
+   * @param groupId            The new group id to use
    */
   public void changeConfig(
       Collection<String> replicationServers,
       int windowSize,
-      long heartbeatInterval)
+      long heartbeatInterval,
+      byte groupId)
   {
+    this.groupId = groupId;
+
     if (broker != null)
     {
       if (broker.changeConfig(
-          replicationServers, windowSize, heartbeatInterval))
+          replicationServers, windowSize, heartbeatInterval, groupId))
       {
         disableService();
         enableService();
@@ -2319,6 +2315,13 @@
               ackMsg.setFailedServers(idList);
             }
             broker.publish(ackMsg);
+            if (replayErrorMsg != null)
+            {
+              receivedAssuredSrUpdatesNotAcked.incrementAndGet();
+            } else
+            {
+              receivedAssuredSrUpdatesAcked.incrementAndGet();
+            }
           }
         } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
         {
@@ -2338,28 +2341,39 @@
   }
 
   /**
-   * Publish an {@link UpdateMsg} to the Replication Service.
-   * <p>
-   * The Replication Service will handle the delivery of this {@link UpdateMsg}
-   * to all the participants of this Replication Domain.
-   * These members will be receive this {@link UpdateMsg} through a call
-   * of the {@link #processUpdate(UpdateMsg)} message.
+   * Prepare a message if it is to be sent in assured mode.
+   * If the assured mode is enabled, this method should be called before
+   * publish(UpdateMsg msg) method. This will configure the update accordingly
+   * before it is sent and will prepare the mechanism that will block until the
+   * matching ack is received. To wait for the ack after publish call, use
+   * the waitForAckIfAssuredEnabled() method.
+   * The expected typical usage in a service inheriting from this class is
+   * the following sequence:
+   * UpdateMsg msg = xxx;
+   * prepareWaitForAckIfAssuredEnabled(msg);
+   * publish(msg);
+   * waitForAckIfAssuredEnabled(msg);
    *
-   * @param msg The UpdateMsg that should be pushed.
-   * @throws TimeoutException When assured replication is enabled and the
-   * configured timeout occurs when blocked waiting for the ack.
+   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
+   * no effect if assured replication is disabled.
+   * Note: this mechanism should not be used if using publish(byte[] msg)
+   * version as usage of these methods is already hidden inside.
+   *
+   * @param msg The update message to be sent soon.
    */
-  public void publish(UpdateMsg msg) throws TimeoutException
+  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
   {
     byte rsGroupId = broker.getRsGroupId();
-
-    // If assured configured, set message accordingly to request an ack in the
-    // right assured mode.
-    // No ack requested for a RS with a different group id. Assured replication
-    // suported for the same locality, i.e: a topology working in the same
-    // geographical location). If we are connected to a RS which is not in our
-    // locality, no need to ask for an ack.
-    if ( assured && ( rsGroupId == groupId ) )
+    /*
+     * If assured configured, set message accordingly to request an ack in the
+     * right assured mode.
+     * No ack requested for a RS with a different group id. Assured
+     * replication is supported for the same locality (i.e. a topology
+     * working in the same geographical location). If we are connected to
+     * a RS which is not in our locality, there is no need to ask for an
+     * ack.
+     */
+    if (assured && (rsGroupId == groupId))
     {
       msg.setAssured(true);
       msg.setAssuredMode(assuredMode);
@@ -2373,18 +2387,29 @@
         waitingAckMsgs.put(msg.getChangeNumber(), msg);
       }
     }
+  }
 
-    // Publish the update
-    broker.publish(msg);
-    state.update(msg.getChangeNumber());
-    numSentUpdates.incrementAndGet();
+  /**
+   * Wait for the processing of an assured message after it has been sent, if
+   * assured replication is configured, otherwise, do nothing.
+   * The prepareWaitForAckIfAssuredEnabled method should have been called
+   * before, see its comment for the full picture.
+   *
+   * @param msg The UpdateMsg for which we are waiting for an ack.
+   * @throws TimeoutException When the configured timeout occurs waiting for the
+   * ack.
+   */
+  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
+    throws TimeoutException
+  {
+    byte rsGroupId = broker.getRsGroupId();
 
     // If assured mode configured, wait for acknowledgement for the just sent
     // message
-    if ( assured && ( rsGroupId == groupId ) )
+    if (assured && (rsGroupId == groupId))
     {
       // Increment assured replication monitoring counters
-      switch(assuredMode)
+      switch (assuredMode)
       {
         case SAFE_READ_MODE:
           assuredSrSentUpdates.incrementAndGet();
@@ -2393,12 +2418,100 @@
           assuredSdSentUpdates.incrementAndGet();
           break;
         default:
-          // Should not happen
+        // Should not happen
       }
-
-      // Now wait for ack matching the sent assured update
-      waitForAck(msg);
+    } else
+    {
+      // Not assured or bad group id, return immediately
+      return;
     }
+
+    // Wait for the ack to be received, timing out if necessary
+    long startTime = System.currentTimeMillis();
+    synchronized (msg)
+    {
+      ChangeNumber cn = msg.getChangeNumber();
+      while (waitingAckMsgs.containsKey(cn))
+      {
+        try
+        {
+          // WARNING: this timeout may be difficult to optimize: too low, it
+          // may use too much CPU, too high, it may penalize performance...
+          msg.wait(10);
+        } catch (InterruptedException e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("waitForAck method interrupted for replication " +
+              "serviceID: " + serviceID);
+          }
+          break;
+        }
+        // Timeout ?
+        if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+        {
+          // Timeout occurred: be sure that ack is not being received and if
+          // so, remove the update from the wait list, log the timeout error
+          // and also update assured monitoring counters
+          UpdateMsg update;
+          synchronized (waitingAckMsgs)
+          {
+            update = waitingAckMsgs.remove(cn);
+          }
+
+          if (update != null)
+          {
+            // No luck, this is a real timeout
+            // Increment assured replication monitoring counters
+            switch (msg.getAssuredMode())
+            {
+              case SAFE_READ_MODE:
+                assuredSrNotAcknowledgedUpdates.incrementAndGet();
+                assuredSrTimeoutUpdates.incrementAndGet();
+                // Increment number of errors for our RS
+                updateAssuredErrorsByServer(
+                  assuredSrServerNotAcknowledgedUpdates,
+                  broker.getRsServerId());
+                break;
+              case SAFE_DATA_MODE:
+                assuredSdTimeoutUpdates.incrementAndGet();
+                // Increment number of errors for our RS
+                updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
+                  broker.getRsServerId());
+                break;
+              default:
+              // Should not happen
+            }
+
+            throw new TimeoutException("No ack received for message cn: " + cn +
+              " and replication servceID: " + serviceID + " after " +
+              assuredTimeout + " ms.");
+          } else
+          {
+            // Ack received just before timeout limit: we can exit
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Publish an {@link UpdateMsg} to the Replication Service.
+   * <p>
+   * The Replication Service will handle the delivery of this {@link UpdateMsg}
+   * to all the participants of this Replication Domain.
+   * These members will receive this {@link UpdateMsg} through a call
+   * of the {@link #processUpdate(UpdateMsg)} method.
+   *
+   * @param msg The UpdateMsg that should be pushed.
+   */
+  public void publish(UpdateMsg msg)
+  {
+    // Publish the update
+    broker.publish(msg);
+    state.update(msg.getChangeNumber());
+    numSentUpdates.incrementAndGet();
   }
 
   /**
@@ -2410,12 +2523,27 @@
   public void publish(byte[] msg)
   {
     UpdateMsg update = new UpdateMsg(generator.newChangeNumber(), msg);
+
+    // If assured replication is configured, this will prepare blocking
+    // mechanism. If assured replication is disabled, this returns
+    // immediately
+    prepareWaitForAckIfAssuredEnabled(update);
+
+    publish(update);
+
     try
     {
-      publish(update);
-    } catch (TimeoutException e)
+      // If assured replication is enabled, this will wait for the matching
+      // ack or time out. If assured replication is disabled, this returns
+      // immediately
+      waitForAckIfAssuredEnabled(update);
+    } catch (TimeoutException ex)
     {
-      // Should never happen as assured mode not requested
+      // This exception may only be raised if assured replication is
+      // enabled
+      Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
+        assuredTimeout), msg.toString());
+      logError(errorMsg);
     }
   }
 

--
Gitblit v1.10.0