| | |
| | | import java.net.Socket; |
| | | import java.net.UnknownHostException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.server.ChangelogServerCfg; |
| | | import org.opends.server.api.ConfigurableComponent; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.config.ConfigAttribute; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.config.IntegerConfigAttribute; |
| | | import org.opends.server.config.IntegerWithUnitConfigAttribute; |
| | | import org.opends.server.config.StringConfigAttribute; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.messages.MessageHandler; |
| | | import org.opends.server.synchronization.protocol.SocketSession; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | |
| | | * |
| | | * It is responsible for creating the changelog cache and managing it |
| | | */ |
| | | public class Changelog implements Runnable, ConfigurableComponent |
| | | public class Changelog |
| | | implements Runnable, ConfigurableComponent, |
| | | ConfigurationChangeListener<ChangelogServerCfg> |
| | | { |
| | | private short serverId; |
| | | private String serverURL; |
| | |
| | | private boolean runListen = true; |
| | | |
| | | /* The list of changelog servers configured by the administrator */ |
| | | private List<String> changelogServers; |
| | | private Collection<String> changelogServers; |
| | | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | |
| | | private long trimAge; // the time (in sec) after which the changes must |
| | | // de deleted from the persistent storage. |
| | | |
| | | static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id"; |
| | | static final String CHANGELOG_PORT_ATTR = "ds-cfg-changelog-port"; |
| | | static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size"; |
| | | static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size"; |
| | | static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-directory"; |
| | | static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay"; |
| | | |
| | | |
| | | static final IntegerConfigAttribute changelogPortStub = |
| | | new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port", |
| | | true, false, false, true, 0, true, 65535); |
| | | |
| | | static final IntegerConfigAttribute serverIdStub = |
| | | new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false, |
| | | false, true, 0, true, 65535); |
| | | |
| | | static final StringConfigAttribute changelogStub = |
| | | new StringConfigAttribute(CHANGELOG_SERVER_ATTR, |
| | | "changelog server information", true, true, false); |
| | | |
| | | static final IntegerConfigAttribute windowStub = |
| | | new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | static final IntegerConfigAttribute queueSizeStub = |
| | | new IntegerConfigAttribute(QUEUE_SIZE_ATTR, "changelog queue size", |
| | | false, false, false, true, 0, false, 0); |
| | | |
| | | static final StringConfigAttribute dbDirnameStub = |
| | | new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR, |
| | | "changelog storage directory path", false, false, true); |
| | | |
| | | /** |
| | | * The set of time units that will be used for expressing the |
| | | * changelog purge delay. |
| | | */ |
| | | private static final LinkedHashMap<String,Double> purgeTimeUnits = |
| | | new LinkedHashMap<String,Double>(); |
| | | |
| | | static |
| | | { |
| | | purgeTimeUnits.put(TIME_UNIT_SECONDS_ABBR, 1D); |
| | | purgeTimeUnits.put(TIME_UNIT_SECONDS_FULL, 1D); |
| | | purgeTimeUnits.put(TIME_UNIT_MINUTES_ABBR, 60D); |
| | | purgeTimeUnits.put(TIME_UNIT_MINUTES_FULL, 1D); |
| | | purgeTimeUnits.put(TIME_UNIT_HOURS_ABBR, 60*60D); |
| | | purgeTimeUnits.put(TIME_UNIT_HOURS_FULL, 60*60D); |
| | | purgeTimeUnits.put(TIME_UNIT_DAYS_ABBR, 24*60*60D); |
| | | purgeTimeUnits.put(TIME_UNIT_DAYS_FULL, 24*60*60D); |
| | | } |
| | | |
| | | static final IntegerWithUnitConfigAttribute purgeDelayStub = |
| | | new IntegerWithUnitConfigAttribute(PURGE_DELAY_ATTR, |
| | | "changelog purge delay", false, purgeTimeUnits, true, 0, false, 0); |
| | | |
| | | /** |
| | | * Check if a ConfigEntry is valid. |
| | | * @param config The config entry that needs to be checked. |
| | | * @param unacceptableReason A description of the reason why the config entry |
| | | * is not acceptable (if return is false). |
| | | * @return a boolean indicating if the configEntry is valid. |
| | | */ |
| | | public static boolean checkConfigEntry(ConfigEntry config, |
| | | StringBuilder unacceptableReason) |
| | | { |
| | | try |
| | | { |
| | | IntegerConfigAttribute changelogPortAttr; |
| | | changelogPortAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub); |
| | | |
| | | /* The config must provide a changelog port number |
| | | */ |
| | | if (changelogPortAttr == null) |
| | | { |
| | | unacceptableReason.append( |
| | | MessageHandler.getMessage(MSGID_NEED_CHANGELOG_PORT, |
| | | config.getDN().toString()) ); |
| | | } |
| | | |
| | | /* |
| | | * read the server Id information |
| | | * this is a single valued integer, its value must fit on a |
| | | * short integer |
| | | */ |
| | | IntegerConfigAttribute serverIdAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(serverIdStub); |
| | | |
| | | if (serverIdAttr == null) |
| | | { |
| | | unacceptableReason.append( |
| | | MessageHandler.getMessage(MSGID_NEED_SERVER_ID, |
| | | config.getDN().toString()) ); |
| | | } |
| | | |
| | | return true; |
| | | } catch (ConfigException e) |
| | | { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Creates a new Changelog using the provided configuration entry. |
| | | * |
| | | * @param config The configuration entry where configuration can be found. |
| | | * @throws ConfigException When Configuration entry is invalid. |
| | | * @param configuration The configuration of this changelog. |
| | | * @throws ConfigException When Configuration is invalid. |
| | | */ |
| | | public Changelog(ConfigEntry config) throws ConfigException |
| | | public Changelog(ChangelogServerCfg configuration) throws ConfigException |
| | | { |
| | | shutdown = false; |
| | | runListen = true; |
| | | |
| | | IntegerConfigAttribute changelogPortAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(changelogPortStub); |
| | | /* if there is no changelog port configured, this process must not be a |
| | | * changelog server |
| | | */ |
| | | if (changelogPortAttr == null) |
| | | { |
| | | throw new ConfigException(MSGID_NEED_CHANGELOG_PORT, |
| | | MessageHandler.getMessage(MSGID_NEED_CHANGELOG_PORT, |
| | | config.getDN().toString()) ); |
| | | } |
| | | int changelogPort = changelogPortAttr.activeIntValue(); |
| | | configAttributes.add(changelogPortAttr); |
| | | |
| | | /* |
| | | * read the server Id information |
| | | * this is a single valued integer, its value must fit on a |
| | | * short integer |
| | | */ |
| | | IntegerConfigAttribute serverIdAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(serverIdStub); |
| | | |
| | | if (serverIdAttr == null) |
| | | { |
| | | throw new ConfigException(MSGID_NEED_SERVER_ID, |
| | | MessageHandler.getMessage(MSGID_NEED_SERVER_ID, |
| | | config.getDN().toString()) ); |
| | | } |
| | | changelogServerId = (short) serverIdAttr.activeIntValue(); |
| | | configAttributes.add(serverIdAttr); |
| | | |
| | | /* |
| | | * read the centralized changelog server configuration |
| | | * this is a multivalued attribute |
| | | */ |
| | | StringConfigAttribute changelogServer = |
| | | (StringConfigAttribute) config.getConfigAttribute(changelogStub); |
| | | changelogServers = new ArrayList<String>(); |
| | | if (changelogServer != null) |
| | | { |
| | | for (String serverURL : changelogServer.activeValues()) |
| | | { |
| | | String[] splitStrings = serverURL.split(":"); |
| | | try |
| | | { |
| | | changelogServers.add( |
| | | InetAddress.getByName(splitStrings[0]).getHostAddress() |
| | | + ":" + splitStrings[1]); |
| | | } catch (UnknownHostException e) |
| | | { |
| | | throw new ConfigException(MSGID_UNKNOWN_HOSTNAME, |
| | | e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | } |
| | | configAttributes.add(changelogServer); |
| | | |
| | | IntegerConfigAttribute windowAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(windowStub); |
| | | if (windowAttr == null) |
| | | rcvWindow = 100; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | rcvWindow = windowAttr.activeIntValue(); |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | IntegerConfigAttribute queueSizeAttr = |
| | | (IntegerConfigAttribute) config.getConfigAttribute(queueSizeStub); |
| | | if (queueSizeAttr == null) |
| | | queueSize = 10000; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | queueSize = queueSizeAttr.activeIntValue(); |
| | | configAttributes.add(queueSizeAttr); |
| | | } |
| | | |
| | | /* |
| | | * read the storage directory path attribute |
| | | */ |
| | | StringConfigAttribute dbDirnameAttr = |
| | | (StringConfigAttribute) config.getConfigAttribute(dbDirnameStub); |
| | | if (dbDirnameAttr == null) |
| | | int changelogPort = configuration.getChangelogPort(); |
| | | changelogServerId = (short) configuration.getChangelogServerId(); |
| | | changelogServers = configuration.getChangelogServer(); |
| | | if (changelogServers == null) |
| | | changelogServers = new ArrayList<String>(); |
| | | queueSize = configuration.getQueueSize(); |
| | | trimAge = configuration.getChangelogPurgeDelay(); |
| | | dbDirname = configuration.getChangelogDbDirectory(); |
| | | rcvWindow = configuration.getWindowSize(); |
| | | if (dbDirname == null) |
| | | { |
| | | dbDirname = "changelogDb"; |
| | | } |
| | | else |
| | | { |
| | | dbDirname = dbDirnameAttr.activeValue(); |
| | | configAttributes.add(changelogServer); |
| | | } |
| | | // Exists or Create |
| | | // Chech that this path exists or create it. |
| | | File f = getFileForPath(dbDirname); |
| | | try |
| | | { |
| | |
| | | e.getMessage() + " " + getFileForPath(dbDirname)); |
| | | } |
| | | |
| | | /* |
| | | * Read the Purge Delay (trim age) attribute |
| | | */ |
| | | IntegerWithUnitConfigAttribute purgeDelayAttr = |
| | | (IntegerWithUnitConfigAttribute) config.getConfigAttribute( |
| | | purgeDelayStub); |
| | | if (purgeDelayAttr == null) |
| | | trimAge = 24*60*60; // not present : use the default value : 1 day |
| | | else |
| | | { |
| | | trimAge = purgeDelayAttr.activeCalculatedValue(); |
| | | configAttributes.add(purgeDelayAttr); |
| | | } |
| | | |
| | | initialize(changelogServerId, changelogPort); |
| | | |
| | | configDn = config.getDN(); |
| | | DirectoryServer.registerConfigurableComponent(this); |
| | | configuration.addChangeListener(this); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | for (String serverURL : changelogServers) |
| | | { |
| | | if ((serverURL.compareTo(localURL) != 0) && |
| | | if ((serverURL.compareTo(this.serverURL) != 0) && |
| | | (!connectedChangelogs.contains(serverURL))) |
| | | { |
| | | this.connect(serverURL, changelogCache.getBaseDn()); |
| | |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | | DirectoryServer.deregisterConfigurableComponent(this); |
| | | } |
| | | |
| | | |
| | |
| | | { |
| | | return trimAge * 1000; |
| | | } |
| | | |
| | | /** |
| | | * Check if the provided configuration is acceptable for add. |
| | | * |
| | | * @param configuration The configuration to check. |
| | | * @param unacceptableReasons When the configuration is not acceptable, this |
| | | * table is use to return the reasons why this |
| | | * configuration is not acceptbale. |
| | | * |
| | | * @return true if the configuration is acceptable, false other wise. |
| | | */ |
| | | public static boolean isConfigurationAcceptable( |
| | | ChangelogServerCfg configuration, List<String> unacceptableReasons) |
| | | { |
| | | int port = configuration.getChangelogPort(); |
| | | |
| | | try |
| | | { |
| | | ServerSocket tmpSocket = new ServerSocket(); |
| | | tmpSocket.bind(new InetSocketAddress(port)); |
| | | tmpSocket.close(); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | String message = getMessage(MSGID_COULD_NOT_BIND_CHANGELOG, port, |
| | | e.getMessage()); |
| | | unacceptableReasons.add(message); |
| | | return false; |
| | | } |
| | | |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ChangelogServerCfg configuration) |
| | | { |
| | | // TODO : implement this |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean isConfigurationChangeAcceptable( |
| | | ChangelogServerCfg configuration, List<String> unacceptableReasons) |
| | | { |
| | | // TODO : implement this |
| | | return true; |
| | | } |
| | | } |