From a719d21181a3b1c98c16bc677e892cf67fed4e7f Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 18 Dec 2008 17:13:46 +0000
Subject: [PATCH] Assured Replication: - support for dynamic reconfiguration (domain and replication server) - performance improvement in domain (less lock time between sending threads) - performance improvement in server (safe data ack before DB push) - more monitoring info for safe read mode Misc: - support for dynamic domain group id reconfiguration
---
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 122 +++++++++++++++++++++++++++++++++-------
1 files changed, 101 insertions(+), 21 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 5f1372a..076d5d6 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
@@ -318,26 +319,8 @@
configDn = configuration.dn();
this.updateToReplayQueue = updateToReplayQueue;
- /*
- * Fill assured configuration properties
- */
- AssuredType assuredType = configuration.getAssuredType();
- switch (assuredType)
- {
- case NOT_ASSURED:
- setAssured(false);
- break;
- case SAFE_DATA:
- setAssured(true);
- setAssuredMode(AssuredMode.SAFE_DATA_MODE);
- break;
- case SAFE_READ:
- setAssured(true);
- setAssuredMode(AssuredMode.SAFE_READ_MODE);
- break;
- }
- setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
- setAssuredTimeout(configuration.getAssuredTimeout());
+ // Get assured configuration
+ readAssuredConfig(configuration);
setGroupId((byte)configuration.getGroupId());
setURLs(configuration.getReferralsUrl());
@@ -405,6 +388,72 @@
}
/**
+ * Gets and stores the assured replication configuration parameters. Returns
+ * a boolean indicating if the passed configuration has changed compared to
+ * previous values and the changes require a reconnection.
+ * @param configuration The configuration object
+ * @return True if the assured configuration changed and we need to reconnect
+ */
+ private boolean readAssuredConfig(ReplicationDomainCfg configuration)
+ {
+ boolean needReconnect = false;
+
+ byte newSdLevel = (byte) configuration.getAssuredSdLevel();
+ if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
+ (newSdLevel != getAssuredSdLevel()))
+ {
+ needReconnect = true;
+ }
+
+ AssuredType newAssuredType = configuration.getAssuredType();
+ switch (newAssuredType)
+ {
+ case NOT_ASSURED:
+ if (isAssured())
+ {
+ needReconnect = true;
+ }
+ break;
+ case SAFE_DATA:
+ if (!isAssured() ||
+ (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
+ {
+ needReconnect = true;
+ }
+ break;
+ case SAFE_READ:
+ if (!isAssured() ||
+ (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
+ {
+ needReconnect = true;
+ }
+ break;
+ }
+
+ switch (newAssuredType)
+ {
+ case NOT_ASSURED:
+ setAssured(false);
+ break;
+ case SAFE_DATA:
+ setAssured(true);
+ setAssuredMode(AssuredMode.SAFE_DATA_MODE);
+ break;
+ case SAFE_READ:
+ setAssured(true);
+ setAssuredMode(AssuredMode.SAFE_READ_MODE);
+ break;
+ }
+ setAssuredSdLevel(newSdLevel);
+
+ // Changing timeout does not require restart as it is not sent in
+ // StartSessionMsg
+ setAssuredTimeout(configuration.getAssuredTimeout());
+
+ return needReconnect;
+ }
+
+ /**
* Returns the base DN of this ReplicationDomain.
*
* @return The base DN of this ReplicationDomain
@@ -836,7 +885,27 @@
if (!op.isSynchronizationOperation())
{
+ // If assured replication is configured, this will prepare blocking
+ // mechanism. If assured replication is disabled, this returns
+ // immediately
+ prepareWaitForAckIfAssuredEnabled(msg);
+
pendingChanges.pushCommittedChanges();
+
+ // If assured replication is enabled, this will wait for the matching
+ // ack or time out. If assured replication is disabled, this returns
+ // immediately
+ try
+ {
+ waitForAckIfAssuredEnabled(msg);
+ } catch (TimeoutException ex)
+ {
+ // This exception may only be raised if assured replication is
+ // enabled
+ Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getServiceID(),
+ Long.toString(getAssuredTimeout()), msg.toString());
+ logError(errorMsg);
+ }
}
}
@@ -2583,7 +2652,18 @@
changeConfig(
configuration.getReplicationServer(),
configuration.getWindowSize(),
- configuration.getHeartbeatInterval());
+ configuration.getHeartbeatInterval(),
+ (byte)configuration.getGroupId());
+
+ // Get assured configuration
+ boolean needReconnect = readAssuredConfig(configuration);
+
+ // Reconnect if required
+ if (needReconnect)
+ {
+ disableService();
+ enableService();
+ }
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
--
Gitblit v1.10.0