From a719d21181a3b1c98c16bc677e892cf67fed4e7f Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 18 Dec 2008 17:13:46 +0000
Subject: [PATCH] Assured Replication: - support for dynamic reconfiguration (domain and replication server) - performance improvement in domain (less lock time between sending threads) - performance improvement in server (safe data ack before DB push) - more monitoring info for safe read mode Misc: - support for dynamic domain group id reconfiguration

---
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |  122 +++++++++++++++++++++++++++++++++-------
 1 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 5f1372a..076d5d6 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -59,6 +59,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.DataFormatException;
@@ -318,26 +319,8 @@
     configDn = configuration.dn();
     this.updateToReplayQueue = updateToReplayQueue;
 
-    /*
-     * Fill assured configuration properties
-     */
-    AssuredType assuredType = configuration.getAssuredType();
-    switch (assuredType)
-    {
-      case NOT_ASSURED:
-        setAssured(false);
-        break;
-      case SAFE_DATA:
-        setAssured(true);
-        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
-        break;
-      case SAFE_READ:
-        setAssured(true);
-        setAssuredMode(AssuredMode.SAFE_READ_MODE);
-        break;
-    }
-    setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
-    setAssuredTimeout(configuration.getAssuredTimeout());
+    // Get assured configuration
+    readAssuredConfig(configuration);
 
     setGroupId((byte)configuration.getGroupId());
     setURLs(configuration.getReferralsUrl());
@@ -405,6 +388,72 @@
   }
 
   /**
+   * Gets and stores the assured replication configuration parameters. Returns
+   * a boolean indicating if the passed configuration has changed compared to
+   * previous values and the changes require a reconnection.
+   * @param configuration The configuration object
+   * @return True if the assured configuration changed and we need to reconnect
+   */
+  private boolean readAssuredConfig(ReplicationDomainCfg configuration)
+  {
+    boolean needReconnect = false;
+
+    byte newSdLevel = (byte) configuration.getAssuredSdLevel();
+    if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
+      (newSdLevel != getAssuredSdLevel()))
+    {
+      needReconnect = true;
+    }
+
+    AssuredType newAssuredType = configuration.getAssuredType();
+    switch (newAssuredType)
+    {
+      case NOT_ASSURED:
+        if (isAssured())
+        {
+          needReconnect = true;
+        }
+        break;
+      case SAFE_DATA:
+        if (!isAssured() ||
+          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
+        {
+          needReconnect = true;
+        }
+        break;
+      case SAFE_READ:
+        if (!isAssured() ||
+          (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
+        {
+          needReconnect = true;
+        }
+        break;
+    }
+
+    switch (newAssuredType)
+    {
+      case NOT_ASSURED:
+        setAssured(false);
+        break;
+      case SAFE_DATA:
+        setAssured(true);
+        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
+        break;
+      case SAFE_READ:
+        setAssured(true);
+        setAssuredMode(AssuredMode.SAFE_READ_MODE);
+        break;
+    }
+    setAssuredSdLevel(newSdLevel);
+
+    // Changing timeout does not require restart as it is not sent in
+    // StartSessionMsg
+    setAssuredTimeout(configuration.getAssuredTimeout());
+
+    return needReconnect;
+  }
+
+  /**
    * Returns the base DN of this ReplicationDomain.
    *
    * @return The base DN of this ReplicationDomain
@@ -836,7 +885,27 @@
 
     if (!op.isSynchronizationOperation())
     {
+      // If assured replication is configured, this will prepare blocking
+      // mechanism. If assured replication is disabled, this returns
+      // immediately
+      prepareWaitForAckIfAssuredEnabled(msg);
+
       pendingChanges.pushCommittedChanges();
+
+      // If assured replication is enabled, this will wait for the matching
+      // ack or time out. If assured replication is disabled, this returns
+      // immediately
+      try
+      {
+        waitForAckIfAssuredEnabled(msg);
+      } catch (TimeoutException ex)
+      {
+        // This exception may only be raised if assured replication is
+        // enabled
+        Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
+          Long.toString(getAssuredTimeout()), msg.toString());
+        logError(errorMsg);
+      }
     }
   }
 
@@ -2583,7 +2652,18 @@
     changeConfig(
         configuration.getReplicationServer(),
         configuration.getWindowSize(),
-        configuration.getHeartbeatInterval());
+        configuration.getHeartbeatInterval(),
+        (byte)configuration.getGroupId());
+
+    // Get assured configuration
+    boolean needReconnect = readAssuredConfig(configuration);
+
+    // Reconnect if required
+    if (needReconnect)
+    {
+      disableService();
+      enableService();
+    }
 
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }

--
Gitblit v1.10.0