From 1b29f765bdfe3705c1b88fcdbb4fa923682e9678 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 23 Apr 2007 13:38:48 +0000
Subject: [PATCH] This integrates the multi-master synchronization with the new admin framework (issue 1477) and makes possible to dynamically add or remove changelog server and synchronization domains in a running server (issue 639).
---
opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 299 +++++++++++++++--------------------------------------------
1 files changed, 76 insertions(+), 223 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 531569c..e813bd6 100644
--- a/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -40,26 +40,24 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
+import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.ChangelogServerCfg;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
-import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.IntegerWithUnitConfigAttribute;
-import org.opends.server.config.StringConfigAttribute;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.messages.MessageHandler;
import org.opends.server.synchronization.protocol.SocketSession;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
@@ -73,7 +71,9 @@
*
* It is responsible for creating the changelog cache and managing it
*/
-public class Changelog implements Runnable, ConfigurableComponent
+public class Changelog
+ implements Runnable, ConfigurableComponent,
+ ConfigurationChangeListener<ChangelogServerCfg>
{
private short serverId;
private String serverURL;
@@ -85,7 +85,7 @@
private boolean runListen = true;
/* The list of changelog servers configured by the administrator */
- private List<String> changelogServers;
+ private Collection<String> changelogServers;
/* This table is used to store the list of dn for which we are currently
* handling servers.
@@ -106,212 +106,30 @@
private long trimAge; // the time (in sec) after which the changes must
// de deleted from the persistent storage.
- static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
- static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
- static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
- static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
- static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
- static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
- static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
-
-
- static final IntegerConfigAttribute changelogPortStub =
- new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
- true, false, false, true, 0, true, 65535);
-
- static final IntegerConfigAttribute serverIdStub =
- new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
- false, true, 0, true, 65535);
-
- static final StringConfigAttribute changelogStub =
- new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
- "changelog server information", true, true, false);
-
- static final IntegerConfigAttribute windowStub =
- new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
- false, false, false, true, 0, false, 0);
-
- static final IntegerConfigAttribute queueSizeStub =
- new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
- false, false, false, true, 0, false, 0);
-
- static final StringConfigAttribute dbDirnameStub =
- new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
- "changelog storage directory path", false, false, true);
-
- /**
- * The set of time units that will be used for expressing the
- * changelog purge delay.
- */
- private static final LinkedHashMap<String,Double> purgeTimeUnits =
- new LinkedHashMap<String,Double>();
-
- static
- {
- purgeTimeUnits.put(TIME_UNIT_SECONDS_ABBR, 1D);
- purgeTimeUnits.put(TIME_UNIT_SECONDS_FULL, 1D);
- purgeTimeUnits.put(TIME_UNIT_MINUTES_ABBR, 60D);
- purgeTimeUnits.put(TIME_UNIT_MINUTES_FULL, 1D);
- purgeTimeUnits.put(TIME_UNIT_HOURS_ABBR, 60*60D);
- purgeTimeUnits.put(TIME_UNIT_HOURS_FULL, 60*60D);
- purgeTimeUnits.put(TIME_UNIT_DAYS_ABBR, 24*60*60D);
- purgeTimeUnits.put(TIME_UNIT_DAYS_FULL, 24*60*60D);
- }
-
- static final IntegerWithUnitConfigAttribute purgeDelayStub =
- new IntegerWithUnitConfigAttribute(PURGE_DELAY_ATTR,
- "changelog purge delay", false, purgeTimeUnits, true, 0, false, 0);
-
- /**
- * Check if a ConfigEntry is valid.
- * @param config The config entry that needs to be checked.
- * @param unacceptableReason A description of the reason why the config entry
- * is not acceptable (if return is false).
- * @return a boolean indicating if the configEntry is valid.
- */
- public static boolean checkConfigEntry(ConfigEntry config,
- StringBuilder unacceptableReason)
- {
- try
- {
- IntegerConfigAttribute changelogPortAttr;
- changelogPortAttr =
- (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub);
-
- /* The config must provide a changelog port number
- */
- if (changelogPortAttr == null)
- {
- unacceptableReason.append(
- MessageHandler.getMessage(MSGID_NEED_CHANGELOG_PORT,
- config.getDN().toString()) );
- }
-
- /*
- * read the server Id information
- * this is a single valued integer, its value must fit on a
- * short integer
- */
- IntegerConfigAttribute serverIdAttr =
- (IntegerConfigAttribute) config.getConfigAttribute(serverIdStub);
-
- if (serverIdAttr == null)
- {
- unacceptableReason.append(
- MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
- config.getDN().toString()) );
- }
-
- return true;
- } catch (ConfigException e)
- {
- return false;
- }
- }
-
/**
* Creates a new Changelog using the provided configuration entry.
*
- * @param config The configuration entry where configuration can be found.
- * @throws ConfigException When Configuration entry is invalid.
+ * @param configuration The configuration of this changelog.
+ * @throws ConfigException When Configuration is invalid.
*/
- public Changelog(ConfigEntry config) throws ConfigException
+ public Changelog(ChangelogServerCfg configuration) throws ConfigException
{
shutdown = false;
runListen = true;
-
- IntegerConfigAttribute changelogPortAttr =
- (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub);
- /* if there is no changelog port configured, this process must not be a
- * changelog server
- */
- if (changelogPortAttr == null)
- {
- throw new ConfigException(MSGID_NEED_CHANGELOG_PORT,
- MessageHandler.getMessage(MSGID_NEED_CHANGELOG_PORT,
- config.getDN().toString()) );
- }
- int changelogPort = changelogPortAttr.activeIntValue();
- configAttributes.add(changelogPortAttr);
-
- /*
- * read the server Id information
- * this is a single valued integer, its value must fit on a
- * short integer
- */
- IntegerConfigAttribute serverIdAttr =
- (IntegerConfigAttribute) config.getConfigAttribute(serverIdStub);
-
- if (serverIdAttr == null)
- {
- throw new ConfigException(MSGID_NEED_SERVER_ID,
- MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
- config.getDN().toString()) );
- }
- changelogServerId = (short) serverIdAttr.activeIntValue();
- configAttributes.add(serverIdAttr);
-
- /*
- * read the centralized changelog server configuration
- * this is a multivalued attribute
- */
- StringConfigAttribute changelogServer =
- (StringConfigAttribute) config.getConfigAttribute(changelogStub);
- changelogServers = new ArrayList<String>();
- if (changelogServer != null)
- {
- for (String serverURL : changelogServer.activeValues())
- {
- String[] splitStrings = serverURL.split(":");
- try
- {
- changelogServers.add(
- InetAddress.getByName(splitStrings[0]).getHostAddress()
- + ":" + splitStrings[1]);
- } catch (UnknownHostException e)
- {
- throw new ConfigException(MSGID_UNKNOWN_HOSTNAME,
- e.getLocalizedMessage());
- }
- }
- }
- configAttributes.add(changelogServer);
-
- IntegerConfigAttribute windowAttr =
- (IntegerConfigAttribute) config.getConfigAttribute(windowStub);
- if (windowAttr == null)
- rcvWindow = 100; // Attribute is not present : use the default value
- else
- {
- rcvWindow = windowAttr.activeIntValue();
- configAttributes.add(windowAttr);
- }
-
- IntegerConfigAttribute queueSizeAttr =
- (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub);
- if (queueSizeAttr == null)
- queueSize = 10000; // Attribute is not present : use the default value
- else
- {
- queueSize = queueSizeAttr.activeIntValue();
- configAttributes.add(queueSizeAttr);
- }
-
- /*
- * read the storage directory path attribute
- */
- StringConfigAttribute dbDirnameAttr =
- (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub);
- if (dbDirnameAttr == null)
+ int changelogPort = configuration.getChangelogPort();
+ changelogServerId = (short) configuration.getChangelogServerId();
+ changelogServers = configuration.getChangelogServer();
+ if (changelogServers == null)
+ changelogServers = new ArrayList<String>();
+ queueSize = configuration.getQueueSize();
+ trimAge = configuration.getChangelogPurgeDelay();
+ dbDirname = configuration.getChangelogDbDirectory();
+ rcvWindow = configuration.getWindowSize();
+ if (dbDirname == null)
{
dbDirname = "changelogDb";
}
- else
- {
- dbDirname = dbDirnameAttr.activeValue();
- configAttributes.add(changelogServer);
- }
- // Exists or Create
+ // Chech that this path exists or create it.
File f = getFileForPath(dbDirname);
try
{
@@ -326,24 +144,8 @@
e.getMessage() + " " + getFileForPath(dbDirname));
}
- /*
- * Read the Purge Delay (trim age) attribute
- */
- IntegerWithUnitConfigAttribute purgeDelayAttr =
- (IntegerWithUnitConfigAttribute) config.getConfigAttribute(
- purgeDelayStub);
- if (purgeDelayAttr == null)
- trimAge = 24*60*60; // not present : use the default value : 1 day
- else
- {
- trimAge = purgeDelayAttr.activeCalculatedValue();
- configAttributes.add(purgeDelayAttr);
- }
-
initialize(changelogServerId, changelogPort);
-
- configDn = config.getDN();
- DirectoryServer.registerConfigurableComponent(this);
+ configuration.addChangeListener(this);
}
/**
@@ -452,7 +254,7 @@
*/
for (String serverURL : changelogServers)
{
- if ((serverURL.compareTo(localURL) != 0) &&
+ if ((serverURL.compareTo(this.serverURL) != 0) &&
(!connectedChangelogs.contains(serverURL)))
{
this.connect(serverURL, changelogCache.getBaseDn());
@@ -629,7 +431,6 @@
}
dbEnv.shutdown();
- DirectoryServer.deregisterConfigurableComponent(this);
}
@@ -659,4 +460,56 @@
{
return trimAge * 1000;
}
+
+ /**
+ * Check if the provided configuration is acceptable for add.
+ *
+ * @param configuration The configuration to check.
+ * @param unacceptableReasons When the configuration is not acceptable, this
+ * table is use to return the reasons why this
+ * configuration is not acceptbale.
+ *
+ * @return true if the configuration is acceptable, false other wise.
+ */
+ public static boolean isConfigurationAcceptable(
+ ChangelogServerCfg configuration, List<String> unacceptableReasons)
+ {
+ int port = configuration.getChangelogPort();
+
+ try
+ {
+ ServerSocket tmpSocket = new ServerSocket();
+ tmpSocket.bind(new InetSocketAddress(port));
+ tmpSocket.close();
+ }
+ catch (Exception e)
+ {
+ String message = getMessage(MSGID_COULD_NOT_BIND_CHANGELOG, port,
+ e.getMessage());
+ unacceptableReasons.add(message);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ConfigChangeResult applyConfigurationChange(
+ ChangelogServerCfg configuration)
+ {
+ // TODO : implement this
+ return new ConfigChangeResult(ResultCode.SUCCESS, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isConfigurationChangeAcceptable(
+ ChangelogServerCfg configuration, List<String> unacceptableReasons)
+ {
+ // TODO : implement this
+ return true;
+ }
}
--
Gitblit v1.10.0