From 18d8dd990ea2072267b32e3200c3291fdd53576a Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 23 Apr 2007 13:38:48 +0000
Subject: [PATCH] This integrates the multi-master synchronization with the new admin framework (issue 1477) and makes possible to dynamically add or remove changelog server and synchronization domains in a running server (issue 639).
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java | 448 +++++++++++--------------------------------------------
1 files changed, 89 insertions(+), 359 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 743d24d..c597f18 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -39,14 +39,13 @@
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
-import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -55,20 +54,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
+import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.MultimasterDomainCfg;
import org.opends.server.api.Backend;
-import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
-import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
-import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
@@ -77,7 +74,6 @@
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
-import org.opends.server.messages.MessageHandler;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -130,7 +126,7 @@
* handle protocol messages from the changelog server.
*/
public class SynchronizationDomain extends DirectoryThread
- implements ConfigurableComponent
+ implements ConfigurationChangeListener<MultimasterDomainCfg>
{
private SynchronizationMonitor monitor;
@@ -254,7 +250,7 @@
private int listenerThreadNumber = 10;
private boolean receiveStatus = true;
- private List<String> changelogServers;
+ private Collection<String> changelogServers;
private DN baseDN;
@@ -263,8 +259,6 @@
private boolean shutdown = false;
- private DN configDn;
-
private InternalClientConnection conn =
InternalClientConnection.getRootConnection();
@@ -273,102 +267,30 @@
private boolean disabled = false;
private boolean stateSavingDisabled = false;
- static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
- static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
- static final String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
- static final String RECEIVE_STATUS = "ds-cfg-receive-status";
- static final String MAX_RECEIVE_QUEUE = "ds-cfg-max-receive-queue";
- static final String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay";
- static final String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
- static final String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
- static final String WINDOW_SIZE = "ds-cfg-window-size";
- static final String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval";
+ private int window = 100;
- private static final StringConfigAttribute changelogStub =
- new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
- "changelog server information", true, true, false);
-
- private static final IntegerConfigAttribute serverIdStub =
- new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
- false, true, 0, true, 65535);
-
- private static final DNConfigAttribute baseDnStub =
- new DNConfigAttribute(BASE_DN_ATTR, "synchronization base DN",
- true, false, false);
-
- private static final BooleanConfigAttribute receiveStatusStub =
- new BooleanConfigAttribute(RECEIVE_STATUS, "receive status", false);
-
-
- /**
- * The set of time units that will be used for expressing the heartbeat
- * interval.
- */
- private static final LinkedHashMap<String,Double> timeUnits =
- new LinkedHashMap<String,Double>();
-
-
-
- static
- {
- timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D);
- timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D);
- timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D);
- timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
- }
/**
* Creates a new SynchronizationDomain using configuration from configEntry.
*
- * @param configEntry The ConfigEntry to use to read the configuration of this
- * SynchronizationDomain.
+ * @param configuration The configuration of this SynchronizationDomain.
* @throws ConfigException In case of invalid configuration.
*/
- public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
+ public SynchronizationDomain(MultimasterDomainCfg configuration)
+ throws ConfigException
{
super("Synchronization flush");
- /*
- * read the centralized changelog server configuration
- * this is a multivalued attribute
- */
- StringConfigAttribute changelogServer =
- (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub);
-
- if (changelogServer == null)
- {
- throw new ConfigException(MSGID_NEED_CHANGELOG_SERVER,
- MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER,
- configEntry.getDN().toString()) );
- }
- changelogServers = changelogServer.activeValues();
- configAttributes.add(changelogServer);
-
- /*
- * read the server Id information
- * this is a single valued integer, its value must fit on a short integer
- */
- IntegerConfigAttribute serverIdAttr =
- (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub);
- if (serverIdAttr == null)
- {
- throw new ConfigException(MSGID_NEED_SERVER_ID,
- MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
- configEntry.getDN().toString()) );
- }
- serverId = (short) serverIdAttr.activeIntValue();
- configAttributes.add(serverIdAttr);
-
- /*
- * read the base DN
- */
- DNConfigAttribute baseDn =
- (DNConfigAttribute) configEntry.getConfigAttribute(baseDnStub);
- if (baseDn == null)
- baseDN = null; // Attribute is not present : don't set a limit
- else
- baseDN = baseDn.activeValue();
- configAttributes.add(baseDn);
+ // Read the configuration parameters.
+ changelogServers = configuration.getChangelogServer();
+ serverId = (short) configuration.getServerId();
+ baseDN = configuration.getSynchronizationDN();
+ maxReceiveQueue = configuration.getMaxReceiveQueue();
+ maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
+ maxSendQueue = configuration.getMaxSendQueue();
+ maxSendDelay = (int) configuration.getMaxSendDelay();
+ window = configuration.getWindowSize();
+ heartbeatInterval = configuration.getHeartbeatInterval();
/*
* Modify conflicts are solved for all suffixes but the schema suffix
@@ -386,114 +308,25 @@
solveConflictFlag = true;
}
+ /*
+ * Create a new Persistent Server State that will be used to store
+ * the last ChangeNmber seen from all LDAP servers in the topology.
+ */
state = new PersistentServerState(baseDN);
/*
- * Read the Receive Status.
+ * Create a Synchronization monitor object responsible for publishing
+ * monitoring information below cn=monitor.
*/
- BooleanConfigAttribute receiveStatusAttr = (BooleanConfigAttribute)
- configEntry.getConfigAttribute(receiveStatusStub);
- if (receiveStatusAttr != null)
- {
- receiveStatus = receiveStatusAttr.activeValue();
- configAttributes.add(receiveStatusAttr);
- }
-
- /*
- * read the synchronization flow control configuration.
- */
- IntegerConfigAttribute maxReceiveQueueStub =
- new IntegerConfigAttribute(MAX_RECEIVE_QUEUE, "max receive queue",
- false, false, false, true, 0,false, 0);
-
- IntegerConfigAttribute maxReceiveQueueAttr = (IntegerConfigAttribute)
- configEntry.getConfigAttribute(maxReceiveQueueStub);
- if (maxReceiveQueueAttr == null)
- maxReceiveQueue = 0; // Attribute is not present : don't set a limit
- else
- {
- maxReceiveQueue = maxReceiveQueueAttr.activeIntValue();
- configAttributes.add(maxReceiveQueueAttr);
- }
-
- IntegerConfigAttribute maxReceiveDelayStub =
- new IntegerConfigAttribute(MAX_RECEIVE_DELAY, "max receive delay",
- false, false, false, true, 0, false, 0);
- IntegerConfigAttribute maxReceiveDelayAttr = (IntegerConfigAttribute)
- configEntry.getConfigAttribute(maxReceiveDelayStub);
- if (maxReceiveDelayAttr == null)
- maxReceiveDelay = 0; // Attribute is not present : don't set a limit
- else
- {
- maxReceiveDelay = maxReceiveDelayAttr.activeIntValue();
- configAttributes.add(maxReceiveDelayAttr);
- }
-
- IntegerConfigAttribute maxSendQueueStub =
- new IntegerConfigAttribute(MAX_SEND_QUEUE, "max send queue",
- false, false, false, true, 0, false, 0);
- IntegerConfigAttribute maxSendQueueAttr =
- (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendQueueStub);
- if (maxSendQueueAttr == null)
- maxSendQueue = 0; // Attribute is not present : don't set a limit
- else
- {
- maxSendQueue = maxSendQueueAttr.activeIntValue();
- configAttributes.add(maxSendQueueAttr);
- }
-
- IntegerConfigAttribute maxSendDelayStub =
- new IntegerConfigAttribute(MAX_SEND_DELAY, "max send delay",
- false, false, false, true, 0, false, 0);
- IntegerConfigAttribute maxSendDelayAttr =
- (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendDelayStub);
- if (maxSendDelayAttr == null)
- maxSendDelay = 0; // Attribute is not present : don't set a limit
- else
- {
- maxSendDelay = maxSendDelayAttr.activeIntValue();
- configAttributes.add(maxSendDelayAttr);
- }
-
- Integer window;
- IntegerConfigAttribute windowStub =
- new IntegerConfigAttribute(WINDOW_SIZE, "window size",
- false, false, false, true, 0, false, 0);
- IntegerConfigAttribute windowAttr =
- (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub);
- if (windowAttr == null)
- window = 100; // Attribute is not present : use the default value
- else
- {
- window = windowAttr.activeIntValue();
- configAttributes.add(windowAttr);
- }
-
- IntegerWithUnitConfigAttribute heartbeatStub =
- new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL,
- "heartbeat interval",
- false, timeUnits, true, 0, false, 0);
- IntegerWithUnitConfigAttribute heartbeatAttr =
- (IntegerWithUnitConfigAttribute)
- configEntry.getConfigAttribute(heartbeatStub);
- if (heartbeatAttr == null)
- {
- // Attribute is not present : use the default value
- heartbeatInterval = 1000;
- }
- else
- {
- heartbeatInterval = heartbeatAttr.activeCalculatedValue();
- configAttributes.add(heartbeatAttr);
- }
-
- configDn = configEntry.getDN();
- DirectoryServer.registerConfigurableComponent(this);
-
monitor = new SynchronizationMonitor(this);
DirectoryServer.registerMonitorProvider(monitor);
+ /*
+ * ChangeNumberGenerator is used to create new unique ChangeNumbers
+ * for each operation done on the synchronization domain.
+ */
changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
+
/*
* create the broker object used to publish and receive changes
*/
@@ -519,14 +352,9 @@
* should we stop the modifications ?
*/
}
- }
- /**
- * {@inheritDoc}
- */
- public DN getConfigurableComponentEntryDN()
- {
- return configDn;
+ // listen for changes on the configuration
+ configuration.addChangeListener(this);
}
/**
@@ -537,112 +365,6 @@
return configAttributes;
}
- /**
- * {@inheritDoc}
- */
- public boolean hasAcceptableConfiguration(ConfigEntry configEntry,
- List<String> unacceptableReasons)
- {
- boolean acceptable = true;
- StringConfigAttribute changelog = null;
- try
- {
- changelog = (StringConfigAttribute)
- configEntry.getConfigAttribute(changelogStub);
- } catch (ConfigException e)
- {
- acceptable = false;
- unacceptableReasons.add("Need at least one changelog server.");
- }
- if (changelog == null)
- {
- acceptable = false;
- unacceptableReasons.add("Need at least one changelog server.");
- }
- return acceptable;
- }
-
- /**
- * {@inheritDoc}
- */
- public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry,
- boolean detailedResults)
- {
- StringConfigAttribute changelog = null;
- List<String> newChangelogServers;
- boolean newReceiveStatus;
-
- try
- {
- /*
- * check if changelog server list changed
- */
- changelog = (StringConfigAttribute)
- configEntry.getConfigAttribute(changelogStub);
-
- newChangelogServers = changelog.activeValues();
-
- boolean sameConf = true;
- for (String s :newChangelogServers)
- if (!changelogServers.contains(s))
- sameConf = false;
- for (String s : changelogServers)
- if (!newChangelogServers.contains(s))
- sameConf = false;
-
- if (!sameConf)
- {
- broker.stop();
- changelogServers = newChangelogServers;
- broker.start(changelogServers);
- }
-
- /*
- * check if reception should be disabled
- */
- newReceiveStatus = ((BooleanConfigAttribute)
- configEntry.getConfigAttribute(receiveStatusStub)).activeValue();
- if (newReceiveStatus != receiveStatus)
- {
- /*
- * was disabled and moved to enabled
- */
- if (newReceiveStatus)
- {
- broker.restartReceive();
- for (int i=0; i<listenerThreadNumber; i++)
- {
- ListenerThread myThread = new ListenerThread(this);
- myThread.start();
- synchroThreads.add(myThread);
- }
- }
- else
- {
- /* was enabled and moved to disabled */
- broker.suspendReceive();
- // FIXME Need a way to stop these threads.
- // Setting the shutdown flag does not stop them until they have
- // consumed and discarded one more message each.
-// for (ListenerThread thread : synchroThreads)
-// {
-// thread.shutdown();
-// }
- synchroThreads.clear();
- }
- receiveStatus = newReceiveStatus;
- }
-
- } catch (Exception e)
- {
- /* this should never happen because the parameters have been
- * validated by hasAcceptableConfiguration
- */
- return new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
- }
-
- return new ConfigChangeResult(ResultCode.SUCCESS, false);
- }
/**
* Returns the base DN of this SynchronizationDomain.
@@ -1041,9 +763,7 @@
public void receiveAck(AckMessage ack)
{
UpdateMessage update;
- ChangeNumber changeNumber;
-
- changeNumber = ack.getChangeNumber();
+ ChangeNumber changeNumber = ack.getChangeNumber();
synchronized (pendingChanges)
{
@@ -1933,51 +1653,6 @@
}
/**
- * Check if a ConfigEntry is valid.
- * @param configEntry The config entry that needs to be checked.
- * @param unacceptableReason A description of the reason why the config entry
- * is not acceptable (if return is false).
- * @return a boolean indicating if the configEntry is valid.
- */
- public static boolean checkConfigEntry(ConfigEntry configEntry,
- StringBuilder unacceptableReason)
- {
- try
- {
- StringConfigAttribute changelogServer =
- (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub);
-
- if (changelogServer == null)
- {
- unacceptableReason.append(
- MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER,
- configEntry.getDN().toString()) );
- return false;
- }
-
- /*
- * read the server Id information
- * this is a single valued integer, its value must fit on a short integer
- */
- IntegerConfigAttribute serverIdAttr =
- (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub);
- if (serverIdAttr == null)
- {
- unacceptableReason.append(
- MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
- configEntry.getDN().toString()) );
- return false;
- }
- }
- catch (ConfigException e)
- {
- unacceptableReason.append(e.getMessage());
- return false;
- }
- return true;
- }
-
- /**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
@@ -2026,7 +1701,6 @@
return broker.getNumLostConnections();
}
-
/**
* Check if the domain solve conflicts.
*
@@ -3109,4 +2783,60 @@
op.setResultCode(ResultCode.SUCCESS);
synchronize(op);
}
+
+ /**
+ * Check if the provided configuration is acceptable for add.
+ *
+ * @param configuration The configuration to check.
+ * @param unacceptableReasons When the configuration is not acceptable, this
+ * table is use to return the reasons why this
+ * configuration is not acceptbale.
+ *
+ * @return true if the configuration is acceptable, false other wise.
+ */
+ public static boolean isConfigurationAcceptable(
+ MultimasterDomainCfg configuration, List<String> unacceptableReasons)
+ {
+ // Check that there is not already a domain with the same DN
+ // TODO : Check that the server id is a short
+ DN dn = configuration.getSynchronizationDN();
+ if (MultimasterSynchronization.findDomain(dn,null) != null)
+ {
+ String message = getMessage(MSGID_SYNC_INVALID_DN, dn.toString());
+ unacceptableReasons.add(message);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ConfigChangeResult applyConfigurationChange(
+ MultimasterDomainCfg configuration)
+ {
+ // server id and base dn are readonly.
+ // The other parameters needs to be renegociated with the Changelog Server.
+ // so that requires restarting the session with the Changelog Server.
+ changelogServers = configuration.getChangelogServer();
+ maxReceiveQueue = configuration.getMaxReceiveQueue();
+ maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
+ maxSendQueue = configuration.getMaxSendQueue();
+ maxSendDelay = (int) configuration.getMaxSendDelay();
+ window = configuration.getWindowSize();
+ heartbeatInterval = configuration.getHeartbeatInterval();
+ broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay,
+ maxSendQueue, maxSendDelay, window, heartbeatInterval);
+
+ return new ConfigChangeResult(ResultCode.SUCCESS, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isConfigurationChangeAcceptable(
+ MultimasterDomainCfg configuration, List<String> unacceptableReasons)
+ {
+ return true;
+ }
}
--
Gitblit v1.10.0