mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.38.2007 18d8dd990ea2072267b32e3200c3291fdd53576a
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -39,14 +39,13 @@
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -55,20 +54,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
@@ -77,7 +74,6 @@
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -130,7 +126,7 @@
 *  handle protocol messages from the changelog server.
 */
public class SynchronizationDomain extends DirectoryThread
       implements ConfigurableComponent
       implements ConfigurationChangeListener<MultimasterDomainCfg>
{
  private SynchronizationMonitor monitor;
@@ -254,7 +250,7 @@
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
  private List<String> changelogServers;
  private Collection<String> changelogServers;
  private DN baseDN;
@@ -263,8 +259,6 @@
  private boolean shutdown = false;
  private DN configDn;
  private InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
@@ -273,102 +267,30 @@
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
  static final String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
  static final String RECEIVE_STATUS = "ds-cfg-receive-status";
  static final String MAX_RECEIVE_QUEUE = "ds-cfg-max-receive-queue";
  static final String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay";
  static final String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
  static final String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
  static final String WINDOW_SIZE = "ds-cfg-window-size";
  static final String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval";
  private int window = 100;
  private static final StringConfigAttribute changelogStub =
    new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
        "changelog server information", true, true, false);
  private static final IntegerConfigAttribute serverIdStub =
    new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
                               false, true, 0, true, 65535);
  private static final DNConfigAttribute baseDnStub =
    new DNConfigAttribute(BASE_DN_ATTR, "synchronization base DN",
                          true, false, false);
  private static final BooleanConfigAttribute receiveStatusStub =
    new BooleanConfigAttribute(RECEIVE_STATUS, "receive status", false);
  /**
   * The set of time units that will be used for expressing the heartbeat
   * interval.
   */
  private static final LinkedHashMap<String,Double> timeUnits =
       new LinkedHashMap<String,Double>();
  static
  {
    timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D);
    timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D);
    timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D);
    timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
  }
  /**
   * Creates a new SynchronizationDomain using configuration from configEntry.
   *
   * @param configEntry The ConfigEntry to use to read the configuration of this
   *                    SynchronizationDomain.
   * @param configuration    The configuration of this SynchronizationDomain.
   * @throws ConfigException In case of invalid configuration.
   */
  public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
  public SynchronizationDomain(MultimasterDomainCfg configuration)
    throws ConfigException
  {
    super("Synchronization flush");
    /*
     * read the centralized changelog server configuration
     * this is a multivalued attribute
     */
    StringConfigAttribute changelogServer =
      (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub);
    if (changelogServer == null)
    {
      throw new ConfigException(MSGID_NEED_CHANGELOG_SERVER,
          MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER,
              configEntry.getDN().toString()) );
    }
    changelogServers = changelogServer.activeValues();
    configAttributes.add(changelogServer);
    /*
     * read the server Id information
     * this is a single valued integer, its value must fit on a short integer
     */
    IntegerConfigAttribute serverIdAttr =
      (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub);
    if (serverIdAttr == null)
    {
      throw new ConfigException(MSGID_NEED_SERVER_ID,
          MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
              configEntry.getDN().toString())  );
    }
    serverId = (short) serverIdAttr.activeIntValue();
    configAttributes.add(serverIdAttr);
    /*
     * read the base DN
     */
    DNConfigAttribute baseDn =
      (DNConfigAttribute) configEntry.getConfigAttribute(baseDnStub);
    if (baseDn == null)
      baseDN = null;  // Attribute is not present : don't set a limit
    else
      baseDN = baseDn.activeValue();
    configAttributes.add(baseDn);
    // Read the configuration parameters.
    changelogServers = configuration.getChangelogServer();
    serverId = (short) configuration.getServerId();
    baseDN = configuration.getSynchronizationDN();
    maxReceiveQueue = configuration.getMaxReceiveQueue();
    maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
    maxSendQueue = configuration.getMaxSendQueue();
    maxSendDelay = (int) configuration.getMaxSendDelay();
    window  = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    /*
     * Modify conflicts are solved for all suffixes but the schema suffix
@@ -386,114 +308,25 @@
      solveConflictFlag = true;
    }
    /*
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNmber seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDN);
    /*
     * Read the Receive Status.
     * Create a Synchronization monitor object responsible for publishing
     * monitoring information below cn=monitor.
     */
    BooleanConfigAttribute receiveStatusAttr = (BooleanConfigAttribute)
          configEntry.getConfigAttribute(receiveStatusStub);
    if (receiveStatusAttr != null)
    {
      receiveStatus = receiveStatusAttr.activeValue();
      configAttributes.add(receiveStatusAttr);
    }
    /*
     * read the synchronization flow control configuration.
     */
    IntegerConfigAttribute maxReceiveQueueStub =
      new IntegerConfigAttribute(MAX_RECEIVE_QUEUE, "max receive queue",
                                 false, false, false, true, 0,false, 0);
    IntegerConfigAttribute maxReceiveQueueAttr = (IntegerConfigAttribute)
              configEntry.getConfigAttribute(maxReceiveQueueStub);
    if (maxReceiveQueueAttr == null)
      maxReceiveQueue = 0;  // Attribute is not present : don't set a limit
    else
    {
      maxReceiveQueue = maxReceiveQueueAttr.activeIntValue();
      configAttributes.add(maxReceiveQueueAttr);
    }
    IntegerConfigAttribute maxReceiveDelayStub =
      new IntegerConfigAttribute(MAX_RECEIVE_DELAY, "max receive delay",
                                 false, false, false, true, 0, false, 0);
    IntegerConfigAttribute maxReceiveDelayAttr = (IntegerConfigAttribute)
              configEntry.getConfigAttribute(maxReceiveDelayStub);
    if (maxReceiveDelayAttr == null)
      maxReceiveDelay = 0;  // Attribute is not present : don't set a limit
    else
    {
      maxReceiveDelay = maxReceiveDelayAttr.activeIntValue();
      configAttributes.add(maxReceiveDelayAttr);
    }
    IntegerConfigAttribute maxSendQueueStub =
      new IntegerConfigAttribute(MAX_SEND_QUEUE, "max send queue",
                                 false, false, false, true, 0, false, 0);
    IntegerConfigAttribute maxSendQueueAttr =
      (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendQueueStub);
    if (maxSendQueueAttr == null)
      maxSendQueue = 0;  // Attribute is not present : don't set a limit
    else
    {
      maxSendQueue = maxSendQueueAttr.activeIntValue();
      configAttributes.add(maxSendQueueAttr);
    }
    IntegerConfigAttribute maxSendDelayStub =
      new IntegerConfigAttribute(MAX_SEND_DELAY, "max send delay",
                                 false, false, false, true, 0, false, 0);
    IntegerConfigAttribute maxSendDelayAttr =
      (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendDelayStub);
    if (maxSendDelayAttr == null)
      maxSendDelay = 0;  // Attribute is not present : don't set a limit
    else
    {
      maxSendDelay = maxSendDelayAttr.activeIntValue();
      configAttributes.add(maxSendDelayAttr);
    }
    Integer window;
    IntegerConfigAttribute windowStub =
      new IntegerConfigAttribute(WINDOW_SIZE, "window size",
                                 false, false, false, true, 0, false, 0);
    IntegerConfigAttribute windowAttr =
      (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub);
    if (windowAttr == null)
      window = 100;  // Attribute is not present : use the default value
    else
    {
      window = windowAttr.activeIntValue();
      configAttributes.add(windowAttr);
    }
    IntegerWithUnitConfigAttribute heartbeatStub =
      new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL,
                                         "heartbeat interval",
                                         false, timeUnits, true, 0, false, 0);
    IntegerWithUnitConfigAttribute heartbeatAttr =
      (IntegerWithUnitConfigAttribute)
           configEntry.getConfigAttribute(heartbeatStub);
    if (heartbeatAttr == null)
    {
      // Attribute is not present : use the default value
      heartbeatInterval = 1000;
    }
    else
    {
      heartbeatInterval = heartbeatAttr.activeCalculatedValue();
      configAttributes.add(heartbeatAttr);
    }
    configDn = configEntry.getDN();
    DirectoryServer.registerConfigurableComponent(this);
    monitor = new SynchronizationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the synchronization domain.
     */
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
    /*
     * create the broker object used to publish and receive changes
     */
@@ -519,14 +352,9 @@
      * should we stop the modifications ?
      */
    }
  }
  /**
   * {@inheritDoc}
   */
  public DN getConfigurableComponentEntryDN()
  {
    return configDn;
    // listen for changes on the configuration
    configuration.addChangeListener(this);
  }
  /**
@@ -537,112 +365,6 @@
    return configAttributes;
  }
  /**
   * {@inheritDoc}
   */
  public boolean hasAcceptableConfiguration(ConfigEntry configEntry,
      List<String> unacceptableReasons)
  {
    boolean acceptable = true;
    StringConfigAttribute changelog = null;
    try
    {
      changelog = (StringConfigAttribute)
                                  configEntry.getConfigAttribute(changelogStub);
    } catch (ConfigException e)
    {
      acceptable = false;
      unacceptableReasons.add("Need at least one changelog server.");
    }
    if (changelog == null)
    {
      acceptable = false;
      unacceptableReasons.add("Need at least one changelog server.");
    }
    return acceptable;
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry,
      boolean detailedResults)
  {
    StringConfigAttribute changelog = null;
    List<String> newChangelogServers;
    boolean newReceiveStatus;
    try
    {
      /*
       *  check if changelog server list changed
       */
      changelog = (StringConfigAttribute)
                                  configEntry.getConfigAttribute(changelogStub);
      newChangelogServers = changelog.activeValues();
      boolean sameConf = true;
      for (String s :newChangelogServers)
        if (!changelogServers.contains(s))
          sameConf = false;
      for (String s : changelogServers)
        if (!newChangelogServers.contains(s))
          sameConf = false;
      if (!sameConf)
      {
        broker.stop();
        changelogServers = newChangelogServers;
        broker.start(changelogServers);
      }
      /*
       * check if reception should be disabled
       */
      newReceiveStatus = ((BooleanConfigAttribute)
               configEntry.getConfigAttribute(receiveStatusStub)).activeValue();
      if (newReceiveStatus != receiveStatus)
      {
        /*
         * was disabled and moved to enabled
         */
        if (newReceiveStatus)
        {
          broker.restartReceive();
          for (int i=0; i<listenerThreadNumber; i++)
          {
            ListenerThread myThread = new ListenerThread(this);
            myThread.start();
            synchroThreads.add(myThread);
          }
        }
        else
        {
          /* was enabled and moved to disabled */
          broker.suspendReceive();
          // FIXME Need a way to stop these threads.
          // Setting the shutdown flag does not stop them until they have
          // consumed and discarded one more message each.
//          for (ListenerThread thread : synchroThreads)
//          {
//            thread.shutdown();
//          }
          synchroThreads.clear();
        }
        receiveStatus = newReceiveStatus;
      }
    } catch (Exception e)
    {
      /* this should never happen because the parameters have been
       * validated by hasAcceptableConfiguration
       */
      return new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false);
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
  /**
   * Returns the base DN of this SynchronizationDomain.
@@ -1041,9 +763,7 @@
  public void receiveAck(AckMessage ack)
  {
    UpdateMessage update;
    ChangeNumber changeNumber;
    changeNumber = ack.getChangeNumber();
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    {
@@ -1933,51 +1653,6 @@
  }
  /**
   * Check if a ConfigEntry is valid.
   * @param configEntry The config entry that needs to be checked.
   * @param unacceptableReason A description of the reason why the config entry
   *                           is not acceptable (if return is false).
   * @return a boolean indicating if the configEntry is valid.
   */
  public static boolean checkConfigEntry(ConfigEntry configEntry,
      StringBuilder unacceptableReason)
  {
    try
    {
    StringConfigAttribute changelogServer =
      (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub);
    if (changelogServer == null)
    {
      unacceptableReason.append(
          MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER,
          configEntry.getDN().toString()) );
      return false;
    }
    /*
     * read the server Id information
     * this is a single valued integer, its value must fit on a short integer
     */
    IntegerConfigAttribute serverIdAttr =
      (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub);
    if (serverIdAttr == null)
    {
      unacceptableReason.append(
          MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
              configEntry.getDN().toString()) );
      return false;
    }
    }
    catch (ConfigException e)
    {
      unacceptableReason.append(e.getMessage());
      return false;
    }
    return true;
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
@@ -2026,7 +1701,6 @@
    return broker.getNumLostConnections();
  }
  /**
   * Check if the domain solve conflicts.
   *
@@ -3109,4 +2783,60 @@
    op.setResultCode(ResultCode.SUCCESS);
    synchronize(op);
  }
  /**
   * Check if the provided configuration is acceptable for add.
   *
   * @param configuration The configuration to check.
   * @param unacceptableReasons When the configuration is not acceptable, this
   *                            table is use to return the reasons why this
   *                            configuration is not acceptbale.
   *
   * @return true if the configuration is acceptable, false other wise.
   */
  public static boolean isConfigurationAcceptable(
      MultimasterDomainCfg configuration, List<String> unacceptableReasons)
  {
    // Check that there is not already a domain with the same DN
    // TODO : Check that the server id is a short
    DN dn = configuration.getSynchronizationDN();
    if (MultimasterSynchronization.findDomain(dn,null) != null)
    {
      String message = getMessage(MSGID_SYNC_INVALID_DN, dn.toString());
      unacceptableReasons.add(message);
      return false;
    }
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
         MultimasterDomainCfg configuration)
  {
    // server id and base dn are readonly.
    // The other parameters needs to be renegociated with the Changelog Server.
    // so that requires restarting the session with the Changelog Server.
    changelogServers = configuration.getChangelogServer();
    maxReceiveQueue = configuration.getMaxReceiveQueue();
    maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
    maxSendQueue = configuration.getMaxSendQueue();
    maxSendDelay = (int) configuration.getMaxSendDelay();
    window = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay,
                        maxSendQueue, maxSendDelay, window, heartbeatInterval);
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
         MultimasterDomainCfg configuration, List<String> unacceptableReasons)
  {
    return true;
  }
}