| | |
| | | * This class implements the bulk part of the Directory Server side |
| | | * of the replication code. |
| | | * It contains the root method for publishing a change, |
| | | * processing a change received from the changelog service, |
| | | * processing a change received from the replicationServer service, |
| | | * handle conflict resolution, |
| | | * handle protocol messages from the changelog server. |
| | | * handle protocol messages from the replicationServer. |
| | | */ |
| | | public class ReplicationDomain extends DirectoryThread |
| | | implements ConfigurationChangeListener<MultimasterDomainCfg> |
| | |
| | | private ReplicationMonitor monitor; |
| | | |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | private ChangelogBroker broker; |
| | | private ReplicationBroker broker; |
| | | |
| | | private List<ListenerThread> synchroThreads = |
| | | new ArrayList<ListenerThread>(); |
| | |
| | | private int listenerThreadNumber = 10; |
| | | private boolean receiveStatus = true; |
| | | |
| | | private Collection<String> changelogServers; |
| | | private Collection<String> replicationServers; |
| | | |
| | | private DN baseDN; |
| | | |
| | |
| | | super("replication flush"); |
| | | |
| | | // Read the configuration parameters. |
| | | changelogServers = configuration.getChangelogServer(); |
| | | replicationServers = configuration.getChangelogServer(); |
| | | serverId = (short) configuration.getServerId(); |
| | | baseDN = configuration.getSynchronizationDN(); |
| | | maxReceiveQueue = configuration.getMaxReceiveQueue(); |
| | |
| | | */ |
| | | try |
| | | { |
| | | broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval); |
| | | synchronized (broker) |
| | | { |
| | | broker.start(changelogServers); |
| | | broker.start(replicationServers); |
| | | if (!receiveStatus) |
| | | broker.suspendReceive(); |
| | | } |
| | |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | /* TODO should mark that changelog service is |
| | | /* TODO should mark that replicationServer service is |
| | | * not available, log an error and retry upon timeout |
| | | * should we stop the modifications ? |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Receives an update message from the changelog. |
| | | * Receives an update message from the replicationServer. |
| | | * also responsible for updating the list of pending changes |
| | | * @return the received message - null if none |
| | | */ |
| | |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // changelog did not find any import source. |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | } |
| | |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName()); |
| | | |
| | | // stop the ChangelogBroker |
| | | // stop the ReplicationBroker |
| | | broker.stop(); |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the changelog server to which this domain is currently |
| | | * Get the name of the replicationServer to which this domain is currently |
| | | * connected. |
| | | * |
| | | * @return the name of the changelog server to which this domain |
| | | * @return the name of the replicationServer to which this domain |
| | | * is currently connected. |
| | | */ |
| | | public String getChangelogServer() |
| | | public String getReplicationServer() |
| | | { |
| | | return broker.getChangelogServer(); |
| | | return broker.getReplicationServer(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the changelog service. |
| | | * Push all committed local changes to the replicationServer service. |
| | | * PRECONDITION : The pendingChanges lock must be held before calling |
| | | * this method. |
| | | */ |
| | |
| | | |
| | | try |
| | | { |
| | | broker.start(changelogServers); |
| | | broker.start(replicationServers); |
| | | } catch (Exception e) |
| | | { |
| | | /* TODO should mark that changelog service is |
| | | /* TODO should mark that replicationServer service is |
| | | * not available, log an error and retry upon timeout |
| | | * should we stop the modifications ? |
| | | */ |
| | |
| | | |
| | | // Re-exchange state with SS |
| | | broker.stop(); |
| | | broker.start(changelogServers); |
| | | broker.start(replicationServers); |
| | | |
| | | } |
| | | catch(Exception e) |
| | |
| | | MultimasterDomainCfg configuration) |
| | | { |
| | | // server id and base dn are readonly. |
| | | // The other parameters need to be renegotiated with the Changelog Server, |
| | | // which requires restarting the session with the Changelog Server. |
| | | changelogServers = configuration.getChangelogServer(); |
| | | // The other parameters need to be renegotiated with the ReplicationServer, |
| | | // which requires restarting the session with the ReplicationServer. |
| | | replicationServers = configuration.getChangelogServer(); |
| | | maxReceiveQueue = configuration.getMaxReceiveQueue(); |
| | | maxReceiveDelay = (int) configuration.getMaxReceiveDelay(); |
| | | maxSendQueue = configuration.getMaxSendQueue(); |
| | | maxSendDelay = (int) configuration.getMaxSendDelay(); |
| | | window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay, |
| | | broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay, |
| | | maxSendQueue, maxSendDelay, window, heartbeatInterval); |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |