From 18d8dd990ea2072267b32e3200c3291fdd53576a Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 23 Apr 2007 13:38:48 +0000
Subject: [PATCH] This integrates the multi-master synchronization with the new admin framework (issue 1477) and makes possible to dynamically add or remove changelog server and synchronization domains in a running server (issue 639).

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java |  448 +++++++++++--------------------------------------------
 1 files changed, 89 insertions(+), 359 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 743d24d..c597f18 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -39,14 +39,13 @@
 import static org.opends.server.synchronization.common.LogMessages.*;
 import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
 import static org.opends.server.synchronization.protocol.OperationContext.*;
-import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.createEntry;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,20 +54,18 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
+import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.MultimasterDomainCfg;
 import org.opends.server.api.Backend;
-import org.opends.server.api.ConfigurableComponent;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.backends.jeb.BackendImpl;
 import org.opends.server.backends.task.Task;
 import org.opends.server.backends.task.TaskState;
-import org.opends.server.config.BooleanConfigAttribute;
 import org.opends.server.config.ConfigAttribute;
 import org.opends.server.config.ConfigEntry;
 import org.opends.server.config.ConfigException;
 import org.opends.server.config.DNConfigAttribute;
-import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.IntegerWithUnitConfigAttribute;
 import org.opends.server.config.StringConfigAttribute;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
@@ -77,7 +74,6 @@
 import org.opends.server.core.ModifyDNOperation;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.Operation;
-import org.opends.server.messages.MessageHandler;
 import org.opends.server.protocols.asn1.ASN1Exception;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -130,7 +126,7 @@
  *  handle protocol messages from the changelog server.
  */
 public class SynchronizationDomain extends DirectoryThread
-       implements ConfigurableComponent
+       implements ConfigurationChangeListener<MultimasterDomainCfg>
 {
   private SynchronizationMonitor monitor;
 
@@ -254,7 +250,7 @@
   private int listenerThreadNumber = 10;
   private boolean receiveStatus = true;
 
-  private List<String> changelogServers;
+  private Collection<String> changelogServers;
 
   private DN baseDN;
 
@@ -263,8 +259,6 @@
 
   private boolean shutdown = false;
 
-  private DN configDn;
-
   private InternalClientConnection conn =
       InternalClientConnection.getRootConnection();
 
@@ -273,102 +267,30 @@
   private boolean disabled = false;
   private boolean stateSavingDisabled = false;
 
-  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
-  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
-  static final String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
-  static final String RECEIVE_STATUS = "ds-cfg-receive-status";
-  static final String MAX_RECEIVE_QUEUE = "ds-cfg-max-receive-queue";
-  static final String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay";
-  static final String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
-  static final String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
-  static final String WINDOW_SIZE = "ds-cfg-window-size";
-  static final String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval";
+  private int window = 100;
 
-  private static final StringConfigAttribute changelogStub =
-    new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
-        "changelog server information", true, true, false);
-
-  private static final IntegerConfigAttribute serverIdStub =
-    new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
-                               false, true, 0, true, 65535);
-
-  private static final DNConfigAttribute baseDnStub =
-    new DNConfigAttribute(BASE_DN_ATTR, "synchronization base DN",
-                          true, false, false);
-
-  private static final BooleanConfigAttribute receiveStatusStub =
-    new BooleanConfigAttribute(RECEIVE_STATUS, "receive status", false);
-
-
-  /**
-   * The set of time units that will be used for expressing the heartbeat
-   * interval.
-   */
-  private static final LinkedHashMap<String,Double> timeUnits =
-       new LinkedHashMap<String,Double>();
-
-
-
-  static
-  {
-    timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D);
-    timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D);
-    timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D);
-    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
-  }
 
   /**
    * Creates a new SynchronizationDomain using configuration from configEntry.
    *
-   * @param configEntry The ConfigEntry to use to read the configuration of this
-   *                    SynchronizationDomain.
+   * @param configuration    The configuration of this SynchronizationDomain.
    * @throws ConfigException In case of invalid configuration.
    */
-  public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
+  public SynchronizationDomain(MultimasterDomainCfg configuration)
+    throws ConfigException
   {
     super("Synchronization flush");
 
-    /*
-     * read the centralized changelog server configuration
-     * this is a multivalued attribute
-     */
-    StringConfigAttribute changelogServer =
-      (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub);
-
-    if (changelogServer == null)
-    {
-      throw new ConfigException(MSGID_NEED_CHANGELOG_SERVER,
-          MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER,
-              configEntry.getDN().toString()) );
-    }
-    changelogServers = changelogServer.activeValues();
-    configAttributes.add(changelogServer);
-
-    /*
-     * read the server Id information
-     * this is a single valued integer, its value must fit on a short integer
-     */
-    IntegerConfigAttribute serverIdAttr =
-      (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub);
-    if (serverIdAttr == null)
-    {
-      throw new ConfigException(MSGID_NEED_SERVER_ID,
-          MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
-              configEntry.getDN().toString())  );
-    }
-    serverId = (short) serverIdAttr.activeIntValue();
-    configAttributes.add(serverIdAttr);
-
-    /*
-     * read the base DN
-     */
-    DNConfigAttribute baseDn =
-      (DNConfigAttribute) configEntry.getConfigAttribute(baseDnStub);
-    if (baseDn == null)
-      baseDN = null;  // Attribute is not present : don't set a limit
-    else
-      baseDN = baseDn.activeValue();
-    configAttributes.add(baseDn);
+    // Read the configuration parameters.
+    changelogServers = configuration.getChangelogServer();
+    serverId = (short) configuration.getServerId();
+    baseDN = configuration.getSynchronizationDN();
+    maxReceiveQueue = configuration.getMaxReceiveQueue();
+    maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
+    maxSendQueue = configuration.getMaxSendQueue();
+    maxSendDelay = (int) configuration.getMaxSendDelay();
+    window  = configuration.getWindowSize();
+    heartbeatInterval = configuration.getHeartbeatInterval();
 
     /*
      * Modify conflicts are solved for all suffixes but the schema suffix
@@ -386,114 +308,25 @@
       solveConflictFlag = true;
     }
 
+    /*
+     * Create a new Persistent Server State that will be used to store
+     * the last ChangeNmber seen from all LDAP servers in the topology.
+     */
     state = new PersistentServerState(baseDN);
 
     /*
-     * Read the Receive Status.
+     * Create a Synchronization monitor object responsible for publishing
+     * monitoring information below cn=monitor.
      */
-    BooleanConfigAttribute receiveStatusAttr = (BooleanConfigAttribute)
-          configEntry.getConfigAttribute(receiveStatusStub);
-    if (receiveStatusAttr != null)
-    {
-      receiveStatus = receiveStatusAttr.activeValue();
-      configAttributes.add(receiveStatusAttr);
-    }
-
-    /*
-     * read the synchronization flow control configuration.
-     */
-    IntegerConfigAttribute maxReceiveQueueStub =
-      new IntegerConfigAttribute(MAX_RECEIVE_QUEUE, "max receive queue",
-                                 false, false, false, true, 0,false, 0);
-
-    IntegerConfigAttribute maxReceiveQueueAttr = (IntegerConfigAttribute)
-              configEntry.getConfigAttribute(maxReceiveQueueStub);
-    if (maxReceiveQueueAttr == null)
-      maxReceiveQueue = 0;  // Attribute is not present : don't set a limit
-    else
-    {
-      maxReceiveQueue = maxReceiveQueueAttr.activeIntValue();
-      configAttributes.add(maxReceiveQueueAttr);
-    }
-
-    IntegerConfigAttribute maxReceiveDelayStub =
-      new IntegerConfigAttribute(MAX_RECEIVE_DELAY, "max receive delay",
-                                 false, false, false, true, 0, false, 0);
-    IntegerConfigAttribute maxReceiveDelayAttr = (IntegerConfigAttribute)
-              configEntry.getConfigAttribute(maxReceiveDelayStub);
-    if (maxReceiveDelayAttr == null)
-      maxReceiveDelay = 0;  // Attribute is not present : don't set a limit
-    else
-    {
-      maxReceiveDelay = maxReceiveDelayAttr.activeIntValue();
-      configAttributes.add(maxReceiveDelayAttr);
-    }
-
-    IntegerConfigAttribute maxSendQueueStub =
-      new IntegerConfigAttribute(MAX_SEND_QUEUE, "max send queue",
-                                 false, false, false, true, 0, false, 0);
-    IntegerConfigAttribute maxSendQueueAttr =
-      (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendQueueStub);
-    if (maxSendQueueAttr == null)
-      maxSendQueue = 0;  // Attribute is not present : don't set a limit
-    else
-    {
-      maxSendQueue = maxSendQueueAttr.activeIntValue();
-      configAttributes.add(maxSendQueueAttr);
-    }
-
-    IntegerConfigAttribute maxSendDelayStub =
-      new IntegerConfigAttribute(MAX_SEND_DELAY, "max send delay",
-                                 false, false, false, true, 0, false, 0);
-    IntegerConfigAttribute maxSendDelayAttr =
-      (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendDelayStub);
-    if (maxSendDelayAttr == null)
-      maxSendDelay = 0;  // Attribute is not present : don't set a limit
-    else
-    {
-      maxSendDelay = maxSendDelayAttr.activeIntValue();
-      configAttributes.add(maxSendDelayAttr);
-    }
-
-    Integer window;
-    IntegerConfigAttribute windowStub =
-      new IntegerConfigAttribute(WINDOW_SIZE, "window size",
-                                 false, false, false, true, 0, false, 0);
-    IntegerConfigAttribute windowAttr =
-      (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub);
-    if (windowAttr == null)
-      window = 100;  // Attribute is not present : use the default value
-    else
-    {
-      window = windowAttr.activeIntValue();
-      configAttributes.add(windowAttr);
-    }
-
-    IntegerWithUnitConfigAttribute heartbeatStub =
-      new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL,
-                                         "heartbeat interval",
-                                         false, timeUnits, true, 0, false, 0);
-    IntegerWithUnitConfigAttribute heartbeatAttr =
-      (IntegerWithUnitConfigAttribute)
-           configEntry.getConfigAttribute(heartbeatStub);
-    if (heartbeatAttr == null)
-    {
-      // Attribute is not present : use the default value
-      heartbeatInterval = 1000;
-    }
-    else
-    {
-      heartbeatInterval = heartbeatAttr.activeCalculatedValue();
-      configAttributes.add(heartbeatAttr);
-    }
-
-    configDn = configEntry.getDN();
-    DirectoryServer.registerConfigurableComponent(this);
-
     monitor = new SynchronizationMonitor(this);
     DirectoryServer.registerMonitorProvider(monitor);
 
+    /*
+     * ChangeNumberGenerator is used to create new unique ChangeNumbers
+     * for each operation done on the synchronization domain.
+     */
     changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
+
     /*
      * create the broker object used to publish and receive changes
      */
@@ -519,14 +352,9 @@
       * should we stop the modifications ?
       */
     }
-  }
 
-  /**
-   * {@inheritDoc}
-   */
-  public DN getConfigurableComponentEntryDN()
-  {
-    return configDn;
+    // listen for changes on the configuration
+    configuration.addChangeListener(this);
   }
 
   /**
@@ -537,112 +365,6 @@
     return configAttributes;
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public boolean hasAcceptableConfiguration(ConfigEntry configEntry,
-      List<String> unacceptableReasons)
-  {
-    boolean acceptable = true;
-    StringConfigAttribute changelog = null;
-    try
-    {
-      changelog = (StringConfigAttribute)
-                                  configEntry.getConfigAttribute(changelogStub);
-    } catch (ConfigException e)
-    {
-      acceptable = false;
-      unacceptableReasons.add("Need at least one changelog server.");
-    }
-    if (changelog == null)
-    {
-      acceptable = false;
-      unacceptableReasons.add("Need at least one changelog server.");
-    }
-    return acceptable;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry,
-      boolean detailedResults)
-  {
-    StringConfigAttribute changelog = null;
-    List<String> newChangelogServers;
-    boolean newReceiveStatus;
-
-    try
-    {
-      /*
-       *  check if changelog server list changed
-       */
-      changelog = (StringConfigAttribute)
-                                  configEntry.getConfigAttribute(changelogStub);
-
-      newChangelogServers = changelog.activeValues();
-
-      boolean sameConf = true;
-      for (String s :newChangelogServers)
-        if (!changelogServers.contains(s))
-          sameConf = false;
-      for (String s : changelogServers)
-        if (!newChangelogServers.contains(s))
-          sameConf = false;
-
-      if (!sameConf)
-      {
-        broker.stop();
-        changelogServers = newChangelogServers;
-        broker.start(changelogServers);
-      }
-
-      /*
-       * check if reception should be disabled
-       */
-      newReceiveStatus = ((BooleanConfigAttribute)
-               configEntry.getConfigAttribute(receiveStatusStub)).activeValue();
-      if (newReceiveStatus != receiveStatus)
-      {
-        /*
-         * was disabled and moved to enabled
-         */
-        if (newReceiveStatus)
-        {
-          broker.restartReceive();
-          for (int i=0; i<listenerThreadNumber; i++)
-          {
-            ListenerThread myThread = new ListenerThread(this);
-            myThread.start();
-            synchroThreads.add(myThread);
-          }
-        }
-        else
-        {
-          /* was enabled and moved to disabled */
-          broker.suspendReceive();
-          // FIXME Need a way to stop these threads.
-          // Setting the shutdown flag does not stop them until they have
-          // consumed and discarded one more message each.
-//          for (ListenerThread thread : synchroThreads)
-//          {
-//            thread.shutdown();
-//          }
-          synchroThreads.clear();
-        }
-        receiveStatus = newReceiveStatus;
-      }
-
-    } catch (Exception e)
-    {
-      /* this should never happen because the parameters have been
-       * validated by hasAcceptableConfiguration
-       */
-      return new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
-    }
-
-    return new ConfigChangeResult(ResultCode.SUCCESS, false);
-  }
 
   /**
    * Returns the base DN of this SynchronizationDomain.
@@ -1041,9 +763,7 @@
   public void receiveAck(AckMessage ack)
   {
     UpdateMessage update;
-    ChangeNumber changeNumber;
-
-    changeNumber = ack.getChangeNumber();
+    ChangeNumber changeNumber = ack.getChangeNumber();
 
     synchronized (pendingChanges)
     {
@@ -1933,51 +1653,6 @@
   }
 
   /**
-   * Check if a ConfigEntry is valid.
-   * @param configEntry The config entry that needs to be checked.
-   * @param unacceptableReason A description of the reason why the config entry
-   *                           is not acceptable (if return is false).
-   * @return a boolean indicating if the configEntry is valid.
-   */
-  public static boolean checkConfigEntry(ConfigEntry configEntry,
-      StringBuilder unacceptableReason)
-  {
-    try
-    {
-    StringConfigAttribute changelogServer =
-      (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub);
-
-    if (changelogServer == null)
-    {
-      unacceptableReason.append(
-          MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER,
-          configEntry.getDN().toString()) );
-      return false;
-    }
-
-    /*
-     * read the server Id information
-     * this is a single valued integer, its value must fit on a short integer
-     */
-    IntegerConfigAttribute serverIdAttr =
-      (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub);
-    if (serverIdAttr == null)
-    {
-      unacceptableReason.append(
-          MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
-              configEntry.getDN().toString()) );
-      return false;
-    }
-    }
-    catch (ConfigException e)
-    {
-      unacceptableReason.append(e.getMessage());
-      return false;
-    }
-    return true;
-  }
-
-  /**
    * Get the maximum receive window size.
    *
    * @return The maximum receive window size.
@@ -2026,7 +1701,6 @@
     return broker.getNumLostConnections();
   }
 
-
   /**
    * Check if the domain solve conflicts.
    *
@@ -3109,4 +2783,60 @@
     op.setResultCode(ResultCode.SUCCESS);
     synchronize(op);
   }
+
+  /**
+   * Check if the provided configuration is acceptable for add.
+   *
+   * @param configuration The configuration to check.
+   * @param unacceptableReasons When the configuration is not acceptable, this
+   *                            table is use to return the reasons why this
+   *                            configuration is not acceptbale.
+   *
+   * @return true if the configuration is acceptable, false other wise.
+   */
+  public static boolean isConfigurationAcceptable(
+      MultimasterDomainCfg configuration, List<String> unacceptableReasons)
+  {
+    // Check that there is not already a domain with the same DN
+    // TODO : Check that the server id is a short
+    DN dn = configuration.getSynchronizationDN();
+    if (MultimasterSynchronization.findDomain(dn,null) != null)
+    {
+      String message = getMessage(MSGID_SYNC_INVALID_DN, dn.toString());
+      unacceptableReasons.add(message);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public ConfigChangeResult applyConfigurationChange(
+         MultimasterDomainCfg configuration)
+  {
+    // server id and base dn are readonly.
+    // The other parameters needs to be renegociated with the Changelog Server.
+    // so that requires restarting the session with the Changelog Server.
+    changelogServers = configuration.getChangelogServer();
+    maxReceiveQueue = configuration.getMaxReceiveQueue();
+    maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
+    maxSendQueue = configuration.getMaxSendQueue();
+    maxSendDelay = (int) configuration.getMaxSendDelay();
+    window = configuration.getWindowSize();
+    heartbeatInterval = configuration.getHeartbeatInterval();
+    broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay,
+                        maxSendQueue, maxSendDelay, window, heartbeatInterval);
+
+    return new ConfigChangeResult(ResultCode.SUCCESS, false);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public boolean isConfigurationChangeAcceptable(
+         MultimasterDomainCfg configuration, List<String> unacceptableReasons)
+  {
+    return true;
+  }
 }

--
Gitblit v1.10.0