mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.38.2007 18d8dd990ea2072267b32e3200c3291fdd53576a
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -40,26 +40,24 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.ChangelogServerCfg;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.DirectoryServer;
import org.opends.server.messages.MessageHandler;
import org.opends.server.synchronization.protocol.SocketSession;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
@@ -73,7 +71,9 @@
 *
 * It is responsible for creating the changelog cache and managing it
 */
public class Changelog implements Runnable, ConfigurableComponent
public class Changelog
  implements Runnable, ConfigurableComponent,
             ConfigurationChangeListener<ChangelogServerCfg>
{
  private short serverId;
  private String serverURL;
@@ -85,7 +85,7 @@
  private boolean runListen = true;
  /* The list of changelog servers configured by the administrator */
  private List<String> changelogServers;
  private Collection<String> changelogServers;
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
@@ -106,212 +106,30 @@
  private long trimAge; // the time (in sec) after which the  changes must
                        // de deleted from the persistent storage.
  static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
  static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port";
  static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
  static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
  static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory";
  static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
  static final IntegerConfigAttribute changelogPortStub =
    new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
      true, false, false, true, 0, true, 65535);
  static final IntegerConfigAttribute serverIdStub =
    new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
        false, true, 0, true, 65535);
  static final StringConfigAttribute changelogStub =
    new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
        "changelog server information", true, true, false);
  static final IntegerConfigAttribute windowStub =
    new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
                               false, false, false, true, 0, false, 0);
  static final IntegerConfigAttribute queueSizeStub =
    new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size",
                               false, false, false, true, 0, false, 0);
  static final StringConfigAttribute dbDirnameStub =
    new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
        "changelog storage directory path", false, false, true);
  /**
   * The set of time units that will be used for expressing the
   * changelog purge delay.
   */
  private static final LinkedHashMap<String,Double> purgeTimeUnits =
       new LinkedHashMap<String,Double>();
  static
  {
    purgeTimeUnits.put(TIME_UNIT_SECONDS_ABBR, 1D);
    purgeTimeUnits.put(TIME_UNIT_SECONDS_FULL, 1D);
    purgeTimeUnits.put(TIME_UNIT_MINUTES_ABBR, 60D);
    purgeTimeUnits.put(TIME_UNIT_MINUTES_FULL, 1D);
    purgeTimeUnits.put(TIME_UNIT_HOURS_ABBR, 60*60D);
    purgeTimeUnits.put(TIME_UNIT_HOURS_FULL, 60*60D);
    purgeTimeUnits.put(TIME_UNIT_DAYS_ABBR, 24*60*60D);
    purgeTimeUnits.put(TIME_UNIT_DAYS_FULL, 24*60*60D);
  }
  static final IntegerWithUnitConfigAttribute purgeDelayStub =
    new IntegerWithUnitConfigAttribute(PURGE_DELAY_ATTR,
        "changelog purge delay", false, purgeTimeUnits, true, 0, false, 0);
  /**
   * Check if a ConfigEntry is valid.
   * @param config The config entry that needs to be checked.
   * @param unacceptableReason A description of the reason why the config entry
   *                           is not acceptable (if return is false).
   * @return a boolean indicating if the configEntry is valid.
   */
  public static boolean checkConfigEntry(ConfigEntry config,
      StringBuilder unacceptableReason)
  {
    try
    {
      IntegerConfigAttribute changelogPortAttr;
      changelogPortAttr =
        (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub);
      /* The config must provide a changelog port number
       */
      if (changelogPortAttr == null)
      {
        unacceptableReason.append(
            MessageHandler.getMessage(MSGID_NEED_CHANGELOG_PORT,
            config.getDN().toString())  );
      }
      /*
       * read the server Id information
       * this is a single valued integer, its value must fit on a
       * short integer
       */
      IntegerConfigAttribute serverIdAttr =
        (IntegerConfigAttribute) config.getConfigAttribute(serverIdStub);
      if (serverIdAttr == null)
      {
        unacceptableReason.append(
            MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
            config.getDN().toString()) );
      }
      return true;
    } catch (ConfigException e)
    {
      return false;
    }
  }
  /**
   * Creates a new Changelog using the provided configuration entry.
   *
   * @param config The configuration entry where configuration can be found.
   * @throws ConfigException When Configuration entry is invalid.
   * @param configuration The configuration of this changelog.
   * @throws ConfigException When Configuration is invalid.
   */
  public Changelog(ConfigEntry config) throws ConfigException
  public Changelog(ChangelogServerCfg configuration) throws ConfigException
  {
    shutdown = false;
    runListen = true;
    IntegerConfigAttribute changelogPortAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub);
    /* if there is no changelog port configured, this process must not be a
     * changelog server
     */
    if (changelogPortAttr == null)
    {
      throw new ConfigException(MSGID_NEED_CHANGELOG_PORT,
          MessageHandler.getMessage(MSGID_NEED_CHANGELOG_PORT,
              config.getDN().toString())  );
    }
    int changelogPort = changelogPortAttr.activeIntValue();
    configAttributes.add(changelogPortAttr);
    /*
     * read the server Id information
     * this is a single valued integer, its value must fit on a
     * short integer
     */
    IntegerConfigAttribute serverIdAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(serverIdStub);
    if (serverIdAttr == null)
    {
      throw new ConfigException(MSGID_NEED_SERVER_ID,
          MessageHandler.getMessage(MSGID_NEED_SERVER_ID,
              config.getDN().toString())  );
    }
    changelogServerId = (short) serverIdAttr.activeIntValue();
    configAttributes.add(serverIdAttr);
    /*
     * read the centralized changelog server configuration
     * this is a multivalued attribute
     */
    StringConfigAttribute changelogServer =
      (StringConfigAttribute) config.getConfigAttribute(changelogStub);
    changelogServers = new ArrayList<String>();
    if (changelogServer != null)
    {
      for (String serverURL : changelogServer.activeValues())
      {
        String[] splitStrings = serverURL.split(":");
        try
        {
          changelogServers.add(
              InetAddress.getByName(splitStrings[0]).getHostAddress()
              + ":" + splitStrings[1]);
        } catch (UnknownHostException e)
        {
          throw new ConfigException(MSGID_UNKNOWN_HOSTNAME,
              e.getLocalizedMessage());
        }
      }
    }
    configAttributes.add(changelogServer);
    IntegerConfigAttribute windowAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(windowStub);
    if (windowAttr == null)
      rcvWindow = 100;  // Attribute is not present : use the default value
    else
    {
      rcvWindow = windowAttr.activeIntValue();
      configAttributes.add(windowAttr);
    }
    IntegerConfigAttribute queueSizeAttr =
      (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub);
    if (queueSizeAttr == null)
      queueSize = 10000;  // Attribute is not present : use the default value
    else
    {
      queueSize = queueSizeAttr.activeIntValue();
      configAttributes.add(queueSizeAttr);
    }
    /*
     * read the storage directory path attribute
     */
    StringConfigAttribute dbDirnameAttr =
      (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub);
    if (dbDirnameAttr == null)
    int changelogPort = configuration.getChangelogPort();
    changelogServerId = (short) configuration.getChangelogServerId();
    changelogServers = configuration.getChangelogServer();
    if (changelogServers == null)
      changelogServers = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    trimAge = configuration.getChangelogPurgeDelay();
    dbDirname = configuration.getChangelogDbDirectory();
    rcvWindow = configuration.getWindowSize();
    if (dbDirname == null)
    {
      dbDirname = "changelogDb";
    }
    else
    {
      dbDirname = dbDirnameAttr.activeValue();
      configAttributes.add(changelogServer);
    }
    // Exists or Create
    // Chech that this path exists or create it.
    File f = getFileForPath(dbDirname);
    try
    {
@@ -326,24 +144,8 @@
          e.getMessage() + " " + getFileForPath(dbDirname));
    }
    /*
     * Read the Purge Delay (trim age) attribute
     */
    IntegerWithUnitConfigAttribute purgeDelayAttr =
      (IntegerWithUnitConfigAttribute) config.getConfigAttribute(
          purgeDelayStub);
    if (purgeDelayAttr == null)
      trimAge = 24*60*60;  // not present : use the default value : 1 day
    else
    {
      trimAge = purgeDelayAttr.activeCalculatedValue();
      configAttributes.add(purgeDelayAttr);
    }
    initialize(changelogServerId, changelogPort);
    configDn = config.getDN();
    DirectoryServer.registerConfigurableComponent(this);
    configuration.addChangeListener(this);
  }
  /**
@@ -452,7 +254,7 @@
         */
        for (String serverURL : changelogServers)
        {
          if ((serverURL.compareTo(localURL) != 0) &&
          if ((serverURL.compareTo(this.serverURL) != 0) &&
              (!connectedChangelogs.contains(serverURL)))
          {
            this.connect(serverURL, changelogCache.getBaseDn());
@@ -629,7 +431,6 @@
    }
    dbEnv.shutdown();
    DirectoryServer.deregisterConfigurableComponent(this);
  }
@@ -659,4 +460,56 @@
  {
    return trimAge * 1000;
  }
  /**
   * Check if the provided configuration is acceptable for add.
   *
   * @param configuration The configuration to check.
   * @param unacceptableReasons When the configuration is not acceptable, this
   *                            table is use to return the reasons why this
   *                            configuration is not acceptbale.
   *
   * @return true if the configuration is acceptable, false other wise.
   */
  public static boolean isConfigurationAcceptable(
      ChangelogServerCfg configuration, List<String> unacceptableReasons)
  {
    int port = configuration.getChangelogPort();
    try
    {
      ServerSocket tmpSocket = new ServerSocket();
      tmpSocket.bind(new InetSocketAddress(port));
      tmpSocket.close();
    }
    catch (Exception e)
    {
      String message = getMessage(MSGID_COULD_NOT_BIND_CHANGELOG, port,
                                  e.getMessage());
      unacceptableReasons.add(message);
      return false;
    }
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationChange(
      ChangelogServerCfg configuration)
  {
    // TODO : implement this
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
      ChangelogServerCfg configuration, List<String> unacceptableReasons)
  {
    // TODO : implement this
    return true;
  }
}