| | |
| | | import static org.opends.server.synchronization.common.LogMessages.*; |
| | | import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME; |
| | | import static org.opends.server.synchronization.protocol.OperationContext.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.Collection; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.server.MultimasterDomainCfg; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.api.ConfigurableComponent; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.api.SynchronizationProvider; |
| | | import org.opends.server.backends.jeb.BackendImpl; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.config.BooleanConfigAttribute; |
| | | import org.opends.server.config.ConfigAttribute; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.config.DNConfigAttribute; |
| | | import org.opends.server.config.IntegerConfigAttribute; |
| | | import org.opends.server.config.IntegerWithUnitConfigAttribute; |
| | | import org.opends.server.config.StringConfigAttribute; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | |
| | | import org.opends.server.core.ModifyDNOperation; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.Operation; |
| | | import org.opends.server.messages.MessageHandler; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | |
| | | * handle protocol messages from the changelog server. |
| | | */ |
| | | public class SynchronizationDomain extends DirectoryThread |
| | | implements ConfigurableComponent |
| | | implements ConfigurationChangeListener<MultimasterDomainCfg> |
| | | { |
| | | private SynchronizationMonitor monitor; |
| | | |
| | |
| | | private int listenerThreadNumber = 10; |
| | | private boolean receiveStatus = true; |
| | | |
| | | private List<String> changelogServers; |
| | | private Collection<String> changelogServers; |
| | | |
| | | private DN baseDN; |
| | | |
| | |
| | | |
| | | private boolean shutdown = false; |
| | | |
| | | private DN configDn; |
| | | |
| | | private InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | |
| | |
| | | private boolean disabled = false; |
| | | private boolean stateSavingDisabled = false; |
| | | |
| | | static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; |
| | | static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn"; |
| | | static final String SERVER_ID_ATTR = "ds-cfg-directory-server-id"; |
| | | static final String RECEIVE_STATUS = "ds-cfg-receive-status"; |
| | | static final String MAX_RECEIVE_QUEUE = "ds-cfg-max-receive-queue"; |
| | | static final String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay"; |
| | | static final String MAX_SEND_QUEUE = "ds-cfg-max-send-queue"; |
| | | static final String MAX_SEND_DELAY = "ds-cfg-max-send-delay"; |
| | | static final String WINDOW_SIZE = "ds-cfg-window-size"; |
| | | static final String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval"; |
| | | private int window = 100; |
| | | |
| | | private static final StringConfigAttribute changelogStub = |
| | | new StringConfigAttribute(CHANGELOG_SERVER_ATTR, |
| | | "changelog server information", true, true, false); |
| | | |
| | | private static final IntegerConfigAttribute serverIdStub = |
| | | new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false, |
| | | false, true, 0, true, 65535); |
| | | |
| | | private static final DNConfigAttribute baseDnStub = |
| | | new DNConfigAttribute(BASE_DN_ATTR, "synchronization base DN", |
| | | true, false, false); |
| | | |
| | | private static final BooleanConfigAttribute receiveStatusStub = |
| | | new BooleanConfigAttribute(RECEIVE_STATUS, "receive status", false); |
| | | |
| | | |
/**
 * The time units accepted when expressing the heartbeat interval, each
 * mapped to the number of milliseconds it stands for.
 */
private static final LinkedHashMap<String,Double> timeUnits =
    new LinkedHashMap<String,Double>();

static
{
  // The abbreviated and the full spelling of a unit share one multiplier.
  final double millisecond = 1D;
  final double second = 1000D;

  timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, millisecond);
  timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, millisecond);
  timeUnits.put(TIME_UNIT_SECONDS_ABBR, second);
  timeUnits.put(TIME_UNIT_SECONDS_FULL, second);
}
| | | |
| | | /** |
| | | * Creates a new SynchronizationDomain using configuration from configEntry. |
| | | * |
| | | * @param configEntry The ConfigEntry to use to read the configuration of this |
| | | * SynchronizationDomain. |
| | | * @param configuration The configuration of this SynchronizationDomain. |
| | | * @throws ConfigException In case of invalid configuration. |
| | | */ |
| | | public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException |
| | | public SynchronizationDomain(MultimasterDomainCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | super("Synchronization flush"); |
| | | |
| | | /* |
| | | * read the centralized changelog server configuration |
| | | * this is a multivalued attribute |
| | | */ |
| | | StringConfigAttribute changelogServer = |
| | | (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub); |
| | | |
| | | if (changelogServer == null) |
| | | { |
| | | throw new ConfigException(MSGID_NEED_CHANGELOG_SERVER, |
| | | MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER, |
| | | configEntry.getDN().toString()) ); |
| | | } |
| | | changelogServers = changelogServer.activeValues(); |
| | | configAttributes.add(changelogServer); |
| | | |
| | | /* |
| | | * read the server Id information |
| | | * this is a single valued integer, its value must fit on a short integer |
| | | */ |
| | | IntegerConfigAttribute serverIdAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub); |
| | | if (serverIdAttr == null) |
| | | { |
| | | throw new ConfigException(MSGID_NEED_SERVER_ID, |
| | | MessageHandler.getMessage(MSGID_NEED_SERVER_ID, |
| | | configEntry.getDN().toString()) ); |
| | | } |
| | | serverId = (short) serverIdAttr.activeIntValue(); |
| | | configAttributes.add(serverIdAttr); |
| | | |
| | | /* |
| | | * read the base DN |
| | | */ |
| | | DNConfigAttribute baseDn = |
| | | (DNConfigAttribute) configEntry.getConfigAttribute(baseDnStub); |
| | | if (baseDn == null) |
| | | baseDN = null; // Attribute is not present : don't set a limit |
| | | else |
| | | baseDN = baseDn.activeValue(); |
| | | configAttributes.add(baseDn); |
| | | // Read the configuration parameters. |
| | | changelogServers = configuration.getChangelogServer(); |
| | | serverId = (short) configuration.getServerId(); |
| | | baseDN = configuration.getSynchronizationDN(); |
| | | maxReceiveQueue = configuration.getMaxReceiveQueue(); |
| | | maxReceiveDelay = (int) configuration.getMaxReceiveDelay(); |
| | | maxSendQueue = configuration.getMaxSendQueue(); |
| | | maxSendDelay = (int) configuration.getMaxSendDelay(); |
| | | window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | |
| | | /* |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | |
| | | solveConflictFlag = true; |
| | | } |
| | | |
| | | /* |
| | | * Create a new Persistent Server State that will be used to store |
| | | * the last ChangeNumber seen from all LDAP servers in the topology. |
| | | */ |
| | | state = new PersistentServerState(baseDN); |
| | | |
| | | /* |
| | | * Read the Receive Status. |
| | | * Create a Synchronization monitor object responsible for publishing |
| | | * monitoring information below cn=monitor. |
| | | */ |
| | | BooleanConfigAttribute receiveStatusAttr = (BooleanConfigAttribute) |
| | | configEntry.getConfigAttribute(receiveStatusStub); |
| | | if (receiveStatusAttr != null) |
| | | { |
| | | receiveStatus = receiveStatusAttr.activeValue(); |
| | | configAttributes.add(receiveStatusAttr); |
| | | } |
| | | |
| | | /* |
| | | * read the synchronization flow control configuration. |
| | | */ |
| | | IntegerConfigAttribute maxReceiveQueueStub = |
| | | new IntegerConfigAttribute(MAX_RECEIVE_QUEUE, "max receive queue", |
| | | false, false, false, true, 0,false, 0); |
| | | |
| | | IntegerConfigAttribute maxReceiveQueueAttr = (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(maxReceiveQueueStub); |
| | | if (maxReceiveQueueAttr == null) |
| | | maxReceiveQueue = 0; // Attribute is not present : don't set a limit |
| | | else |
| | | { |
| | | maxReceiveQueue = maxReceiveQueueAttr.activeIntValue(); |
| | | configAttributes.add(maxReceiveQueueAttr); |
| | | } |
| | | |
| | | IntegerConfigAttribute maxReceiveDelayStub = |
| | | new IntegerConfigAttribute(MAX_RECEIVE_DELAY, "max receive delay", |
| | | false, false, false, true, 0, false, 0); |
| | | IntegerConfigAttribute maxReceiveDelayAttr = (IntegerConfigAttribute) |
| | | configEntry.getConfigAttribute(maxReceiveDelayStub); |
| | | if (maxReceiveDelayAttr == null) |
| | | maxReceiveDelay = 0; // Attribute is not present : don't set a limit |
| | | else |
| | | { |
| | | maxReceiveDelay = maxReceiveDelayAttr.activeIntValue(); |
| | | configAttributes.add(maxReceiveDelayAttr); |
| | | } |
| | | |
| | | IntegerConfigAttribute maxSendQueueStub = |
| | | new IntegerConfigAttribute(MAX_SEND_QUEUE, "max send queue", |
| | | false, false, false, true, 0, false, 0); |
| | | IntegerConfigAttribute maxSendQueueAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendQueueStub); |
| | | if (maxSendQueueAttr == null) |
| | | maxSendQueue = 0; // Attribute is not present : don't set a limit |
| | | else |
| | | { |
| | | maxSendQueue = maxSendQueueAttr.activeIntValue(); |
| | | configAttributes.add(maxSendQueueAttr); |
| | | } |
| | | |
| | | IntegerConfigAttribute maxSendDelayStub = |
| | | new IntegerConfigAttribute(MAX_SEND_DELAY, "max send delay", |
| | | false, false, false, true, 0, false, 0); |
| | | IntegerConfigAttribute maxSendDelayAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendDelayStub); |
| | | if (maxSendDelayAttr == null) |
| | | maxSendDelay = 0; // Attribute is not present : don't set a limit |
| | | else |
| | | { |
| | | maxSendDelay = maxSendDelayAttr.activeIntValue(); |
| | | configAttributes.add(maxSendDelayAttr); |
| | | } |
| | | |
| | | Integer window; |
| | | IntegerConfigAttribute windowStub = |
| | | new IntegerConfigAttribute(WINDOW_SIZE, "window size", |
| | | false, false, false, true, 0, false, 0); |
| | | IntegerConfigAttribute windowAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(windowStub); |
| | | if (windowAttr == null) |
| | | window = 100; // Attribute is not present : use the default value |
| | | else |
| | | { |
| | | window = windowAttr.activeIntValue(); |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | IntegerWithUnitConfigAttribute heartbeatStub = |
| | | new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL, |
| | | "heartbeat interval", |
| | | false, timeUnits, true, 0, false, 0); |
| | | IntegerWithUnitConfigAttribute heartbeatAttr = |
| | | (IntegerWithUnitConfigAttribute) |
| | | configEntry.getConfigAttribute(heartbeatStub); |
| | | if (heartbeatAttr == null) |
| | | { |
| | | // Attribute is not present : use the default value |
| | | heartbeatInterval = 1000; |
| | | } |
| | | else |
| | | { |
| | | heartbeatInterval = heartbeatAttr.activeCalculatedValue(); |
| | | configAttributes.add(heartbeatAttr); |
| | | } |
| | | |
| | | configDn = configEntry.getDN(); |
| | | DirectoryServer.registerConfigurableComponent(this); |
| | | |
| | | monitor = new SynchronizationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the synchronization domain. |
| | | */ |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, state); |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | |
| | | * should we stop the modifications ? |
| | | */ |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public DN getConfigurableComponentEntryDN() |
| | | { |
| | | return configDn; |
| | | // listen for changes on the configuration |
| | | configuration.addChangeListener(this); |
| | | } |
| | | |
| | | /** |
| | |
| | | return configAttributes; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean hasAcceptableConfiguration(ConfigEntry configEntry, |
| | | List<String> unacceptableReasons) |
| | | { |
| | | boolean acceptable = true; |
| | | StringConfigAttribute changelog = null; |
| | | try |
| | | { |
| | | changelog = (StringConfigAttribute) |
| | | configEntry.getConfigAttribute(changelogStub); |
| | | } catch (ConfigException e) |
| | | { |
| | | acceptable = false; |
| | | unacceptableReasons.add("Need at least one changelog server."); |
| | | } |
| | | if (changelog == null) |
| | | { |
| | | acceptable = false; |
| | | unacceptableReasons.add("Need at least one changelog server."); |
| | | } |
| | | return acceptable; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry, |
| | | boolean detailedResults) |
| | | { |
| | | StringConfigAttribute changelog = null; |
| | | List<String> newChangelogServers; |
| | | boolean newReceiveStatus; |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * check if changelog server list changed |
| | | */ |
| | | changelog = (StringConfigAttribute) |
| | | configEntry.getConfigAttribute(changelogStub); |
| | | |
| | | newChangelogServers = changelog.activeValues(); |
| | | |
| | | boolean sameConf = true; |
| | | for (String s :newChangelogServers) |
| | | if (!changelogServers.contains(s)) |
| | | sameConf = false; |
| | | for (String s : changelogServers) |
| | | if (!newChangelogServers.contains(s)) |
| | | sameConf = false; |
| | | |
| | | if (!sameConf) |
| | | { |
| | | broker.stop(); |
| | | changelogServers = newChangelogServers; |
| | | broker.start(changelogServers); |
| | | } |
| | | |
| | | /* |
| | | * check if reception should be disabled |
| | | */ |
| | | newReceiveStatus = ((BooleanConfigAttribute) |
| | | configEntry.getConfigAttribute(receiveStatusStub)).activeValue(); |
| | | if (newReceiveStatus != receiveStatus) |
| | | { |
| | | /* |
| | | * was disabled and moved to enabled |
| | | */ |
| | | if (newReceiveStatus) |
| | | { |
| | | broker.restartReceive(); |
| | | for (int i=0; i<listenerThreadNumber; i++) |
| | | { |
| | | ListenerThread myThread = new ListenerThread(this); |
| | | myThread.start(); |
| | | synchroThreads.add(myThread); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | /* was enabled and moved to disabled */ |
| | | broker.suspendReceive(); |
| | | // FIXME Need a way to stop these threads. |
| | | // Setting the shutdown flag does not stop them until they have |
| | | // consumed and discarded one more message each. |
| | | // for (ListenerThread thread : synchroThreads) |
| | | // { |
| | | // thread.shutdown(); |
| | | // } |
| | | synchroThreads.clear(); |
| | | } |
| | | receiveStatus = newReceiveStatus; |
| | | } |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | /* this should never happen because the parameters have been |
| | | * validated by hasAcceptableConfiguration |
| | | */ |
| | | return new ConfigChangeResult(ResultCode.OPERATIONS_ERROR, false); |
| | | } |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this SynchronizationDomain. |
| | |
| | | public void receiveAck(AckMessage ack) |
| | | { |
| | | UpdateMessage update; |
| | | ChangeNumber changeNumber; |
| | | |
| | | changeNumber = ack.getChangeNumber(); |
| | | ChangeNumber changeNumber = ack.getChangeNumber(); |
| | | |
| | | synchronized (pendingChanges) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if a ConfigEntry is valid. |
| | | * @param configEntry The config entry that needs to be checked. |
| | | * @param unacceptableReason A description of the reason why the config entry |
| | | * is not acceptable (if return is false). |
| | | * @return a boolean indicating if the configEntry is valid. |
| | | */ |
| | | public static boolean checkConfigEntry(ConfigEntry configEntry, |
| | | StringBuilder unacceptableReason) |
| | | { |
| | | try |
| | | { |
| | | StringConfigAttribute changelogServer = |
| | | (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub); |
| | | |
| | | if (changelogServer == null) |
| | | { |
| | | unacceptableReason.append( |
| | | MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER, |
| | | configEntry.getDN().toString()) ); |
| | | return false; |
| | | } |
| | | |
| | | /* |
| | | * read the server Id information |
| | | * this is a single valued integer, its value must fit on a short integer |
| | | */ |
| | | IntegerConfigAttribute serverIdAttr = |
| | | (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub); |
| | | if (serverIdAttr == null) |
| | | { |
| | | unacceptableReason.append( |
| | | MessageHandler.getMessage(MSGID_NEED_SERVER_ID, |
| | | configEntry.getDN().toString()) ); |
| | | return false; |
| | | } |
| | | } |
| | | catch (ConfigException e) |
| | | { |
| | | unacceptableReason.append(e.getMessage()); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |
| | |
| | | return broker.getNumLostConnections(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Check if the domain solve conflicts. |
| | | * |
| | |
| | | op.setResultCode(ResultCode.SUCCESS); |
| | | synchronize(op); |
| | | } |
| | | |
| | | /** |
| | | * Check if the provided configuration is acceptable for add. |
| | | * |
| | | * @param configuration The configuration to check. |
| | | * @param unacceptableReasons When the configuration is not acceptable, this |
| | | * table is use to return the reasons why this |
| | | * configuration is not acceptbale. |
| | | * |
| | | * @return true if the configuration is acceptable, false other wise. |
| | | */ |
| | | public static boolean isConfigurationAcceptable( |
| | | MultimasterDomainCfg configuration, List<String> unacceptableReasons) |
| | | { |
| | | // Check that there is not already a domain with the same DN |
| | | // TODO : Check that the server id is a short |
| | | DN dn = configuration.getSynchronizationDN(); |
| | | if (MultimasterSynchronization.findDomain(dn,null) != null) |
| | | { |
| | | String message = getMessage(MSGID_SYNC_INVALID_DN, dn.toString()); |
| | | unacceptableReasons.add(message); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | MultimasterDomainCfg configuration) |
| | | { |
| | | // server id and base dn are readonly. |
| | | // The other parameters needs to be renegociated with the Changelog Server. |
| | | // so that requires restarting the session with the Changelog Server. |
| | | changelogServers = configuration.getChangelogServer(); |
| | | maxReceiveQueue = configuration.getMaxReceiveQueue(); |
| | | maxReceiveDelay = (int) configuration.getMaxReceiveDelay(); |
| | | maxSendQueue = configuration.getMaxSendQueue(); |
| | | maxSendDelay = (int) configuration.getMaxSendDelay(); |
| | | window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay, |
| | | maxSendQueue, maxSendDelay, window, heartbeatInterval); |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean isConfigurationChangeAcceptable( |
| | | MultimasterDomainCfg configuration, List<String> unacceptableReasons) |
| | | { |
| | | return true; |
| | | } |
| | | } |