| | |
| | | * ServerState class. |
| | | * This object is used to store the last update seem on this server |
| | | * from each server. |
| | | * It is exchanged with the changelog servers at connection establishment time. |
| | | * It is exchanged with the replication servers at connection establishment |
| | | * time. |
| | | */ |
| | | public class ServerState implements Iterable<Short> |
| | | { |
| | |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | |
| | | /** |
| | | * Thread that is used to get messages from the Changelog servers |
| | | * Thread that is used to get messages from the Replication servers |
| | | * and replay them in the current server. |
| | | */ |
| | | public class ListenerThread extends DirectoryThread |
| | |
| | | BackupTaskListener, RestoreTaskListener, ImportTaskListener, |
| | | ExportTaskListener |
| | | { |
| | | static String CHANGELOG_DN = "cn=Changelog Server," + |
| | | "cn=Multimaster Synchronization, cn=Synchronization Providers, cn=config"; |
| | | static String SYNCHRONIZATION_CLASS = |
| | | "ds-cfg-synchronization-provider-config"; |
| | | |
| | | private ChangelogListener changelog = null; |
| | | private ReplicationServerListener replicationServer = null; |
| | | private static Map<DN, ReplicationDomain> domains = |
| | | new HashMap<DN, ReplicationDomain>() ; |
| | | |
| | |
| | | MultimasterSynchronizationProviderCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | changelog = new ChangelogListener(configuration); |
| | | replicationServer = new ReplicationServerListener(configuration); |
| | | |
| | | // Register as an add and delete listener with the root configuration so we |
| | | // can be notified if Multimaster domain entries are added or removed. |
| | |
| | | domain.shutdown(); |
| | | } |
| | | |
| | | // shutdown the Changelog Service if necessary |
| | | if (changelog != null) |
| | | changelog.shutdown(); |
| | | // shutdown the ReplicationServer Service if necessary |
| | | if (replicationServer != null) |
| | | replicationServer.shutdown(); |
| | | |
| | | DirectoryServer.deregisterBackupTaskListener(this); |
| | | DirectoryServer.deregisterRestoreTaskListener(this); |
| File was renamed from opends/src/server/org/opends/server/replication/plugin/ChangelogBroker.java |
| | |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.ChangelogStartMessage; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ServerStartMessage; |
| | | import org.opends.server.replication.protocol.SocketSession; |
| | |
| | | /** |
| | | * The broker for Multi-master Replication. |
| | | */ |
| | | public class ChangelogBroker implements InternalSearchListener |
| | | public class ReplicationBroker implements InternalSearchListener |
| | | { |
| | | private boolean shutdown = false; |
| | | private Collection<String> servers; |
| | | private boolean connected = false; |
| | | private final Object lock = new Object(); |
| | | private String changelogServer = "Not connected"; |
| | | private String replicationServer = "Not connected"; |
| | | private TreeSet<FakeOperation> replayOperations; |
| | | private ProtocolSession session = null; |
| | | private final ServerState state; |
| | |
| | | |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular ReplicationDomain. |
| | | * Creates a new ReplicationServer Broker for a particular ReplicationDomain. |
| | | * |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negociating the session with the changelog servers. |
| | | * when negociating the session with the replicationServer. |
| | | * @param baseDn The base DN that should be used by this broker |
| | | * when negociating the session with the changelog servers. |
| | | * when negociating the session with the replicationServer. |
| | | * @param serverID The server ID that should be used by this broker |
| | | * when negociating the session with the changelog servers. |
| | | * when negociating the session with the replicationServer. |
| | | * @param maxReceiveQueue The maximum size of the receive queue to use on |
| | | * the changelog server. |
| | | * the replicationServer. |
| | | * @param maxReceiveDelay The maximum replication delay to use on the |
| | | * changelog server. |
| | | * replicationServer. |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | * the replicationServer. |
| | | * @param maxSendDelay The maximum send delay to use on the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param heartbeatInterval The interval between heartbeats requested of the |
| | | * changelog server, or zero if no heartbeats are requested. |
| | | * replicationServer, or zero if no heartbeats are requested. |
| | | */ |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | public ReplicationBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Start the ChangelogBroker. |
| | | * Start the ReplicationBroker. |
| | | * |
| | | * @param servers list of servers used |
| | | * @throws Exception : in case of errors |
| | |
| | | throws Exception |
| | | { |
| | | /* |
| | | * Open Socket to the Changelog |
| | | * Open Socket to the ReplicationServer |
| | | * Send the Start message |
| | | */ |
| | | shutdown = false; |
| | |
| | | |
| | | |
| | | /** |
| | | * Connect to a Changelog server. |
| | | * Connect to a ReplicationServer. |
| | | * |
| | | * @throws NumberFormatException address was invalid |
| | | * @throws IOException error during connection phase |
| | | */ |
| | | private void connect() throws NumberFormatException, IOException |
| | | { |
| | | ChangelogStartMessage startMsg; |
| | | ReplServerStartMessage startMsg; |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | if (heartbeatMonitor != null) |
| | |
| | | |
| | | |
| | | /* |
| | | * Read the ChangelogStartMessage that should come back. |
| | | * Read the ReplServerStartMessage that should come back. |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ChangelogStartMessage) session.receive(); |
| | | startMsg = (ReplServerStartMessage) session.receive(); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a changelog that has not |
| | | * We must not publish changes to a replicationServer that has not |
| | | * seen all our previous changes because this could cause some |
| | | * other ldap servers to miss those changes. |
| | | * Check that the Changelog has seen all our previous changes. |
| | | * If not, try another changelog server. |
| | | * If no other changelog server has seen all our changes, recover |
| | | * those changes and send them again to any changelog server. |
| | | * Check that the ReplicationServer has seen all our previous changes. |
| | | * If not, try another replicationServer. |
| | | * If no other replicationServer has seen all our changes, recover |
| | | * those changes and send them again to any replicationServer. |
| | | */ |
| | | ChangeNumber changelogMaxChangeNumber = |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | startMsg.getServerState().getMaxChangeNumber(serverID); |
| | | if (changelogMaxChangeNumber == null) |
| | | changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | if (replServerMaxChangeNumber == null) |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); |
| | | ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); |
| | | if ((ourMaxChangeNumber == null) || |
| | | (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber))) |
| | | (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | |
| | | { |
| | | if (checkState == true) |
| | | { |
| | | /* This changelog server is missing some |
| | | /* This replicationServer is missing some |
| | | * of our changes, we are going to try another server |
| | | * but before log a notice message |
| | | */ |
| | |
| | | { |
| | | replayOperations.clear(); |
| | | /* |
| | | * Get all the changes that have not been seen by this changelog |
| | | * server and update it |
| | | * Get all the changes that have not been seen by this |
| | | * replicationServer and update it |
| | | */ |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "("+ Historical.HISTORICALATTRIBUTENAME + |
| | | ">=dummy:" + changelogMaxChangeNumber + ")"); |
| | | ">=dummy:" + replServerMaxChangeNumber + ")"); |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | InternalSearchOperation op = conn.processSearch( |
| | |
| | | } |
| | | else |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | replicationServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | |
| | | { |
| | | /* |
| | | * There was no server waiting on this host:port |
| | | * Log a notice and try the next changelog server in the list |
| | | * Log a notice and try the next replicationServer in the list |
| | | */ |
| | | int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING; |
| | | String message = getMessage(msgID, server); |
| | |
| | | if (checkState == true) |
| | | { |
| | | /* |
| | | * We could not find a changelog server that has seen all the |
| | | * We could not find a replicationServer that has seen all the |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any changelog server. |
| | | * the loop looking for any replicationServer. |
| | | */ |
| | | try |
| | | { |
| | |
| | | else |
| | | { |
| | | /* |
| | | * This server could not find any changelog server |
| | | * This server could not find any replicationServer |
| | | * Let's wait a little and try again. |
| | | */ |
| | | synchronized (this) |
| | |
| | | |
| | | |
| | | /** |
| | | * Restart the Changelog broker after a failure. |
| | | * Restart the ReplicationServer broker after a failure. |
| | | * |
| | | * @param failingSession the socket which failed |
| | | */ |
| | |
| | | */ |
| | | public void stop() |
| | | { |
| | | changelogServer = "stopped"; |
| | | replicationServer = "stopped"; |
| | | shutdown = true; |
| | | connected = false; |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debugInfo("ChangelogBroker Stop Closing session"); |
| | | debugInfo("ReplicationBroker Stop Closing session"); |
| | | } |
| | | |
| | | if (session != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the changelog server to which this broker is currently |
| | | * Get the name of the replicationServer to which this broker is currently |
| | | * connected. |
| | | * |
| | | * @return the name of the changelog server to which this domain |
| | | * @return the name of the replicationServer to which this domain |
| | | * is currently connected. |
| | | */ |
| | | public String getChangelogServer() |
| | | public String getReplicationServer() |
| | | { |
| | | return changelogServer; |
| | | return replicationServer; |
| | | } |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | * TODO : implement code for ADD, DEL, MODDN operation |
| | | * |
| | | * Parse all ds-sync-hist attribute values |
| | | * - for each Changenumber>changelogMaxChangeNumber : |
| | | * - for each Changenumber > replication server MaxChangeNumber : |
| | | * build an attribute mod |
| | | * |
| | | */ |
| | |
| | | /** |
| | | * Change some config parameters. |
| | | * |
| | | * @param changelogServers The new list of changelog servers. |
| | | * @param replicationServers The new list of replication servers. |
| | | * @param maxReceiveQueue The max size of receive queue. |
| | | * @param maxReceiveDelay The max receive delay. |
| | | * @param maxSendQueue The max send queue. |
| | |
| | | * @param window The max window size. |
| | | * @param heartbeatInterval The heartbeat interval. |
| | | */ |
| | | public void changeConfig(Collection<String> changelogServers, |
| | | public void changeConfig(Collection<String> replicationServers, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | { |
| | | this.servers = changelogServers; |
| | | this.servers = replicationServers; |
| | | this.maxRcvWindow = window; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | // TODO : Changing those parameters requires to either restart a new |
| | | // session with the changelog server or renegociate the parameters that |
| | | // session with the replicationServer or renegociate the parameters that |
| | | // were sent in the ServerStart message |
| | | } |
| | | } |
| | |
| | | * This class implements the bulk part of the.of the Directory Server side |
| | | * of the replication code. |
| | | * It contains the root method for publishing a change, |
| | | * processing a change received from the changelog service, |
| | | * processing a change received from the replicationServer service, |
| | | * handle conflict resolution, |
| | | * handle protocol messages from the changelog server. |
| | | * handle protocol messages from the replicationServer. |
| | | */ |
| | | public class ReplicationDomain extends DirectoryThread |
| | | implements ConfigurationChangeListener<MultimasterDomainCfg> |
| | |
| | | private ReplicationMonitor monitor; |
| | | |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | private ChangelogBroker broker; |
| | | private ReplicationBroker broker; |
| | | |
| | | private List<ListenerThread> synchroThreads = |
| | | new ArrayList<ListenerThread>(); |
| | |
| | | private int listenerThreadNumber = 10; |
| | | private boolean receiveStatus = true; |
| | | |
| | | private Collection<String> changelogServers; |
| | | private Collection<String> replicationServers; |
| | | |
| | | private DN baseDN; |
| | | |
| | |
| | | super("replication flush"); |
| | | |
| | | // Read the configuration parameters. |
| | | changelogServers = configuration.getChangelogServer(); |
| | | replicationServers = configuration.getChangelogServer(); |
| | | serverId = (short) configuration.getServerId(); |
| | | baseDN = configuration.getSynchronizationDN(); |
| | | maxReceiveQueue = configuration.getMaxReceiveQueue(); |
| | |
| | | */ |
| | | try |
| | | { |
| | | broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval); |
| | | synchronized (broker) |
| | | { |
| | | broker.start(changelogServers); |
| | | broker.start(replicationServers); |
| | | if (!receiveStatus) |
| | | broker.suspendReceive(); |
| | | } |
| | |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | /* TODO should mark that changelog service is |
| | | /* TODO should mark that replicationServer service is |
| | | * not available, log an error and retry upon timeout |
| | | * should we stop the modifications ? |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Receives an update message from the changelog. |
| | | * Receives an update message from the replicationServer. |
| | | * also responsible for updating the list of pending changes |
| | | * @return the received message - null if none |
| | | */ |
| | |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // changelog did not find any import source. |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | } |
| | |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName()); |
| | | |
| | | // stop the ChangelogBroker |
| | | // stop the ReplicationBroker |
| | | broker.stop(); |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the changelog server to which this domain is currently |
| | | * Get the name of the replicationServer to which this domain is currently |
| | | * connected. |
| | | * |
| | | * @return the name of the changelog server to which this domain |
| | | * @return the name of the replicationServer to which this domain |
| | | * is currently connected. |
| | | */ |
| | | public String getChangelogServer() |
| | | public String getReplicationServer() |
| | | { |
| | | return broker.getChangelogServer(); |
| | | return broker.getReplicationServer(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the changelog service. |
| | | * Push all committed local changes to the replicationServer service. |
| | | * PRECONDITION : The pendingChanges lock must be held before calling |
| | | * this method. |
| | | */ |
| | |
| | | |
| | | try |
| | | { |
| | | broker.start(changelogServers); |
| | | broker.start(replicationServers); |
| | | } catch (Exception e) |
| | | { |
| | | /* TODO should mark that changelog service is |
| | | /* TODO should mark that replicationServer service is |
| | | * not available, log an error and retry upon timeout |
| | | * should we stop the modifications ? |
| | | */ |
| | |
| | | |
| | | // Re-exchange state with SS |
| | | broker.stop(); |
| | | broker.start(changelogServers); |
| | | broker.start(replicationServers); |
| | | |
| | | } |
| | | catch(Exception e) |
| | |
| | | MultimasterDomainCfg configuration) |
| | | { |
| | | // server id and base dn are readonly. |
| | | // The other parameters needs to be renegociated with the Changelog Server. |
| | | // so that requires restarting the session with the Changelog Server. |
| | | changelogServers = configuration.getChangelogServer(); |
| | | // The other parameters needs to be renegociated with the ReplicationServer. |
| | | // so that requires restarting the session with the ReplicationServer. |
| | | replicationServers = configuration.getChangelogServer(); |
| | | maxReceiveQueue = configuration.getMaxReceiveQueue(); |
| | | maxReceiveDelay = (int) configuration.getMaxReceiveDelay(); |
| | | maxSendQueue = configuration.getMaxSendQueue(); |
| | | maxSendDelay = (int) configuration.getMaxSendDelay(); |
| | | window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay, |
| | | broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay, |
| | | maxSendQueue, maxSendDelay, window, heartbeatInterval); |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | |
| | | attributes.add(attr); |
| | | |
| | | /* get the base dn */ |
| | | attr = new Attribute("connected-to", domain.getChangelogServer()); |
| | | attr = new Attribute("connected-to", domain.getReplicationServer()); |
| | | attributes.add(attr); |
| | | |
| | | /* get number of lost connections */ |
| File was renamed from opends/src/server/org/opends/server/replication/plugin/ChangelogListener.java |
| | |
| | | import org.opends.server.admin.std.server.ChangelogServerCfg; |
| | | import org.opends.server.admin.std.server.MultimasterSynchronizationProviderCfg; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.replication.server.Changelog; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | /** |
| | | * This class is used to create and object that can |
| | | * register in the admin framework as a listener for changes, add and delete |
| | | * on the Changelog Server configuration objects. |
| | | * on the ReplicationServer configuration objects. |
| | | * |
| | | */ |
| | | public class ChangelogListener |
| | | public class ReplicationServerListener |
| | | implements ConfigurationAddListener<ChangelogServerCfg>, |
| | | ConfigurationDeleteListener<ChangelogServerCfg> |
| | | { |
| | | Changelog changelog = null; |
| | | ReplicationServer replicationServer = null; |
| | | |
| | | /** |
| | | * Build a Changelog Listener from the given Multimaster configuration. |
| | | * Build a ReplicationServer Listener from the given Multimaster |
| | | * configuration. |
| | | * |
| | | * @param configuration The configuration that will be used to listen |
| | | * for changelog configuration changes. |
| | | * for replicationServer configuration changes. |
| | | * |
| | | * @throws ConfigException if the ChangelogListener can't register for |
| | | * @throws ConfigException if the ReplicationServerListener can't register for |
| | | * listening to changes on the provided configuration |
| | | * object. |
| | | */ |
| | | public ChangelogListener( |
| | | public ReplicationServerListener( |
| | | MultimasterSynchronizationProviderCfg configuration) |
| | | throws ConfigException |
| | | { |
| | |
| | | if (configuration.hasChangelogServer()) |
| | | { |
| | | ChangelogServerCfg server = configuration.getChangelogServer(); |
| | | changelog = new Changelog(server); |
| | | replicationServer = new ReplicationServer(server); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | changelog = new Changelog(configuration); |
| | | replicationServer = new ReplicationServer(configuration); |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } catch (ConfigException e) |
| | | { |
| | |
| | | public boolean isConfigurationAddAcceptable( |
| | | ChangelogServerCfg configuration, List<String> unacceptableReasons) |
| | | { |
| | | return Changelog.isConfigurationAcceptable( |
| | | return ReplicationServer.isConfigurationAcceptable( |
| | | configuration, unacceptableReasons); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Changelog servers. |
| | | * Shutdown the Replication servers. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | if (changelog != null) |
| | | changelog.shutdown(); |
| | | if (replicationServer != null) |
| | | replicationServer.shutdown(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ConfigChangeResult applyConfigurationDelete( |
| | | ChangelogServerCfg configuration) |
| | | { |
| | | // There can be only one changelog, just shutdown the changelog |
| | | // currently configured. |
| | | if (changelog != null) |
| | | // There can be only one replicationServer, just shutdown the |
| | | // replicationServer currently configured. |
| | | if (replicationServer != null) |
| | | { |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | } |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | |
| | | * <A HREF="ReplicationDomain.html"><B>ReplicationDomain</B></A> |
| | | * contains the bulk of the Directory Server side of the |
| | | * replication code. Most notably it contains the root method for |
| | | * publishing a change, processing a change received from the changelog |
| | | * publishing a change, processing a change received from the replicationServer |
| | | * service, handle conflict resolution, handle protocol messages from the |
| | | * changelog server. |
| | | * replicationServer. |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | |
| | | /** |
| | | * Used to send acks between LDAP and changelog servers. |
| | | * Used to send acks between LDAP and replication servers. |
| | | */ |
| | | public class AckMessage extends ReplicationMessage |
| | | { |
| | |
| | | |
| | | /** |
| | | * This class is used to exchange Add operation between LDAP servers |
| | | * and changelog servers. |
| | | * and replication servers. |
| | | */ |
| | | public class AddMsg extends UpdateMessage |
| | | { |
| | |
| | | import org.opends.server.types.Operation; |
| | | |
| | | /** |
| | | * Object used when sending delete information to Changelogs. |
| | | * Object used when sending delete information to replication servers. |
| | | */ |
| | | public class DeleteMsg extends UpdateMessage |
| | | { |
| | |
| | | |
| | | /** |
| | | * This message is part of the replication protocol. |
| | | * This message is sent by a server or a changelog server when an error |
| | | * This message is sent by a server or a replication server when an error |
| | | * is detected in the context of a total update. |
| | | */ |
| | | public class ErrorMessage extends RoutableMessage implements |
| | |
| | | /** |
| | | * Create a InitializeMessage. |
| | | * |
| | | * @param destination changelog server id |
| | | * @param destination replication server id |
| | | * @param msgID error message ID |
| | | * @param details details of the error |
| | | */ |
| File was renamed from opends/src/server/org/opends/server/replication/protocol/ChangelogStartMessage.java |
| | |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * Message sent by a changelog server to another changelog server at Startup. |
| | | * Message sent by a replication server to another replication server |
| | | * at Startup. |
| | | */ |
| | | public class ChangelogStartMessage extends ReplicationMessage implements |
| | | public class ReplServerStartMessage extends ReplicationMessage implements |
| | | Serializable |
| | | { |
| | | private static final long serialVersionUID = -5871385537169856856L; |
| | |
| | | private int windowSize; |
| | | |
| | | /** |
| | | * Create a ChangelogStartMessage. |
| | | * Create a ReplServerStartMessage. |
| | | * |
| | | * @param serverId changelog server id |
| | | * @param serverURL changelog server URL |
| | | * @param baseDn base DN for which the ChangelogStartMessage is created. |
| | | * @param serverId replication server id |
| | | * @param serverURL replication server URL |
| | | * @param baseDn base DN for which the ReplServerStartMessage is created. |
| | | * @param windowSize The window size. |
| | | * @param serverState our ServerState for this baseDn. |
| | | */ |
| | | public ChangelogStartMessage(short serverId, String serverURL, DN baseDn, |
| | | public ReplServerStartMessage(short serverId, String serverURL, DN baseDn, |
| | | int windowSize, |
| | | ServerState serverState) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ChangelogStartMessage by decoding the provided byte array. |
| | | * Creates a new ReplServerStartMessage by decoding the provided byte array. |
| | | * @param in A byte array containing the encoded information for the |
| | | * ChangelogStartMessage |
| | | * ReplServerStartMessage |
| | | * @throws DataFormatException If the in does not contain a properly |
| | | * encoded ChangelogStartMessage. |
| | | * encoded ReplServerStartMessage. |
| | | */ |
| | | public ChangelogStartMessage(byte[] in) throws DataFormatException |
| | | public ReplServerStartMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The ChangelogStartMessage is encoded in the form : |
| | | /* The ReplServerStartMessage is encoded in the form : |
| | | * <baseDn><ServerId><ServerUrl><windowsize><ServerState> |
| | | */ |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_CHANGELOG_START) |
| | | throw new DataFormatException("input is not a valid ChangelogStartMsg"); |
| | | if (in[0] != MSG_TYPE_REPL_SERVER_START) |
| | | throw new DataFormatException( |
| | | "input is not a valid ReplServerStartMsg"); |
| | | int pos = 1; |
| | | |
| | | /* read the dn |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the base DN from this ChangelogStartMessage. |
| | | * Get the base DN from this ReplServerStartMessage. |
| | | * |
| | | * @return the base DN from this ChangelogStartMessage. |
| | | * @return the base DN from this ReplServerStartMessage. |
| | | */ |
| | | public DN getBaseDn() |
| | | { |
| | |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | /* The ChangelogStartMessage is stored in the form : |
| | | /* The ReplServerStartMessage is stored in the form : |
| | | * <operation type><basedn><serverid><serverURL><windowsize><serverState> |
| | | */ |
| | | try { |
| | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_CHANGELOG_START; |
| | | resultByteArray[0] = MSG_TYPE_REPL_SERVER_START; |
| | | int pos = 1; |
| | | |
| | | /* put the baseDN and a terminating 0 */ |
| | |
| | | static final byte MSG_TYPE_MODIFYDN_REQUEST = 4; |
| | | static final byte MSG_TYPE_ACK = 5; |
| | | static final byte MSG_TYPE_SERVER_START = 6; |
| | | static final byte MSG_TYPE_CHANGELOG_START = 7; |
| | | static final byte MSG_TYPE_REPL_SERVER_START = 7; |
| | | static final byte MSG_TYPE_WINDOW = 8; |
| | | static final byte MSG_TYPE_HEARTBEAT = 9; |
| | | static final byte MSG_TYPE_INITIALIZE_REQUEST = 10; |
| | |
| | | * MSG_TYPE_MODIFY_DN_REQUEST |
| | | * MSG_TYPE_ACK |
| | | * MSG_TYPE_SERVER_START |
| | | * MSG_TYPE_CHANGELOG_START |
| | | * MSG_TYPE_REPL_SERVER_START |
| | | * MSG_TYPE_WINDOW |
| | | * MSG_TYPE_HEARTBEAT |
| | | * MSG_TYPE_INITIALIZE |
| | |
| | | case MSG_TYPE_SERVER_START: |
| | | msg = new ServerStartMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_CHANGELOG_START: |
| | | msg = new ChangelogStartMessage(buffer); |
| | | case MSG_TYPE_REPL_SERVER_START: |
| | | msg = new ReplServerStartMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_WINDOW: |
| | | msg = new WindowMessage(buffer); |
| | |
| | | |
| | | /** |
| | | * Creates a routable message. |
| | | * @param senderID changelog server id |
| | | * @param destination changelog server id |
| | | * @param senderID replication server id |
| | | * @param destination replication server id |
| | | */ |
| | | public RoutableMessage(short senderID, short destination) |
| | | { |
| | |
| | | |
| | | /** |
| | | * This message is used by LDAP server when they first connect. |
| | | * to a changelog server to let them know who they are and what is their state |
| | | * to a replication server to let them know who they are and what is their state |
| | | * (their RUV) |
| | | */ |
| | | public class ServerStartMessage extends ReplicationMessage implements |
| | |
| | | |
| | | /** |
| | | * This message is used by LDAP server when they first connect. |
| | | * to a changelog server to let them know who they are and what is their state |
| | | * to a replication server to let them know who they are and what is their state |
| | | * (their RUV) |
| | | */ |
| | | public class WindowMessage extends ReplicationMessage implements |
| | |
| | | */ |
| | | |
| | | /** |
| | | * This package contains the code used by the changelog and by the |
| | | * This package contains the code used by the replication server and by the |
| | | * code running on the Directory Server side to exchange their information. |
| | | * <br> |
| | | * <br> |
| | |
| | | * <ul> |
| | | * <li><A HREF="SocketSession.html"><B>SocketSession</B></A> |
| | | * implements the ProtocolSession interface that is |
| | | * used by the changelog server and the directory server to communicate. |
| | | * used by the replication server and the directory server to communicate. |
| | | * This is done by using the innate encoding/decoding capabilities of the |
| | | * ReplicationMessages objects. This class is used by both the |
| | | * changelog and the replication package. |
| | | * server and the plugin package. |
| | | * </li> |
| | | * <li><A HREF="ReplicationMessage.html"><B>ReplicationMessage</B></A> |
| | | * This class and the class that inherit from it contain the |
| | | * messages that are used for communication between the changelog and the |
| | | * Directory Server as well as the methods fro encoding/decoding them. |
| | | * messages that are used for communication between the replication server and |
| | | * the Directory Server as well as the methods fro encoding/decoding them. |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.server.ChangelogDB.ChangelogCursor; |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * This class is used for managing the changelog database for each servers |
| | | * in the topology. |
| | | * This class is used for managing the replicationServer database for each |
| | | * server in the topology. |
| | | * It is responsible for efficiently saving the updates that is received from |
| | | * each master server into stable storage. |
| | | * This class is also able to generate a ChangelogIterator that can be |
| | | * This class is also able to generate a ReplicationIterator that can be |
| | | * used to read all changes from a given ChangeNUmber. |
| | | * |
| | | * This class publish some monitoring information below cn=monitor. |
| | |
| | | // This queue hold all the updates not yet saved to stable storage |
| | | // it is only used as a temporary placeholder so that the write |
| | | // in the stable storage can be grouped for efficiency reason. |
| | | // it is never read back by changelog threads that are responsible |
| | | // for pushing the changes to other changelog server or to LDAP server |
| | | // it is never read back by replicationServer threads that are responsible |
| | | // for pushing the changes to other replication server or to LDAP server |
| | | private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>(); |
| | | private ChangelogDB db; |
| | | private ReplicationDB db; |
| | | private ChangeNumber firstChange = null; |
| | | private ChangeNumber lastChange = null; |
| | | private short serverId; |
| | |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDn the baseDn for which this DB was created. |
| | | * @param changelog The Changelog that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the Changelog DB. |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | public DbHandler(short id, DN baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | this.serverId = id; |
| | | this.baseDn = baseDn; |
| | | this.trimage = changelog.getTrimage(); |
| | | db = new ChangelogDB(id, baseDn, changelog, dbenv); |
| | | this.trimage = replicationServer.getTrimage(); |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, "changelog db " + id + " " + baseDn); |
| | | thread = new DirectoryThread(this, |
| | | "Replication Server db " + id + " " + baseDn); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider( |
| | |
| | | } |
| | | |
| | | /** |
| | | * Generate a new ChangelogIterator that allows to browse the db |
| | | * Generate a new ReplicationIterator that allows to browse the db |
| | | * managed by this dbHandler and starting at the position defined |
| | | * by a given changeNumber. |
| | | * |
| | | * @param changeNumber The position where the iterator must start. |
| | | * |
| | | * @return a new ChangelogIterator that allows to browse the db |
| | | * @return a new ReplicationIterator that allows to browse the db |
| | | * managed by this dbHandler and starting at the position defined |
| | | * by a given changeNumber. |
| | | * |
| | |
| | | * @throws Exception If there is no other change to push after change |
| | | * with changeNumber number. |
| | | */ |
| | | public ChangelogIterator generateIterator(ChangeNumber changeNumber) |
| | | public ReplicationIterator generateIterator(ChangeNumber changeNumber) |
| | | throws DatabaseException, Exception |
| | | { |
| | | /* |
| | |
| | | flush(); |
| | | } |
| | | |
| | | return new ChangelogIterator(serverId, db, changeNumber); |
| | | return new ReplicationIterator(serverId, db, changeNumber); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | * Periodically Flushes the ChangelogCache from memory to the stable storage |
| | | * Periodically Flushes the ReplicationCache from memory to the stable storage |
| | | * and trims the old updates. |
| | | */ |
| | | public void run() |
| | |
| | | } |
| | | |
| | | /** |
| | | * Flush old change information from this changelog database. |
| | | * Flush old change information from this replicationServer database. |
| | | * @throws DatabaseException In case of database problem. |
| | | */ |
| | | private void trim() throws DatabaseException, Exception |
| | |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ChangelogCursor cursor; |
| | | ReplServerDBCursor cursor; |
| | | |
| | | cursor = db.openDeleteCursor(); |
| | | |
| | |
| | | { |
| | | private DbMonitorProvider() |
| | | { |
| | | super("Changelog Database"); |
| | | super("ReplicationServer Database"); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ArrayList<Attribute> getMonitorData() |
| | | { |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(new Attribute("changelog-database", |
| | | attributes.add(new Attribute("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", baseDn.toString())); |
| | | if (firstChange != null) |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "Changelog database " + baseDn.toString() + |
| | | return "ReplicationServer database " + baseDn.toString() + |
| | | " " + String.valueOf(serverId); |
| | | } |
| | | |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | |
| | | /** |
| | | * This class is used to store acks for update messages coming from |
| | | * other replication servers. |
| | | */ |
| | | public class ReplServerAckMessageList extends AckMessageList |
| | | { |
| | | private short replicationServerId; |
| | | private ReplicationCache replicationCache; |
| | | |
| | | /** |
| | | * Creates a new AckMessageList for a given ChangeNumber. |
| | | * |
| | | * @param changeNumber The ChangeNumber for which the ack list is created. |
| | | * @param numExpectedAcks The number of acks waited before acking the |
| | | * original change. |
| | | * @param replicationServerId The Identifier of the replication server |
| | | * from which the change was received. |
| | | * @param replicationCache The ReplicationCache from which he change |
| | | * was received. |
| | | */ |
| | | public ReplServerAckMessageList(ChangeNumber changeNumber, |
| | | int numExpectedAcks, |
| | | short replicationServerId, |
| | | ReplicationCache replicationCache) |
| | | { |
| | | super(changeNumber, numExpectedAcks); |
| | | this.replicationServerId = replicationServerId; |
| | | this.replicationCache = replicationCache; |
| | | } |
| | | |
| | | /** |
| | | * Get the Identifier of the replication server from which we received the |
| | | * change. |
| | | * @return Returns the Identifier of the replication server from which we |
| | | * received the change. |
| | | */ |
| | | public short getReplicationServerId() |
| | | { |
| | | return replicationServerId; |
| | | } |
| | | |
| | | /** |
| | | * Get the replicationCache of the replication server from which we received |
| | | * the change. |
| | | * @return Returns the replicationCache of the replication server from which |
| | | * we received the change . |
| | | */ |
| | | public ReplicationCache getChangelogCache() |
| | | { |
| | | return replicationCache; |
| | | } |
| | | |
| | | |
| | | } |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogCache.java |
| | |
| | | /** |
| | | * This class define an in-memory cache that will be used to store |
| | | * the messages that have been received from an LDAP server or |
| | | * from another changelog server and that should be forwarded to |
| | | * from another replication server and that should be forwarded to |
| | | * other servers. |
| | | * |
| | | * The size of the cache is set by configuration. |
| | |
| | | * received to the disk and for trimming them |
| | | * Decision to trim can be based on disk space or age of the message |
| | | */ |
| | | public class ChangelogCache |
| | | public class ReplicationCache |
| | | { |
| | | private Object flowControlLock = new Object(); |
| | | private DN baseDn = null; |
| | |
| | | * must push to this particular server |
| | | * |
| | | * We add new TreeSet in the HashMap when a new server register |
| | | * to this changelog server. |
| | | * to this replication server. |
| | | * |
| | | */ |
| | | private Map<Short, ServerHandler> connectedServers = |
| | | new ConcurrentHashMap<Short, ServerHandler>(); |
| | | |
| | | /* |
| | | * This map contains one ServerHandler for each changelog servers |
| | | * with which we are connected (so normally all the changelogs) |
| | | * This map contains one ServerHandler for each replication servers |
| | | * with which we are connected (so normally all the replication servers) |
| | | * the first update in the balanced tree is the next change that we |
| | | * must push to this particular server |
| | | * |
| | | * We add new TreeSet in the HashMap when a new changelog server register |
| | | * to this changelog server. |
| | | * We add new TreeSet in the HashMap when a new replication server register |
| | | * to this replication server. |
| | | */ |
| | | private Map<Short, ServerHandler> changelogServers = |
| | | private Map<Short, ServerHandler> replicationServers = |
| | | new ConcurrentHashMap<Short, ServerHandler>(); |
| | | |
| | | /* |
| | |
| | | */ |
| | | private Map<Short, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | private Changelog changelog; |
| | | private ReplicationServer replicationServer; |
| | | |
| | | /** |
| | | * Creates a new ChangelogCache associated to the DN baseDn. |
| | | * Creates a new ReplicationCache associated to the DN baseDn. |
| | | * |
| | | * @param baseDn The baseDn associated to the ChangelogCache. |
| | | * @param changelog the Changelog that created this changelog cache. |
| | | * @param baseDn The baseDn associated to the ReplicationCache. |
| | | * @param replicationServer the ReplicationServer that created this |
| | | * replicationServer cache. |
| | | */ |
| | | public ChangelogCache(DN baseDn, Changelog changelog) |
| | | public ReplicationCache(DN baseDn, ReplicationServer replicationServer) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.changelog = changelog; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | /** |
| | |
| | | /* |
| | | * TODO : In case that the source server is a LDAP server this method |
| | | * should check that change did get pushed to at least one |
| | | * other changelog server before pushing it to the LDAP servers |
| | | * other replication server before pushing it to the LDAP servers |
| | | */ |
| | | |
| | | sourceHandler.updateServerState(update); |
| | |
| | | int count = this.NumServers(); |
| | | if (count > 1) |
| | | { |
| | | if (sourceHandler.isChangelogServer()) |
| | | if (sourceHandler.isReplicationServer()) |
| | | ServerHandler.addWaitingAck(update, sourceHandler.getServerId(), |
| | | this, count - 1); |
| | | else |
| | |
| | | { |
| | | try |
| | | { |
| | | dbHandler = changelog.newDbHandler(id, baseDn); |
| | | dbHandler = replicationServer.newDbHandler(id, baseDn); |
| | | } catch (DatabaseException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | | * from at least one LDAP server. |
| | | * This changelog therefore can't do it's job properly anymore |
| | | * This replicationServer therefore can't do it's job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | */ |
| | | int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR; |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | return; |
| | | } |
| | | sourceDbHandlers.put(id, dbHandler); |
| | |
| | | |
| | | |
| | | /* |
| | | * Push the message to the changelog servers |
| | | * Push the message to the replication servers |
| | | */ |
| | | if (!sourceHandler.isChangelogServer()) |
| | | if (!sourceHandler.isReplicationServer()) |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } |
| | |
| | | |
| | | /** |
| | | * Create initialize context necessary for finding the changes |
| | | * that must be sent to a given LDAP or changelog server. |
| | | * that must be sent to a given LDAP or replication server. |
| | | * |
| | | * @param handler handler for the server that must be started |
| | | * @throws Exception when method has failed |
| | |
| | | { |
| | | handler.stopHandler(); |
| | | |
| | | if (handler.isChangelogServer()) |
| | | changelogServers.remove(handler.getServerId()); |
| | | if (handler.isReplicationServer()) |
| | | replicationServers.remove(handler.getServerId()); |
| | | else |
| | | connectedServers.remove(handler.getServerId()); |
| | | } |
| | | |
| | | /** |
| | | * Create initialize context necessary for finding the changes |
| | | * that must be sent to a given changelog server. |
| | | * that must be sent to a given replication server. |
| | | * |
| | | * @param handler the server ID to which we want to forward changes |
| | | * @throws Exception in case of errors |
| | | */ |
| | | public void startChangelog(ServerHandler handler) throws Exception |
| | | public void startReplicationServer(ServerHandler handler) throws Exception |
| | | { |
| | | /* |
| | | * create the balanced tree that will be used to forward changes |
| | | * TODO throw proper exception |
| | | */ |
| | | synchronized (changelogServers) |
| | | synchronized (replicationServers) |
| | | { |
| | | if (changelogServers.containsKey(handler.getServerId())) |
| | | if (replicationServers.containsKey(handler.getServerId())) |
| | | { |
| | | throw new Exception("changelog Id already registered"); |
| | | throw new Exception("Replication Server Id already registered"); |
| | | } |
| | | changelogServers.put(handler.getServerId(), handler); |
| | | replicationServers.put(handler.getServerId(), handler); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return a Set of String containing the lists of Changelog servers |
| | | * Return a Set of String containing the lists of Replication servers |
| | | * connected to this server. |
| | | * @return the set of connected servers |
| | | */ |
| | |
| | | { |
| | | LinkedHashSet<String> mySet = new LinkedHashSet<String>(); |
| | | |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | mySet.add(handler.getServerAddressURL()); |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * Return a Set containing the servers known by this changelog. |
| | | * @return a set containing the servers known by this changelog. |
| | | * Return a Set containing the servers known by this replicationServer. |
| | | * @return a set containing the servers known by this replicationServer. |
| | | */ |
| | | public Set<Short> getServers() |
| | | { |
| | |
| | | * |
| | | * @param serverId Identifier of the server for which the iterator is created. |
| | | * @param changeNumber Starting point for the iterator. |
| | | * @return the created ChangelogIterator. |
| | | * @return the created ReplicationIterator. |
| | | */ |
| | | public ChangelogIterator getChangelogIterator(short serverId, |
| | | public ReplicationIterator getChangelogIterator(short serverId, |
| | | ChangeNumber changeNumber) |
| | | { |
| | | DbHandler handler = sourceDbHandlers.get(serverId); |
| | |
| | | } |
| | | |
| | | /** |
| | | * creates a new ChangelogDB with specified identifier. |
| | | * @param id the identifier of the new ChangelogDB. |
| | | * creates a new ReplicationDB with specified identifier. |
| | | * @param id the identifier of the new ReplicationDB. |
| | | * @param db the new db. |
| | | * |
| | | * @throws DatabaseException If a database error happened. |
| | |
| | | */ |
| | | private int NumServers() |
| | | { |
| | | return changelogServers.size() + connectedServers.size(); |
| | | return replicationServers.size() + connectedServers.size(); |
| | | } |
| | | |
| | | |
| | |
| | | * In this case, we can find the handler from the connectedServers map |
| | | * - the message that was acked comes from a server to which we are not |
| | | * connected. |
| | | * In this case we need to find the changelog server that forwarded |
| | | * In this case we need to find the replication server that forwarded |
| | | * the change and send back the ack to this server. |
| | | */ |
| | | ServerHandler handler = connectedServers.get( |
| | |
| | | } |
| | | else if (msg.getDestination() == RoutableMessage.ALL_SERVERS) |
| | | { |
| | | if (!senderHandler.isChangelogServer()) |
| | | if (!senderHandler.isReplicationServer()) |
| | | { |
| | | // Send to all changelogServers |
| | | for (ServerHandler destinationHandler : changelogServers.values()) |
| | | // Send to all replicationServers |
| | | for (ServerHandler destinationHandler : replicationServers.values()) |
| | | { |
| | | servers.add(destinationHandler); |
| | | } |
| | |
| | | if (senderHandler.isLDAPserver()) |
| | | { |
| | | // let's forward to the other changelogs |
| | | servers.addAll(changelogServers.values()); |
| | | servers.addAll(replicationServers.values()); |
| | | } |
| | | } |
| | | } |
| | |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * change was an LDAP server or a ReplicationServer. |
| | | */ |
| | | public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver) |
| | | { |
| | |
| | | * |
| | | * @param changeNumber The ChangeNumber of the change that must be acked. |
| | | * @param isLDAPserver This boolean indicates if the server that sent the |
| | | * change was an LDAP server or a Changelog server. |
| | | * change was an LDAP server or a ReplicationServer. |
| | | * @param serverId The identifier of the server from which we |
| | | * received the change.. |
| | | */ |
| | |
| | | if (isLDAPserver) |
| | | handler = connectedServers.get(serverId); |
| | | else |
| | | handler = changelogServers.get(serverId); |
| | | handler = replicationServers.get(serverId); |
| | | |
| | | // TODO : check for null handler and log error |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ChangelogCache. |
| | | * Shutdown this ReplicationCache. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | // Close session with other changelogs |
| | | for (ServerHandler serverHandler : changelogServers.values()) |
| | | for (ServerHandler serverHandler : replicationServers.values()) |
| | | { |
| | | serverHandler.shutdown(); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "ChangelogCache " + baseDn; |
| | | return "ReplicationCache " + baseDn; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void checkAllSaturation() throws IOException |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | handler.checkWindow(); |
| | | } |
| | |
| | | */ |
| | | public boolean restartAfterSaturation(ServerHandler sourceHandler) |
| | | { |
| | | for (ServerHandler handler : changelogServers.values()) |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogDB.java |
| | |
| | | * and the dbHandler class. |
| | | * This is the only class that should have code using the BDB interfaces. |
| | | */ |
| | | public class ChangelogDB |
| | | public class ReplicationDB |
| | | { |
| | | private Database db = null; |
| | | private ChangelogDbEnv dbenv = null; |
| | | private Changelog changelog; |
| | | private ReplicationDbEnv dbenv = null; |
| | | private ReplicationServer replicationServer; |
| | | private Short serverId; |
| | | private DN baseDn; |
| | | |
| | |
| | | * to store and retrieve changes from an LDAP server. |
| | | * @param serverId Identifier of the LDAP server. |
| | | * @param baseDn baseDn of the LDAP server. |
| | | * @param changelog the Changelog that needs to be shutdown |
| | | * @param replicationServer the ReplicationServer that needs to be shutdown |
| | | * @param dbenv the Db encironemnet to use to create the db |
| | | * @throws DatabaseException if a database problem happened |
| | | */ |
| | | public ChangelogDB(Short serverId, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | public ReplicationDB(Short serverId, DN baseDn, |
| | | ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn; |
| | | this.dbenv = dbenv; |
| | | this.changelog = changelog; |
| | | this.replicationServer = replicationServer; |
| | | db = dbenv.getOrAddDb(serverId, baseDn); |
| | | |
| | | } |
| | |
| | | |
| | | for (UpdateMessage change : changes) |
| | | { |
| | | DatabaseEntry key = new ChangelogKey(change.getChangeNumber()); |
| | | DatabaseEntry data = new ChangelogData(change); |
| | | DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); |
| | | DatabaseEntry data = new ReplicationData(change); |
| | | |
| | | try |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | if (txn != null) |
| | | { |
| | | try |
| | |
| | | txn.abort(); |
| | | } catch (DatabaseException e1) |
| | | { |
| | | // can't do much more. The Changelog server is shuting down. |
| | | // can't do much more. The ReplicationServer is shuting down. |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Create a cursor that can be used to search or iterate on this Changelog DB. |
| | | * Create a cursor that can be used to search or iterate on this |
| | | * ReplicationServer DB. |
| | | * |
| | | * @param changeNumber The ChangeNumber from which the cursor must start. |
| | | * @throws DatabaseException If a database error prevented the cursor |
| | | * creation. |
| | | * @throws Exception if the ChangelogCursor creation failed. |
| | | * @return The ChangelogCursor. |
| | | * @throws Exception if the ReplServerDBCursor creation failed. |
| | | * @return The ReplServerDBCursor. |
| | | */ |
| | | public ChangelogCursor openReadCursor(ChangeNumber changeNumber) |
| | | public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber) |
| | | throws DatabaseException, Exception |
| | | { |
| | | return new ChangelogCursor(changeNumber); |
| | | return new ReplServerDBCursor(changeNumber); |
| | | } |
| | | |
| | | /** |
| | | * Create a cursor that can be used to delete some record from this |
| | | * Changelog database. |
| | | * ReplicationServer database. |
| | | * |
| | | * @throws DatabaseException If a database error prevented the cursor |
| | | * creation. |
| | | * @throws Exception if the ChangelogCursor creation failed. |
| | | * @return The ChangelogCursor. |
| | | * @throws Exception if the ReplServerDBCursor creation failed. |
| | | * @return The ReplServerDBCursor. |
| | | */ |
| | | public ChangelogCursor openDeleteCursor() |
| | | public ReplServerDBCursor openDeleteCursor() |
| | | throws DatabaseException, Exception |
| | | { |
| | | return new ChangelogCursor(); |
| | | return new ReplServerDBCursor(); |
| | | } |
| | | |
| | | /** |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | return null; |
| | | } |
| | | } |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | return null; |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * This Class implements a cursor that can be used to browse a changelog |
| | | * database. |
| | | * This Class implements a cursor that can be used to browse a |
| | | * replicationServer database. |
| | | */ |
| | | public class ChangelogCursor |
| | | public class ReplServerDBCursor |
| | | { |
| | | private Cursor cursor = null; |
| | | private Transaction txn = null; |
| | |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | |
| | | /** |
| | | * Creates a ChangelogCursor that can be used for browsing a changelog db. |
| | | * Creates a ReplServerDBCursor that can be used for browsing a |
| | | * replicationServer db. |
| | | * |
| | | * @param startingChangeNumber The ChangeNumber from which the cursor must |
| | | * start. |
| | | * @throws Exception When the startingChangeNumber does not exist. |
| | | */ |
| | | private ChangelogCursor(ChangeNumber startingChangeNumber) |
| | | private ReplServerDBCursor(ChangeNumber startingChangeNumber) |
| | | throws Exception |
| | | { |
| | | cursor = db.openCursor(txn, null); |
| | | |
| | | if (startingChangeNumber != null) |
| | | { |
| | | key = new ChangelogKey(startingChangeNumber); |
| | | key = new ReplicationKey(startingChangeNumber); |
| | | data = new DatabaseEntry(); |
| | | |
| | | if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != |
| | |
| | | } |
| | | } |
| | | |
| | | private ChangelogCursor() throws DatabaseException |
| | | private ReplServerDBCursor() throws DatabaseException |
| | | { |
| | | txn = dbenv.beginTransaction(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | |
| | | /** |
| | | * Close the Changelog Cursor. |
| | | * Close the ReplicationServer Cursor. |
| | | */ |
| | | public void close() |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | } |
| | | if (txn != null) |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | } |
| | |
| | | return null; |
| | | } |
| | | try { |
| | | currentChange = ChangelogData.generateChange(data.getData()); |
| | | currentChange = ReplicationData.generateChange(data.getData()); |
| | | } catch (Exception e) { |
| | | /* |
| | | * An error happening trying to convert the data from the changelog |
| | | * database to an Update Message. |
| | | * An error happening trying to convert the data from the |
| | | * replicationServer database to an Update Message. |
| | | * This can only happen if the database is corrupted. |
| | | * There is not much more that we can do at this point except trying |
| | | * to continue with the next record. |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogDBException.java |
| | |
| | | |
| | | /** |
| | | * This class define an Exception that must be used when some error |
| | | * condition was detected in the changelog database that cannot be recovered |
| | | * automatically. |
| | | * condition was detected in the replicationServer database that cannot be |
| | | * recovered automatically. |
| | | */ |
| | | public class ChangelogDBException extends IdentifiedException |
| | | public class ReplicationDBException extends IdentifiedException |
| | | { |
| | | private int messageID; |
| | | |
| | | private static final long serialVersionUID = -8812600147768060090L; |
| | | |
| | | /** |
| | | * Creates a new Changelog db exception with the provided message. |
| | | * This Exception must be used when the full changelog service is |
| | | * Creates a new ReplicationServer db exception with the provided message. |
| | | * This Exception must be used when the full replicationServer service is |
| | | * compromised by the exception |
| | | * |
| | | * @param messageID The unique message ID for the provided message. |
| | | * @param message The message to use for this exception. |
| | | */ |
| | | public ChangelogDBException(int messageID, String message) |
| | | public ReplicationDBException(int messageID, String message) |
| | | { |
| | | super(message); |
| | | |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogData.java |
| | |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | |
| | | /** |
| | | * SuperClass of DatabaseEntry used for data stored in the Changelog Databases. |
| | | * SuperClass of DatabaseEntry used for data stored in the ReplicationServer |
| | | * Databases. |
| | | */ |
| | | public class ChangelogData extends DatabaseEntry |
| | | public class ReplicationData extends DatabaseEntry |
| | | { |
| | | /** |
| | | * Creates a new ChangelogData object from an UpdateMessage. |
| | | * @param change the UpdateMessage used to create the ChangelogData. |
| | | * Creates a new ReplicationData object from an UpdateMessage. |
| | | * @param change the UpdateMessage used to create the ReplicationData. |
| | | */ |
| | | public ChangelogData(UpdateMessage change) |
| | | public ReplicationData(UpdateMessage change) |
| | | { |
| | | this.setData(change.getBytes()); |
| | | } |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogDbEnv.java |
| | |
| | | |
| | | /** |
| | | * This class is used to represent a Db environement that can be used |
| | | * to create ChangelogDB. |
| | | * to create ReplicationDB. |
| | | */ |
| | | public class ChangelogDbEnv |
| | | public class ReplicationDbEnv |
| | | { |
| | | private Environment dbEnvironment = null; |
| | | private Database stateDb = null; |
| | | private Changelog changelog = null; |
| | | private ReplicationServer replicationServer = null; |
| | | |
| | | /** |
| | | * Initialize this class. |
| | |
| | | * It also reads the currently known databases from the "changelogstate" |
| | | * database. |
| | | * @param path Path where the backing files must be created. |
| | | * @param changelog the Changelog that creates this ChangelogDbEnv. |
| | | * @param replicationServer the ReplicationServer that creates this |
| | | * ReplicationDbEnv. |
| | | * @throws DatabaseException If a DatabaseException occured that prevented |
| | | * the initialization to happen. |
| | | * @throws ChangelogDBException If a changelog internal error caused |
| | | * a failure of the changelog processing. |
| | | * @throws ReplicationDBException If a replicationServer internal error caused |
| | | * a failure of the replicationServer processing. |
| | | */ |
| | | public ChangelogDbEnv(String path, Changelog changelog) |
| | | throws DatabaseException, ChangelogDBException |
| | | public ReplicationDbEnv(String path, ReplicationServer replicationServer) |
| | | throws DatabaseException, ReplicationDBException |
| | | { |
| | | this.changelog = changelog; |
| | | this.replicationServer = replicationServer; |
| | | EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* Create the DB Environment that will be used for all |
| | | * the Changelog activities related to the db |
| | | * the ReplicationServer activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | |
| | | * for each of them. |
| | | * |
| | | * @throws DatabaseException in case of underlying DatabaseException |
| | | * @throws ChangelogDBException when the information from the database |
| | | * @throws ReplicationDBException when the information from the database |
| | | * cannot be decoded correctly. |
| | | */ |
| | | private void start() throws DatabaseException, ChangelogDBException |
| | | private void start() throws DatabaseException, ReplicationDBException |
| | | { |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | |
| | | message, msgID); |
| | | } |
| | | DbHandler dbHandler = |
| | | new DbHandler(serverId, baseDn, changelog, this); |
| | | changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler); |
| | | new DbHandler(serverId, baseDn, replicationServer, this); |
| | | replicationServer.getReplicationCache(baseDn).newDb(serverId, |
| | | dbHandler); |
| | | } catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | throw new ChangelogDBException(0, |
| | | "changelog state database has a wrong format"); |
| | | throw new ReplicationDBException(0, |
| | | "replicationServer state database has a wrong format"); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens |
| | | throw new ChangelogDBException(0, "need UTF-8 support"); |
| | | throw new ReplicationDBException(0, "need UTF-8 support"); |
| | | } |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogIterator.java |
| | |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.server.ChangelogDB.ChangelogCursor; |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | /** |
| | | * This class allows to iterate through the changes received from a given |
| | | * LDAP Server Identifier. |
| | | */ |
| | | public class ChangelogIterator |
| | | public class ReplicationIterator |
| | | { |
| | | private UpdateMessage currentChange = null; |
| | | private ChangelogCursor cursor = null; |
| | | private ReplServerDBCursor cursor = null; |
| | | |
| | | /** |
| | | * Creates a new ChangelogIterator. |
| | | * Creates a new ReplicationIterator. |
| | | * @param id the Identifier of the server on which the iterator applies. |
| | | * @param db The db where the iterator must be created. |
| | | * @param changeNumber The ChangeNumber after which the iterator must start. |
| | |
| | | * with changeNumber number. |
| | | * @throws DatabaseException if a database problem happened. |
| | | */ |
| | | public ChangelogIterator(short id, ChangelogDB db, ChangeNumber changeNumber) |
| | | throws Exception, DatabaseException |
| | | public ReplicationIterator( |
| | | short id, ReplicationDB db, ChangeNumber changeNumber) |
| | | throws Exception, DatabaseException |
| | | { |
| | | cursor = db.openReadCursor(changeNumber); |
| | | if (cursor == null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Go to the next change in the ChangelogDB or in the server Queue. |
| | | * Go to the next change in the ReplicationDB or in the server Queue. |
| | | * @return false if the iterator is already on the last change before |
| | | * this call. |
| | | */ |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogIteratorComparator.java |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | |
| | | /** |
| | | * This Class define a Comparator that allows to know which ChangelogIterator |
| | | * This Class define a Comparator that allows to know which ReplicationIterator |
| | | * contain the next UpdateMessage in the order defined by the ChangeNumber |
| | | * of the UpdateMessage. |
| | | */ |
| | | public class ChangelogIteratorComparator |
| | | implements Comparator<ChangelogIterator> |
| | | public class ReplicationIteratorComparator |
| | | implements Comparator<ReplicationIterator> |
| | | { |
| | | /** |
| | | * Compare the ChangeNumber of the ChangelogIterators. |
| | | * Compare the ChangeNumber of the ReplicationIterator. |
| | | * |
| | | * @param o1 first ChangelogIterator. |
| | | * @param o2 second ChangelogIterator. |
| | | * @param o1 first ReplicationIterator. |
| | | * @param o2 second ReplicationIterator. |
| | | * @return result of the comparison. |
| | | */ |
| | | public int compare(ChangelogIterator o1, ChangelogIterator o2) |
| | | public int compare(ReplicationIterator o1, ReplicationIterator o2) |
| | | { |
| | | ChangeNumber csn1 = o1.getChange().getChangeNumber(); |
| | | ChangeNumber csn2 = o2.getChange().getChangeNumber(); |
| File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogKey.java |
| | |
| | | |
| | | /** |
| | | * Superclass of DatabaseEntry. |
| | | * Useful to create Changelog keys from ChangeNumbers. |
| | | * Useful to create ReplicationServer keys from ChangeNumbers. |
| | | */ |
| | | public class ChangelogKey extends DatabaseEntry |
| | | public class ReplicationKey extends DatabaseEntry |
| | | { |
| | | /** |
| | | * Creates a new ChangelogKey from the given ChangeNumber. |
| | | * Creates a new ReplicationKey from the given ChangeNumber. |
| | | * @param changeNumber The changeNumber to use. |
| | | */ |
| | | public ChangelogKey(ChangeNumber changeNumber) |
| | | public ReplicationKey(ChangeNumber changeNumber) |
| | | { |
| | | try |
| | | { |
| File was renamed from opends/src/server/org/opends/server/replication/server/Changelog.java |
| | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * Changelog Listener. |
| | | * ReplicationServer Listener. |
| | | * |
| | | * This singleton is the main object of the changelog server |
| | | * This singleton is the main object of the replication server |
| | | * It waits for the incoming connections and create listener |
| | | * and publisher objects for |
| | | * connection with LDAP servers and with changelog servers |
| | | * connection with LDAP servers and with replication servers |
| | | * |
| | | * It is responsible for creating the changelog cache and managing it |
| | | * It is responsible for creating the replication server cache and managing it |
| | | */ |
| | | public class Changelog |
| | | public class ReplicationServer |
| | | implements Runnable, ConfigurableComponent, |
| | | ConfigurationChangeListener<ChangelogServerCfg> |
| | | { |
| | |
| | | |
| | | private boolean runListen = true; |
| | | |
| | | /* The list of changelog servers configured by the administrator */ |
| | | private Collection<String> changelogServers; |
| | | /* The list of replication servers configured by the administrator */ |
| | | private Collection<String> replicationServers; |
| | | |
| | | /* This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private HashMap<DN, ChangelogCache> baseDNs = |
| | | new HashMap<DN, ChangelogCache>(); |
| | | private HashMap<DN, ReplicationCache> baseDNs = |
| | | new HashMap<DN, ReplicationCache>(); |
| | | |
| | | private String localURL = "null"; |
| | | private boolean shutdown = false; |
| | |
| | | private DN configDn; |
| | | private List<ConfigAttribute> configAttributes = |
| | | new ArrayList<ConfigAttribute>(); |
| | | private ChangelogDbEnv dbEnv; |
| | | private ReplicationDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | | private String dbDirname = null; |
| | |
| | | // de deleted from the persistent storage. |
| | | |
| | | /** |
| | | * Creates a new Changelog using the provided configuration entry. |
| | | * Creates a new Replication server using the provided configuration entry. |
| | | * |
| | | * @param configuration The configuration of this changelog. |
| | | * @param configuration The configuration of this replication server. |
| | | * @throws ConfigException When Configuration is invalid. |
| | | */ |
| | | public Changelog(ChangelogServerCfg configuration) throws ConfigException |
| | | public ReplicationServer(ChangelogServerCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | shutdown = false; |
| | | runListen = true; |
| | | int changelogPort = configuration.getChangelogPort(); |
| | | changelogServerId = (short) configuration.getChangelogServerId(); |
| | | changelogServers = configuration.getChangelogServer(); |
| | | if (changelogServers == null) |
| | | changelogServers = new ArrayList<String>(); |
| | | replicationServers = configuration.getChangelogServer(); |
| | | if (replicationServers == null) |
| | | replicationServers = new ArrayList<String>(); |
| | | queueSize = configuration.getQueueSize(); |
| | | trimAge = configuration.getChangelogPurgeDelay(); |
| | | dbDirname = configuration.getChangelogDbDirectory(); |
| | |
| | | |
| | | /** |
| | | * The run method for the Listen thread. |
| | | * This thread accept incoming connections on the changelog server |
| | | * ports from other changelog servers or from LDAP servers |
| | | * This thread accept incoming connections on the replication server |
| | | * ports from other replication servers or from LDAP servers |
| | | * and spawn further thread responsible for handling those connections |
| | | */ |
| | | |
| | |
| | | Socket newSocket = null; |
| | | while (shutdown == false) |
| | | { |
| | | // Wait on the changelog port. |
| | | // Read incoming messages and create LDAP or Changelog listener and |
| | | // Publisher. |
| | | // Wait on the replicationServer port. |
| | | // Read incoming messages and create LDAP or ReplicationServer listener |
| | | // and Publisher. |
| | | |
| | | try |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * This method manages the connection with the other changelog servers. |
| | | * It periodically checks that this changelog server is indeed connected |
| | | * to all the other changelog servers and if not attempts to |
| | | * This method manages the connection with the other replication servers. |
| | | * It periodically checks that this replication server is indeed connected |
| | | * to all the other replication servers and if not attempts to |
| | | * make the connection. |
| | | */ |
| | | private void runConnect() |
| | |
| | | { |
| | | /* |
| | | * periodically check that we are connected to all other |
| | | * changelog servers and if not establish the connection |
| | | * replication servers and if not establish the connection |
| | | */ |
| | | for (ChangelogCache changelogCache: baseDNs.values()) |
| | | for (ReplicationCache replicationCache: baseDNs.values()) |
| | | { |
| | | Set<String> connectedChangelogs = changelogCache.getChangelogs(); |
| | | Set<String> connectedChangelogs = replicationCache.getChangelogs(); |
| | | /* |
| | | * check that all changelog in the config are in the connected Set |
| | | * if not create the connection |
| | | * check that all replication server in the config are in the connected |
| | | * Set. If not create the connection |
| | | */ |
| | | for (String serverURL : changelogServers) |
| | | for (String serverURL : replicationServers) |
| | | { |
| | | if ((serverURL.compareTo(this.serverURL) != 0) && |
| | | (!connectedChangelogs.contains(serverURL))) |
| | | { |
| | | this.connect(serverURL, changelogCache.getBaseDn()); |
| | | this.connect(serverURL, replicationCache.getBaseDn()); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * initialization function for the changelog. |
| | | * initialization function for the replicationServer. |
| | | * |
| | | * @param changelogId The unique identifier for this changelog. |
| | | * @param changelogPort The port on which the changelog should listen. |
| | | * @param changelogId The unique identifier for this replicationServer. |
| | | * @param changelogPort The port on which the replicationServer should |
| | | * listen. |
| | | * |
| | | */ |
| | | private void initialize(short changelogId, int changelogPort) |
| | |
| | | try |
| | | { |
| | | /* |
| | | * Initialize the changelog database. |
| | | * Initialize the replicationServer database. |
| | | */ |
| | | dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(), |
| | | dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(), |
| | | this); |
| | | |
| | | /* |
| | | * create changelog cache |
| | | * create replicationServer cache |
| | | */ |
| | | serverId = changelogId; |
| | | |
| | | /* |
| | | * Open changelog socket |
| | | * Open replicationServer socket |
| | | */ |
| | | String localhostname = InetAddress.getLocalHost().getHostName(); |
| | | String localAdddress = InetAddress.getLocalHost().getHostAddress(); |
| | |
| | | /* |
| | | * create working threads |
| | | */ |
| | | myListenThread = new DirectoryThread(this, "Changelog Listener"); |
| | | myListenThread = new DirectoryThread(this, "Replication Server Listener"); |
| | | myListenThread.start(); |
| | | myConnectThread = new DirectoryThread(this, "Changelog Connect"); |
| | | myConnectThread = new DirectoryThread(this, "Replication Server Connect"); |
| | | myConnectThread.start(); |
| | | |
| | | } catch (DatabaseException e) |
| | |
| | | String message = getMessage(msgID, dbDirname); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } catch (ChangelogDBException e) |
| | | } catch (ReplicationDBException e) |
| | | { |
| | | int msgID = MSGID_COULD_NOT_READ_DB; |
| | | String message = getMessage(msgID, dbDirname); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the ChangelogCache associated to the base DN given in parameter. |
| | | * Get the ReplicationCache associated to the base DN given in parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ChangelogCache must be returned. |
| | | * @return The ChangelogCache associated to the base DN given in parameter. |
| | | * @param baseDn The base Dn for which the ReplicationCache must be returned. |
| | | * @return The ReplicationCache associated to the base DN given in parameter. |
| | | */ |
| | | public ChangelogCache getChangelogCache(DN baseDn) |
| | | public ReplicationCache getReplicationCache(DN baseDn) |
| | | { |
| | | ChangelogCache changelogCache; |
| | | ReplicationCache replicationCache; |
| | | |
| | | synchronized (baseDNs) |
| | | { |
| | | changelogCache = baseDNs.get(baseDn); |
| | | if (changelogCache == null) |
| | | changelogCache = new ChangelogCache(baseDn, this); |
| | | baseDNs.put(baseDn, changelogCache); |
| | | replicationCache = baseDNs.get(baseDn); |
| | | if (replicationCache == null) |
| | | replicationCache = new ReplicationCache(baseDn, this); |
| | | baseDNs.put(baseDn, replicationCache); |
| | | } |
| | | |
| | | return changelogCache; |
| | | return replicationCache; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the Changelog service and all its connections. |
| | | * Shutdown the Replication Server service and all its connections. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | |
| | | listenSocket.close(); |
| | | } catch (IOException e) |
| | | { |
| | | // changelog service is closing anyway. |
| | | // replication Server service is closing anyway. |
| | | } |
| | | |
| | | // shutdown all the ChangelogCaches |
| | | for (ChangelogCache changelogCache : baseDNs.values()) |
| | | for (ReplicationCache replicationCache : baseDNs.values()) |
| | | { |
| | | changelogCache.shutdown(); |
| | | replicationCache.shutdown(); |
| | | } |
| | | |
| | | dbEnv.shutdown(); |
| | |
| | | |
| | | |
| | | /** |
| | | * Creates a new DB handler for this Changelog and the serverId and |
| | | * Creates a new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler muste be created. |
| | | * @return The new DB handler for this Changelog and the serverId and |
| | | * @return The new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.ChangelogStartMessage; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.HeartbeatThread; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | | * changelog server. |
| | | * replication server. |
| | | */ |
| | | public class ServerHandler extends MonitorProvider |
| | | { |
| | |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private final Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ChangelogCache changelogCache = null; |
| | | private ReplicationCache replicationCache = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | | private int inCount = 0; // number of updates received from the server |
| | |
| | | // flow controled and should |
| | | // be stopped from sending messsages. |
| | | private int saturationCount = 0; |
| | | private short changelogId; |
| | | private short replicationServerId; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | |
| | | */ |
| | | HeartbeatThread heartbeatThread = null; |
| | | |
| | | private static final Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | | private static final Map<ChangeNumber, ReplServerAckMessageList> |
| | | changelogsWaitingAcks = |
| | | new HashMap<ChangeNumber, ReplServerAckMessageList>(); |
| | | |
| | | /** |
| | | * Creates a new server handler instance with the provided socket. |
| | |
| | | |
| | | /** |
| | | * Do the exchange of start messages to know if the remote |
| | | * server is an LDAP or changelog server and to exchange serverID. |
| | | * server is an LDAP or replication server and to exchange serverID. |
| | | * Then create the reader and writer thread. |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * @param changelogId The identifier of the changelog that creates this |
| | | * server handler. |
| | | * @param changelogURL The URL of the changelog that creates this |
| | | * server handler. |
| | | * @param replicationServerId The identifier of the replicationServer that |
| | | * creates this server handler. |
| | | * @param replicationServerURL The URL of the replicationServer that creates |
| | | * this server handler. |
| | | * @param windowSize the window size that this server handler must use. |
| | | * @param changelog the Changelog that created this server handler. |
| | | * @param replicationServer the ReplicationServer that created this server |
| | | * handler. |
| | | */ |
| | | public void start(DN baseDn, short changelogId, String changelogURL, |
| | | int windowSize, Changelog changelog) |
| | | public void start(DN baseDn, short replicationServerId, |
| | | String replicationServerURL, |
| | | int windowSize, ReplicationServer replicationServer) |
| | | { |
| | | this.changelogId = changelogId; |
| | | this.replicationServerId = replicationServerId; |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | |
| | | if (baseDn != null) |
| | | { |
| | | this.baseDn = baseDn; |
| | | changelogCache = changelog.getChangelogCache(baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage msg = |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | replicationCache = replicationServer.getReplicationCache(baseDn); |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState); |
| | | |
| | | session.publish(msg); |
| | |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage myStartMsg = |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else if (msg instanceof ChangelogStartMessage) |
| | | else if (msg instanceof ReplServerStartMessage) |
| | | { |
| | | ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg; |
| | | ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | String[] splittedURL = serverURL.split(":"); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState serverState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage outMsg = |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, serverState); |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | ServerState serverState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage outMsg = |
| | | new ReplServerStartMessage(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, serverState); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | | changelogCache.startServer(this); |
| | | replicationCache.startServer(this); |
| | | } |
| | | else |
| | | { |
| | | changelogCache.startChangelog(this); |
| | | replicationCache.startReplicationServer(this); |
| | | } |
| | | |
| | | writer = new ServerWriter(session, serverId, this, changelogCache); |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | |
| | | reader = new ServerReader(session, serverId, this, |
| | | changelogCache); |
| | | replicationCache); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if the server associated to this ServerHandler is a changelog server. |
| | | * Check if the server associated to this ServerHandler is a replication |
| | | * server. |
| | | * @return true if the server associated to this ServerHandler is a |
| | | * changelog server. |
| | | * replication server. |
| | | */ |
| | | public boolean isChangelogServer() |
| | | public boolean isReplicationServer() |
| | | { |
| | | return (!serverIsLDAPserver); |
| | | } |
| | |
| | | * the sum of the number of missing changes for every dbHandler. |
| | | */ |
| | | int totalCount = 0; |
| | | ServerState dbState = changelogCache.getDbServerState(); |
| | | ServerState dbState = replicationCache.getDbServerState(); |
| | | for (short id : dbState) |
| | | { |
| | | int max = dbState.getMaxChangeNumber(id).getSeqnum(); |
| | |
| | | * Get an approximation of the delay by looking at the age of the odest |
| | | * message that has not been sent to this server. |
| | | * This is an approximation because the age is calculated using the |
| | | * clock of the servee where the changelog is currently running |
| | | * clock of the servee where the replicationServer is currently running |
| | | * while it should be calculated using the clock of the server |
| | | * that originally processed the change. |
| | | * |
| | |
| | | saturationCount = 0; |
| | | try |
| | | { |
| | | changelogCache.checkAllSaturation(); |
| | | replicationCache.checkAllSaturation(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | * load this change on the delayList |
| | | * |
| | | */ |
| | | ChangelogIteratorComparator comparator = |
| | | new ChangelogIteratorComparator(); |
| | | SortedSet<ChangelogIterator> iteratorSortedSet = |
| | | new TreeSet<ChangelogIterator>(comparator); |
| | | ReplicationIteratorComparator comparator = |
| | | new ReplicationIteratorComparator(); |
| | | SortedSet<ReplicationIterator> iteratorSortedSet = |
| | | new TreeSet<ReplicationIterator>(comparator); |
| | | /* fill the lateQueue */ |
| | | for (short serverId : changelogCache.getServers()) |
| | | for (short serverId : replicationCache.getServers()) |
| | | { |
| | | ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); |
| | | ChangelogIterator iterator = |
| | | changelogCache.getChangelogIterator(serverId, lastCsn); |
| | | ReplicationIterator iterator = |
| | | replicationCache.getChangelogIterator(serverId, lastCsn); |
| | | if ((iterator != null) && (iterator.getChange() != null)) |
| | | { |
| | | iteratorSortedSet.add(iterator); |
| | |
| | | } |
| | | while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) |
| | | { |
| | | ChangelogIterator iterator = iteratorSortedSet.first(); |
| | | ReplicationIterator iterator = iteratorSortedSet.first(); |
| | | iteratorSortedSet.remove(iterator); |
| | | lateQueue.add(iterator.getChange()); |
| | | if (iterator.next()) |
| | |
| | | else |
| | | iterator.releaseCursor(); |
| | | } |
| | | for (ChangelogIterator iterator : iteratorSortedSet) |
| | | for (ReplicationIterator iterator : iteratorSortedSet) |
| | | { |
| | | iterator.releaseCursor(); |
| | | } |
| | |
| | | } |
| | | if (completedFlag) |
| | | { |
| | | changelogCache.sendAck(changeNumber, true); |
| | | replicationCache.sendAck(changeNumber, true); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Process reception of an for an update that was received from a |
| | | * Changelog Server. |
| | | * ReplicationServer. |
| | | * |
| | | * @param message the ack message that was received. |
| | | * @param ackingServerId The id of the server that acked the change. |
| | |
| | | public static void ackChangelog(AckMessage message, short ackingServerId) |
| | | { |
| | | ChangeNumber changeNumber = message.getChangeNumber(); |
| | | ChangelogAckMessageList ackList; |
| | | ReplServerAckMessageList ackList; |
| | | boolean completedFlag; |
| | | synchronized (changelogsWaitingAcks) |
| | | { |
| | |
| | | } |
| | | if (completedFlag) |
| | | { |
| | | ChangelogCache changelogCache = ackList.getChangelogCache(); |
| | | changelogCache.sendAck(changeNumber, false, |
| | | ackList.getChangelogServerId()); |
| | | ReplicationCache replicationCache = ackList.getChangelogCache(); |
| | | replicationCache.sendAck(changeNumber, false, |
| | | ackList.getReplicationServerId()); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an update to the list of update received from a changelog server and |
| | | * Add an update to the list of update received from a replicationServer and |
| | | * waiting for acks. |
| | | * |
| | | * @param update The update that must be added to the list. |
| | | * @param ChangelogServerId The identifier of the changelog that sent the |
| | | * update. |
| | | * @param changelogCache The ChangelogCache from which the change was |
| | | * processed and to which the ack must later be sent. |
| | | * @param ChangelogServerId The identifier of the replicationServer that sent |
| | | * the update. |
| | | * @param replicationCache The ReplicationCache from which the change was |
| | | * processed and to which the ack must later be sent. |
| | | * @param nbWaitedAck The number of ack that must be received before |
| | | * the update is fully acked. |
| | | */ |
| | | public static void addWaitingAck(UpdateMessage update, |
| | | short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck) |
| | | public static void addWaitingAck( |
| | | UpdateMessage update, |
| | | short ChangelogServerId, ReplicationCache replicationCache, |
| | | int nbWaitedAck) |
| | | { |
| | | ChangelogAckMessageList ackList = |
| | | new ChangelogAckMessageList(update.getChangeNumber(), |
| | | ReplServerAckMessageList ackList = |
| | | new ReplServerAckMessageList(update.getChangeNumber(), |
| | | nbWaitedAck, |
| | | ChangelogServerId, changelogCache); |
| | | ChangelogServerId, replicationCache); |
| | | synchronized(changelogsWaitingAcks) |
| | | { |
| | | changelogsWaitingAcks.put(update.getChangeNumber(), ackList); |
| | |
| | | * Check type of server handled. |
| | | * |
| | | * @return true if the handled server is an LDAP server. |
| | | * false if the handled server is a changelog server |
| | | * false if the handled server is a replicationServer |
| | | */ |
| | | public boolean isLDAPserver() |
| | | { |
| | |
| | | if (serverIsLDAPserver) |
| | | return "LDAP Server " + str; |
| | | else |
| | | return "Changelog Server " + str; |
| | | return "Replication Server " + str; |
| | | } |
| | | |
| | | /** |
| | |
| | | if (serverIsLDAPserver) |
| | | attributes.add(new Attribute("LDAP-Server", serverURL)); |
| | | else |
| | | attributes.add(new Attribute("Changelog-Server", serverURL)); |
| | | attributes.add(new Attribute("ReplicationServer-Server", serverURL)); |
| | | attributes.add(new Attribute("server-id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", |
| | |
| | | if (serverIsLDAPserver) |
| | | localString = "Directory Server "; |
| | | else |
| | | localString = "Changelog Server "; |
| | | localString = "Replication Server "; |
| | | |
| | | |
| | | localString += serverId + " " + serverURL + " " + baseDn; |
| | |
| | | { |
| | | if (flowControl) |
| | | { |
| | | if (changelogCache.restartAfterSaturation(this)) |
| | | if (replicationCache.restartAfterSaturation(this)) |
| | | { |
| | | flowControl = false; |
| | | } |
| | |
| | | public void process(RoutableMessage msg) |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); |
| | | debugInfo("SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId); |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1); |
| | | "SH(" + replicationServerId + ") receives " + msg + |
| | | " from " + serverId, 1); |
| | | |
| | | changelogCache.process(msg, this); |
| | | replicationCache.process(msg, this); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void send(RoutableMessage msg) throws IOException |
| | | { |
| | | if (debugEnabled()) |
| | | debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId); |
| | | debugInfo("SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId); |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1); |
| | | "SH(" + replicationServerId + ") forwards " + |
| | | msg + " to " + serverId, 1); |
| | | |
| | | session.publish(msg); |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * This class implement the part of the changelog that is reading |
| | | * This class implement the part of the replicationServer that is reading |
| | | * the connection from the LDAP servers to get all the updates that |
| | | * were done on this replica and forward them to other servers. |
| | | * |
| | | * A single thread is dedicated to this work. |
| | | * It waits in a blocking mode on the connection from the LDAP server |
| | | * and upon receiving an update puts in into the changelog cache |
| | | * and upon receiving an update puts in into the replicationServer cache |
| | | * from where the other servers will grab it. |
| | | */ |
| | | public class ServerReader extends DirectoryThread |
| | |
| | | private short serverId; |
| | | private ProtocolSession session; |
| | | private ServerHandler handler; |
| | | private ChangelogCache changelogCache; |
| | | private ReplicationCache replicationCache; |
| | | |
| | | /** |
| | | * Constructor for the LDAP server reader part of the changelog. |
| | | * Constructor for the LDAP server reader part of the replicationServer. |
| | | * |
| | | * @param session The ProtocolSession from which to read the data. |
| | | * @param serverId The server ID of the server from which we read changes. |
| | | * @param handler The server handler for this server reader. |
| | | * @param changelogCache The ChangelogCache for this server reader. |
| | | * @param replicationCache The ReplicationCache for this server reader. |
| | | */ |
| | | public ServerReader(ProtocolSession session, short serverId, |
| | | ServerHandler handler, ChangelogCache changelogCache) |
| | | ServerHandler handler, ReplicationCache replicationCache) |
| | | { |
| | | super(handler.toString() + " reader"); |
| | | this.session = session; |
| | | this.serverId = serverId; |
| | | this.handler = handler; |
| | | this.changelogCache = changelogCache; |
| | | this.replicationCache = replicationCache; |
| | | } |
| | | |
| | | /** |
| | |
| | | /* |
| | | * TODO : catch exceptions in case of bugs |
| | | * wait on input stream |
| | | * grab all incoming messages and publish them to the changelogCache |
| | | * grab all incoming messages and publish them to the replicationCache |
| | | */ |
| | | try |
| | | { |
| | |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | handler.checkWindow(); |
| | | changelogCache.ack(ack, serverId); |
| | | replicationCache.ack(ack, serverId); |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.decAndCheckWindow(); |
| | | changelogCache.put(update, handler); |
| | | replicationCache.put(update, handler); |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | | { |
| | |
| | | { |
| | | // ignore |
| | | } |
| | | changelogCache.stopServer(handler); |
| | | replicationCache.stopServer(handler); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | private ProtocolSession session; |
| | | private ServerHandler handler; |
| | | private ChangelogCache changelogCache; |
| | | private ReplicationCache replicationCache; |
| | | |
| | | /** |
| | | * Create a ServerWriter. |
| | |
| | | * @param session the ProtocolSession that will be used to send updates. |
| | | * @param serverId the Identifier of the server. |
| | | * @param handler handler for which the ServerWriter is created. |
| | | * @param changelogCache The ChangelogCache of this ServerWriter. |
| | | * @param replicationCache The ReplicationCache of this ServerWriter. |
| | | */ |
| | | public ServerWriter(ProtocolSession session, short serverId, |
| | | ServerHandler handler, ChangelogCache changelogCache) |
| | | ServerHandler handler, ReplicationCache replicationCache) |
| | | { |
| | | super(handler.toString() + " writer"); |
| | | |
| | | this.session = session; |
| | | this.handler = handler; |
| | | this.changelogCache = changelogCache; |
| | | this.replicationCache = replicationCache; |
| | | } |
| | | |
| | | /** |
| | | * Run method for the ServerWriter. |
| | | * Loops waiting for changes from the ChangelogCache and forward them |
| | | * Loops waiting for changes from the ReplicationCache and forward them |
| | | * to the other servers |
| | | */ |
| | | public void run() |
| | |
| | | try { |
| | | while (true) |
| | | { |
| | | UpdateMessage update = changelogCache.take(this.handler); |
| | | UpdateMessage update = replicationCache.take(this.handler); |
| | | if (update == null) |
| | | return; /* this connection is closing */ |
| | | session.publish(update); |
| | |
| | | { |
| | | // Can't do much more : ignore |
| | | } |
| | | changelogCache.stopServer(handler); |
| | | replicationCache.stopServer(handler); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | |
| | | /** |
| | | * This package contains the code for the changelog service part |
| | | * This package contains the code for the Replication Server part |
| | | * of the Multimaster replication feature. |
| | | * <br> |
| | | * |
| | | * A changelog server is responsible for : |
| | | * A replication server is responsible for : |
| | | * <br> |
| | | * <ul> |
| | | * <li>listen for connections from ldap servers.</li> |
| | | * <li>Connect/manage connection to other changelog servers.</li> |
| | | * <li>Connect/manage connection to other replication servers.</li> |
| | | * <li>Receive changes from ldap servers.</li> |
| | | * <li>Forward changes to ldap server and other changelog servers.</li> |
| | | * <li>Forward changes to ldap server and other replication servers.</li> |
| | | * <li>Save changes to stable storage (includes trimming of older operations). |
| | | * </li> |
| | | * </ul> |
| | |
| | | * <ul> |
| | | * <li><A HREF="SocketSession.html"><B>SocketSession</B></A> |
| | | * implements the ProtocolSession interface that is |
| | | * used by the changelog server and the directory server to communicate. |
| | | * used by the replication server and the directory server to communicate. |
| | | * This is done by using the innate encoding/decoding capabilities of the |
| | | * ReplicationMessages objects. This class is used by both the |
| | | * changelog and the replication package. |
| | | * replicationServer and the replication package. |
| | | * </li> |
| | | * <li><A HREF="ChangelogCache.html"><B>ChangelogCache</B></A> |
| | | * implements the multiplexing part of the changelog |
| | | * <li><A HREF="ReplicationCache.html"><B>ReplicationCache</B></A> |
| | | * implements the multiplexing part of the replication |
| | | * server. It contains method for forwarding all the received messages to |
| | | * the ServerHandler and to the dbHandler objects.<br> |
| | | * </li> |
| | | * <li><A HREF="ServerHandler.html"><B>ServerHandler</B></A> |
| | | * contains the code related to handler of remote |
| | | * server. It can manage changelog servers of directory servers (may be it |
| | | * server. It can manage replication servers of directory servers (may be it |
| | | * shoudl be splitted in two different classes, one for each of these).<br> |
| | | * </li> |
| | | * <li><A HREF="ServerWriter.html"><B>ServerWriter</B></A> |
| | |
| | | import org.opends.server.messages.TaskMessages; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.ReplicationDomain; |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.SocketSession; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.server.Changelog; |
| | | import org.opends.server.replication.server.ChangelogFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.schema.DirectoryStringSyntax; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.DN; |
| | |
| | | int changelogPort = 8989; |
| | | |
| | | private DN baseDn; |
| | | ChangelogBroker server2 = null; |
| | | Changelog changelog1 = null; |
| | | Changelog changelog2 = null; |
| | | ReplicationBroker server2 = null; |
| | | ReplicationServer changelog1 = null; |
| | | ReplicationServer changelog2 = null; |
| | | boolean emptyOldChanges = true; |
| | | ReplicationDomain sd = null; |
| | | |
| | |
| | | * @param destinationServerID The target server. |
| | | * @param requestorID The initiator server. |
| | | */ |
| | | private void makeBrokerPublishEntries(ChangelogBroker broker, |
| | | private void makeBrokerPublishEntries(ReplicationBroker broker, |
| | | short senderID, short destinationServerID, short requestorID) |
| | | { |
| | | // Send entries |
| | |
| | | } |
| | | } |
| | | |
| | | void receiveUpdatedEntries(ChangelogBroker broker, short serverID, |
| | | void receiveUpdatedEntries(ReplicationBroker broker, short serverID, |
| | | String[] updatedEntries) |
| | | { |
| | | // Expect the broker to receive the entries |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new changelog server. |
| | | * @param changelogId The serverID of the changelog to create. |
| | | * @return The new changelog server. |
| | | * Creates a new replicationServer. |
| | | * @param changelogId The serverID of the replicationServer to create. |
| | | * @return The new replicationServer. |
| | | */ |
| | | private Changelog createChangelogServer(short changelogId) |
| | | private ReplicationServer createChangelogServer(short changelogId) |
| | | { |
| | | try |
| | | { |
| | |
| | | { |
| | | int chPort = getChangelogPort(changelogId); |
| | | |
| | | ChangelogFakeConfiguration conf = |
| | | new ChangelogFakeConfiguration(chPort, null, 0, changelogId, 0, 100, |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100, |
| | | null); |
| | | Changelog changelog = new Changelog(conf); |
| | | ReplicationServer replicationServer = new ReplicationServer(conf); |
| | | Thread.sleep(1000); |
| | | |
| | | return changelog; |
| | | return replicationServer; |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | |
| | | /** |
| | | * Create a synchronized suffix in the current server providing the |
| | | * changelog serverID. |
| | | * replication Server ID. |
| | | * @param changelogID |
| | | */ |
| | | private void connectServer1ToChangelog(short changelogID) |
| | | { |
| | | // Connect DS to the changelog |
| | | // Connect DS to the replicationServer |
| | | try |
| | | { |
| | | // suffix synchronized |
| | |
| | | { |
| | | changelog1 = createChangelogServer(changelog1ID); |
| | | |
| | | // Connect DS to the changelog |
| | | // Connect DS to the replicationServer |
| | | connectServer1ToChangelog(changelog1ID); |
| | | |
| | | if (server2 == null) |
| | |
| | | |
| | | changelog1 = createChangelogServer(changelog1ID); |
| | | |
| | | // Connect DS to the changelog |
| | | // Connect DS to the replicationServer |
| | | connectServer1ToChangelog(changelog1ID); |
| | | |
| | | addTestEntriesToDB(); |
| | |
| | | server2 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); |
| | | |
| | | ChangelogBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | ReplicationBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | | server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges); |
| | | |
| | | Thread.sleep(1000); |
| | |
| | | changelog2 = createChangelogServer(changelog2ID); |
| | | Thread.sleep(3000); |
| | | |
| | | // Connect DS to the changelog 1 |
| | | // Connect DS to the replicationServer 1 |
| | | connectServer1ToChangelog(changelog1ID); |
| | | |
| | | // Put entries in DB |
| | |
| | | server2.stop(); |
| | | |
| | | TestCaseUtils.sleep(100); // give some time to the broker to disconnect |
| | | // fromthe changelog server. |
| | | // from the replicationServer. |
| | | server2 = null; |
| | | } |
| | | super.cleanRealEntries(); |
| | |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | |
| | | /** |
| | | * Test the window mechanism by : |
| | | * - creating a Changelog service client using the ChangelogBroker class. |
| | | * - creating a ReplicationServer service client using the ReplicationBroker class. |
| | | * - set a small window size. |
| | | * - perform more than the window size operations. |
| | | * - check that the Changelog has not sent more than window size operations. |
| | | * - receive all messages from the ChangelogBroker, check that |
| | | * - check that the ReplicationServer has not sent more than window size operations. |
| | | * - receive all messages from the ReplicationBroker, check that |
| | | * the client receives the correct number of operations. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 13, |
| | | ReplicationBroker broker = openChangelogSession(baseDn, (short) 13, |
| | | WINDOW_SIZE, 8989, 1000, true); |
| | | |
| | | try { |
| | | |
| | | /* Test that changelog monitor and synchro plugin monitor informations |
| | | /* Test that replicationServer monitor and synchro plugin monitor informations |
| | | * publish the correct window size. |
| | | * This allows both the check the monitoring code and to test that |
| | | * configuration is working. |
| | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD Replication message is not for the excepted DN"); |
| | | |
| | | // send (2 * window + changelog queue) modify operations |
| | | // so that window + changelog queue get stuck in the changelog queue |
| | | // send (2 * window + replicationServer queue) modify operations |
| | | // so that window + replicationServer queue get stuck in the replicationServer queue |
| | | int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE; |
| | | processModify(count); |
| | | |
| | | // let some time to the message to reach the changelog client |
| | | // let some time to the message to reach the replicationServer client |
| | | Thread.sleep(500); |
| | | |
| | | // check that the changelog only sent WINDOW_SIZE messages |
| | | // check that the replicationServer only sent WINDOW_SIZE messages |
| | | assertTrue(searchUpdateSent()); |
| | | |
| | | int rcvCount=0; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check that the Changelog queue size has correctly been configured |
| | | * Check that the ReplicationServer queue size has correctly been configured |
| | | * by reading the monitoring information. |
| | | * @throws LDAPException |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Search that the changelog has stopped sending changes after |
| | | * Search that the replicationServer has stopped sending changes after |
| | | * having reach the limit of the window size. |
| | | * And that the number of waiting changes is accurate. |
| | | * Do this by checking the monitoring information. |
| | |
| | | public void setup() throws Exception |
| | | { |
| | | /* |
| | | * - Start a server and a changelog server, configure replication |
| | | * - Start a server and a replicationServer, configure replication |
| | | * - Do some changes. |
| | | */ |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // find a free port for the changelog server |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | int changelogPort = socket.getLocalPort(); |
| | | socket.close(); |
| | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.PersistentServerState; |
| | | import org.opends.server.schema.IntegerSyntax; |
| | | import org.opends.server.core.DeleteOperation; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Open a changelog session to the local Changelog server. |
| | | * Open a replicationServer session to the local ReplicationServer. |
| | | * |
| | | */ |
| | | protected ChangelogBroker openChangelogSession( |
| | | protected ReplicationBroker openChangelogSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, boolean emptyOldChanges) |
| | | throws Exception, SocketException |
| | |
| | | else |
| | | state = new ServerState(); |
| | | |
| | | ChangelogBroker broker = new ChangelogBroker( |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | |
| | | if (timeout != 0) |
| | | broker.setSoTimeout(timeout); |
| | | TestCaseUtils.sleep(100); // give some time to the broker to connect |
| | | // to the changelog server. |
| | | // to the replicationServer. |
| | | if (emptyOldChanges) |
| | | { |
| | | /* |
| | |
| | | } |
| | | |
| | | /** |
| | | * Open a new session to the Changelog Server |
| | | * Open a new session to the ReplicationServer |
| | | * starting with a given ServerState. |
| | | */ |
| | | protected ChangelogBroker openChangelogSession( |
| | | protected ReplicationBroker openChangelogSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, ServerState state) |
| | | throws Exception, SocketException |
| | | { |
| | | ChangelogBroker broker = new ChangelogBroker( |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Open a changelog session with flow control to the local Changelog server. |
| | | * Open a replicationServer session with flow control to the local |
| | | * ReplicationServer. |
| | | * |
| | | */ |
| | | protected ChangelogBroker openChangelogSession( |
| | | protected ReplicationBroker openChangelogSession( |
| | | final DN baseDn, short serverId, int window_size, |
| | | int port, int timeout, int maxSendQueue, int maxRcvQueue, |
| | | boolean emptyOldChanges) |
| | |
| | | else |
| | | state = new ServerState(); |
| | | |
| | | ChangelogBroker broker = new ChangelogBroker( |
| | | ReplicationBroker broker = new ReplicationBroker( |
| | | state, baseDn, serverId, maxRcvQueue, 0, |
| | | maxSendQueue, 0, window_size, 0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | |
| | | "Unable to add the Multimaster replication plugin"); |
| | | |
| | | |
| | | // Add the changelog server |
| | | // Add the replication server |
| | | DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()), |
| | | "Unable to add the changeLog server"); |
| | |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | TestCaseUtils.startServer(); |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | |
| | | // find a free port for the changelog server |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | changelogPort = socket.getLocalPort(); |
| | | socket.close(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Checks that changes done to the schema are pushed to the changelog |
| | | * Checks that changes done to the schema are pushed to the replicationServer |
| | | * clients. |
| | | */ |
| | | @Test() |
| | |
| | | |
| | | final DN baseDn = DN.decode("cn=schema"); |
| | | |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true); |
| | | |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Checks that changes to the schema pushed to the changelog |
| | | * Checks that changes to the schema pushed to the replicationServer |
| | | * are received and correctly replayed by replication plugin. |
| | | */ |
| | | @Test(dependsOnMethods = { "pushSchemaChange" }) |
| | |
| | | |
| | | final DN baseDn = DN.decode("cn=schema"); |
| | | |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true); |
| | | |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short)2, 0); |
| | |
| | | |
| | | /** |
| | | * Checks that changes done to the schema files are pushed to the |
| | | * Changelog servers and that the ServerState is updated in the schema |
| | | * ReplicationServers and that the ServerState is updated in the schema |
| | | * file. |
| | | * FIXME: This test is disabled because it has side effects. |
| | | * It causes schema tests in org.opends.server.core.AddOperationTestCase |
| | |
| | | |
| | | final DN baseDn = DN.decode("cn=schema"); |
| | | |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 3, 100, changelogPort, 5000, true); |
| | | |
| | | // create a schema change Notification |
| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | /** |
| | | * Stress test for the synchronization code using the ChangelogBroker API. |
| | | * Stress test for the synchronization code using the ReplicationBroker API. |
| | | */ |
| | | public class StressTest extends ReplicationTestCase |
| | | { |
| | |
| | | // WORKAROUND FOR BUG #639 - END - |
| | | |
| | | /** |
| | | * Stress test from LDAP server to client using the ChangelogBroker API. |
| | | * Stress test from LDAP server to client using the ReplicationBroker API. |
| | | */ |
| | | @Test(enabled=true, groups="slow") |
| | | public void fromServertoBroker() throws Exception |
| | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | final int TOTAL_MESSAGES = 1000; |
| | | |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true); |
| | | Monitor monitor = new Monitor("stress test monitor"); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | |
| | | try { |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | * replicationServer and forwarded to our replicationServer broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Continuously reads messages from a changelog broker until there is nothing |
| | | * Continuously reads messages from a replicationServer broker until there is nothing |
| | | * left. Count the number of received messages. |
| | | */ |
| | | private class BrokerReader extends Thread |
| | | { |
| | | private ChangelogBroker broker; |
| | | private ReplicationBroker broker; |
| | | private int count = 0; |
| | | private Boolean finished = false; |
| | | |
| | |
| | | * Creates a new Stress Test Reader |
| | | * @param broker |
| | | */ |
| | | public BrokerReader(ChangelogBroker broker) |
| | | public BrokerReader(ReplicationBroker broker) |
| | | { |
| | | this.broker = broker; |
| | | } |
| | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.plugins.ShortCircuitPlugin; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.HeartbeatThread; |
| | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | /* |
| | | * Open a session to the changelog server using the broker API. |
| | | * Open a session to the replicationServer using the broker API. |
| | | * This must use a different serverId to that of the directory server. |
| | | */ |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); |
| | | |
| | | |
| | | /* |
| | | * Create a Change number generator to generate new changenumbers |
| | | * when we need to send operation messages to the changelog server. |
| | | * when we need to send operation messages to the replicationServer. |
| | | */ |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0); |
| | | |
| | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | /* |
| | | * Open a session to the changelog server using the broker API. |
| | | * Open a session to the replicationServer using the broker API. |
| | | * This must use a different serverId to that of the directory server. |
| | | */ |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); |
| | | |
| | | |
| | | /* |
| | | * Create a Change number generator to generate new changenumbers |
| | | * when we need to send operation messages to the changelog server. |
| | | * when we need to send operation messages to the replicationServer. |
| | | */ |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0); |
| | | |
| | |
| | | /** |
| | | * Tests the naming conflict resolution code. |
| | | * In this test, the local server act both as an LDAP server and |
| | | * a changelog server that are inter-connected. |
| | | * a replicationServer that are inter-connected. |
| | | * |
| | | * The test creates an other session to the changelog server using |
| | | * directly the ChangelogBroker API. |
| | | * The test creates an other session to the replicationServer using |
| | | * directly the ReplicationBroker API. |
| | | * It then uses this session to siomulate conflicts and therefore |
| | | * test the naming conflict resolution code. |
| | | */ |
| | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | /* |
| | | * Open a session to the changelog server using the Changelog broker API. |
| | | * Open a session to the replicationServer using the ReplicationServer broker API. |
| | | * This must use a serverId different from the LDAP server ID |
| | | */ |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); |
| | | |
| | | /* |
| | | * Create a Change number generator to generate new changenumbers |
| | | * when we need to send operations messages to the changelog server. |
| | | * when we need to send operations messages to the replicationServer. |
| | | */ |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Tests done using directly the ChangelogBroker interface. |
| | | * Tests done using directly the ReplicationBroker interface. |
| | | */ |
| | | @Test(enabled=false, dataProvider="assured") |
| | | public void updateOperations(boolean assured) throws Exception |
| | |
| | | |
| | | cleanRealEntries(); |
| | | |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true); |
| | | try { |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0); |
| | | |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | * replicationServer and forwarded to our replicationServer broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) |
| | |
| | | "The received DELETE message is not for the excepted DN"); |
| | | |
| | | /* |
| | | * Now check that when we send message to the Changelog server |
| | | * Now check that when we send message to the ReplicationServer |
| | | * and that they are received and correctly replayed by the server. |
| | | * |
| | | * Start by testing the Add message reception |
| | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | Thread.sleep(2000); |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true); |
| | | try |
| | | { |
| | |
| | | |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.Historical; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.TestCaseUtils; |
| | |
| | | DirectoryServer.getAttributeType("entryuuid"); |
| | | |
| | | /* |
| | | * Open a session to the changelog server using the broker API. |
| | | * Open a session to the replicationServer using the broker API. |
| | | * This must use a different serverId to that of the directory server. |
| | | */ |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | private static |
| | | void publishModify(ChangelogBroker broker, ChangeNumber changeNum, |
| | | void publishModify(ReplicationBroker broker, ChangeNumber changeNum, |
| | | DN dn, String entryuuid, Modification mod) |
| | | { |
| | | List<Modification> mods = new ArrayList<Modification>(1); |
| | |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.AddContext; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ChangelogStartMessage; |
| | | import org.opends.server.replication.protocol.ReplServerStartMessage; |
| | | import org.opends.server.replication.protocol.DeleteContext; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | |
| | | |
| | | /** |
| | | * Test that changelogStartMessage encoding and decoding works |
| | | * by checking that : msg == new ChangelogStartMessage(msg.getBytes()). |
| | | * by checking that : msg == new ReplServerStartMessage(msg.getBytes()). |
| | | */ |
| | | @Test(dataProvider="changelogStart") |
| | | public void ChangelogStartMessageTest(short serverId, DN baseDN, int window, |
| | | String url, ServerState state) throws Exception |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ChangelogStartMessage msg = new ChangelogStartMessage(serverId, |
| | | ReplServerStartMessage msg = new ReplServerStartMessage(serverId, |
| | | url, baseDN, window, state); |
| | | ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes()); |
| | | ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ChangelogFakeConfiguration.java |
| | |
| | | |
| | | /** |
| | | * This Class implements an object that can be used to instantiate |
| | | * The Changelog class for tests purpose. |
| | | * The ReplicationServer class for tests purpose. |
| | | */ |
| | | public class ChangelogFakeConfiguration implements ChangelogServerCfg |
| | | public class ReplServerFakeConfiguration implements ChangelogServerCfg |
| | | { |
| | | int port; |
| | | String dirName; |
| | |
| | | int windowSize; |
| | | private SortedSet<String> servers; |
| | | |
| | | public ChangelogFakeConfiguration( |
| | | public ReplServerFakeConfiguration( |
| | | int port, String dirName, int purgeDelay, int serverId, |
| | | int queueSize, int windowSize, SortedSet<String> servers) |
| | | { |
| File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ChangelogTest.java |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.ChangelogBroker; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyDnContext; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.server.Changelog; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | /** |
| | | * Tests for the changelog service code. |
| | | * Tests for the replicationServer code. |
| | | */ |
| | | |
| | | public class ChangelogTest extends ReplicationTestCase |
| | | public class ReplicationServerTest extends ReplicationTestCase |
| | | { |
| | | /** |
| | | * The changelog server that will be used in this test. |
| | | * The replicationServer that will be used in this test. |
| | | */ |
| | | private Changelog changelog = null; |
| | | private ReplicationServer replicationServer = null; |
| | | |
| | | /** |
| | | * The port of the changelog server. |
| | | * The port of the replicationServer. |
| | | */ |
| | | private int changelogPort; |
| | | |
| | |
| | | |
| | | /** |
| | | * Before starting the tests, start the server and configure a |
| | | * changelog server. |
| | | * replicationServer. |
| | | */ |
| | | @BeforeClass() |
| | | public void configure() throws Exception |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // find a free port for the changelog server |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | changelogPort = socket.getLocalPort(); |
| | | socket.close(); |
| | | |
| | | ChangelogFakeConfiguration conf = |
| | | new ChangelogFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null); |
| | | changelog = new Changelog(conf); |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null); |
| | | replicationServer = new ReplicationServer(conf); |
| | | } |
| | | |
| | | /** |
| | | * Basic test of the changelog code : |
| | | * Connect 2 clients to the changelog server and exchange messages |
| | | * Basic test of the replicationServer code : |
| | | * Connect 2 clients to the replicationServer and exchange messages |
| | | * between the clients. |
| | | * |
| | | * Note : Other tests in this file depends on this test and may need to |
| | |
| | | @Test() |
| | | public void changelogBasic() throws Exception |
| | | { |
| | | ChangelogBroker server1 = null; |
| | | ChangelogBroker server2 = null; |
| | | ReplicationBroker server1 = null; |
| | | ReplicationBroker server2 = null; |
| | | |
| | | try { |
| | | /* |
| | | * Open a sender session and a receiver session to the changelog |
| | | * Open a sender session and a receiver session to the replicationServer |
| | | */ |
| | | server1 = openChangelogSession( |
| | | DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, |
| | |
| | | /* |
| | | * Create a ChangeNumber between firstChangeNumberServer1 and |
| | | * secondChangeNumberServer1 that will not be used to create a |
| | | * change sent to the changelog server but that will be used |
| | | * change sent to the replicationServer but that will be used |
| | | * in the Server State when opening a connection to the |
| | | * Changelog Server to make sure that the Changelog server is |
| | | * ReplicationServer to make sure that the ReplicationServer is |
| | | * able to accept such clients. |
| | | */ |
| | | unknownChangeNumberServer1 = new ChangeNumber(time+1, 1, (short) 1); |
| | |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "Changelog basic : incorrect message body received."); |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("Changelog basic : incorrect message type received."); |
| | | fail("ReplicationServer basic : incorrect message type received."); |
| | | |
| | | /* |
| | | * Send and receive a second Delete Msg |
| | |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "Changelog basic : incorrect message body received."); |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("Changelog basic : incorrect message type received."); |
| | | fail("ReplicationServer basic : incorrect message type received."); |
| | | |
| | | /* |
| | | * Send and receive a Delete Msg from server 1 to server 2 |
| | |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "Changelog basic : incorrect message body received."); |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("Changelog basic : incorrect message type received."); |
| | | fail("ReplicationServer basic : incorrect message type received."); |
| | | |
| | | /* |
| | | * Send and receive a second Delete Msg |
| | |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "Changelog basic : incorrect message body received."); |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("Changelog basic : incorrect message type received."); |
| | | fail("ReplicationServer basic : incorrect message type received."); |
| | | } |
| | | finally |
| | | { |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) |
| | | public void newClient() throws Exception |
| | | { |
| | | ChangelogBroker broker = null; |
| | | ReplicationBroker broker = null; |
| | | |
| | | try { |
| | | broker = |
| | |
| | | |
| | | ReplicationMessage msg2 = broker.receive(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | | fail("Changelog basic transmission failed"); |
| | | fail("ReplicationServer basic transmission failed"); |
| | | else |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | |
| | | private void newClientWithChanges( |
| | | ServerState state, ChangeNumber nextChangeNumber) throws Exception |
| | | { |
| | | ChangelogBroker broker = null; |
| | | ReplicationBroker broker = null; |
| | | |
| | | /* |
| | | * Connect to the changelog server using the state created above. |
| | | * Connect to the replicationServer using the state created above. |
| | | */ |
| | | try { |
| | | broker = |
| | |
| | | |
| | | ReplicationMessage msg2 = broker.receive(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | | fail("Changelog basic transmission failed"); |
| | | fail("ReplicationServer basic transmission failed"); |
| | | else |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | |
| | | |
| | | /** |
| | | * Test with a client that has already seen a Change that the |
| | | * Changelog server has not seen. |
| | | * ReplicationServer has not seen. |
| | | */ |
| | | @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) |
| | | public void newClientWithUnknownChanges() throws Exception |
| | |
| | | |
| | | /** |
| | | * Test that newClient() and newClientWithFirstChange() still works |
| | | * after stopping and restarting the changelog server. |
| | | * after stopping and restarting the replicationServer. |
| | | */ |
| | | @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) |
| | | public void stopChangelog() throws Exception |
| | | { |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | configure(); |
| | | newClient(); |
| | | newClientWithFirstChanges(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Stress test from client using the ChangelogBroker API |
| | | * to the changelog server. |
| | | * Stress test from client using the ReplicationBroker API |
| | | * to the replicationServer. |
| | | * This test allow to investigate the behaviour of the |
| | | * Changelog server when it needs to distribute the load of |
| | | * ReplicationServer when it needs to distribute the load of |
| | | * updates from a single LDAP server to a number of LDAP servers. |
| | | * |
| | | * This test i sconfigured by a relatively low stress |
| | |
| | | @Test(enabled=true, groups="slow") |
| | | public void oneWriterMultipleReader() throws Exception |
| | | { |
| | | ChangelogBroker server = null; |
| | | ReplicationBroker server = null; |
| | | int TOTAL_MSG = 1000; // number of messages to send during the test |
| | | int CLIENT_THREADS = 2; // number of threads that will try to read |
| | | // the messages |
| | |
| | | new ChangeNumberGenerator((short)5 , (long) 0); |
| | | |
| | | BrokerReader client[] = new BrokerReader[CLIENT_THREADS]; |
| | | ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS]; |
| | | ReplicationBroker clientBroker[] = new ReplicationBroker[CLIENT_THREADS]; |
| | | |
| | | try |
| | | { |
| | |
| | | |
| | | /* |
| | | * Simple loop creating changes and sending them |
| | | * to the changelog server. |
| | | * to the replicationServer. |
| | | */ |
| | | for (int i = 0; i< TOTAL_MSG; i++) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Stress test from client using the ChangelogBroker API |
| | | * to the changelog server. |
| | | * Stress test from client using the ReplicationBroker API |
| | | * to the replicationServer. |
| | | * |
| | | * This test allow to investigate the behaviour of the |
| | | * Changelog server when it needs to distribute the load of |
| | | * ReplicationServer when it needs to distribute the load of |
| | | * updates from multiple LDAP server to a number of LDAP servers. |
| | | * |
| | | * This test is sconfigured for a relatively low stress |
| | |
| | | @Test(enabled=true, groups="slow") |
| | | public void multipleWriterMultipleReader() throws Exception |
| | | { |
| | | ChangelogBroker server = null; |
| | | ReplicationBroker server = null; |
| | | final int TOTAL_MSG = 1000; // number of messages to send during the test |
| | | final int THREADS = 2; // number of threads that will produce |
| | | // and read the messages. |
| | |
| | | short serverId = (short) (10+i); |
| | | ChangeNumberGenerator gen = |
| | | new ChangeNumberGenerator(serverId , (long) 0); |
| | | ChangelogBroker broker = |
| | | ReplicationBroker broker = |
| | | openChangelogSession( DN.decode("dc=example,dc=com"), serverId, |
| | | 100, changelogPort, 1000, 1000, 0, true); |
| | | |
| | |
| | | |
| | | |
| | | /** |
| | | * Chaining tests of the changelog code with 2 changelog servers involved |
| | | * Chaining tests of the replication Server code with 2 replication servers involved |
| | | * 2 tests are done here (itest=0 or itest=1) |
| | | * |
| | | * Test 1 |
| | | * - Create changelog server 1 |
| | | * - Create changelog server 2 connected with changelog server 1 |
| | | * - Create and connect client 1 to changelog server 1 |
| | | * - Create and connect client 2 to changelog server 2 |
| | | * - Create replication server 1 |
| | | * - Create replication server 2 connected with replication server 1 |
| | | * - Create and connect client 1 to replication server 1 |
| | | * - Create and connect client 2 to replication server 2 |
| | | * - Make client1 publish changes |
| | | * - Check that client 2 receives the changes published by client 1 |
| | | * |
| | | * Test 2 |
| | | * - Create changelog server 1 |
| | | * - Create and connect client1 to changelog server 1 |
| | | * - Create replication server 1 |
| | | * - Create and connect client1 to replication server 1 |
| | | * - Make client1 publish changes |
| | | * - Create changelog server 2 connected with changelog server 1 |
| | | * - Create and connect client 2 to changelog server 2 |
| | | * - Create replication server 2 connected with replication server 1 |
| | | * - Create and connect client 2 to replication server 2 |
| | | * - Check that client 2 receives the changes published by client 1 |
| | | * |
| | | */ |
| | |
| | | { |
| | | for (int itest = 0; itest <2; itest++) |
| | | { |
| | | ChangelogBroker broker2 = null; |
| | | ReplicationBroker broker2 = null; |
| | | boolean emptyOldChanges = true; |
| | | |
| | | // - Create 2 connected changelog servers |
| | | Changelog[] changelogs = new Changelog[2]; |
| | | // - Create 2 connected replicationServer |
| | | ReplicationServer[] changelogs = new ReplicationServer[2]; |
| | | int[] changelogPorts = new int[2]; |
| | | int[] changelogIds = new int[2]; |
| | | short[] brokerIds = new short[2]; |
| | |
| | | { |
| | | changelogs[i] = null; |
| | | |
| | | // for itest=0, create the 2 connected changelog servers |
| | | // for itest=1, create the 1rst changelog server, the second |
| | | // for itest=0, create the 2 connected replicationServer |
| | | // for itest=1, create the 1rst replicationServer, the second |
| | | // one will be created later |
| | | SortedSet<String> servers = new TreeSet<String>(); |
| | | servers.add( |
| | | "localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0])); |
| | | ChangelogFakeConfiguration conf = |
| | | new ChangelogFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0, |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0, |
| | | changelogIds[i], 0, 100, servers); |
| | | changelog = new Changelog(conf); |
| | | replicationServer = new ReplicationServer(conf); |
| | | } |
| | | |
| | | ChangelogBroker broker1 = null; |
| | | ReplicationBroker broker1 = null; |
| | | |
| | | try |
| | | { |
| | |
| | | |
| | | SortedSet<String> servers = new TreeSet<String>(); |
| | | servers.add("localhost:"+changelogPorts[0]); |
| | | ChangelogFakeConfiguration conf = |
| | | new ChangelogFakeConfiguration(changelogPorts[1], null, 0, |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(changelogPorts[1], null, 0, |
| | | changelogIds[1], 0, 0, null); |
| | | changelogs[1] = new Changelog(conf); |
| | | changelogs[1] = new ReplicationServer(conf); |
| | | |
| | | // Connect broker 2 to changelog2 |
| | | broker2 = openChangelogSession(DN.decode("dc=example,dc=com"), |
| | |
| | | } |
| | | else |
| | | { |
| | | fail("Changelog transmission failed: no expected message class."); |
| | | fail("ReplicationServer transmission failed: no expected message class."); |
| | | break; |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * After the tests stop the changelog server. |
| | | * After the tests stop the replicationServer. |
| | | */ |
| | | @AfterClass() |
| | | public void shutdown() throws Exception |
| | | { |
| | | if (changelog != null) |
| | | changelog.shutdown(); |
| | | if (replicationServer != null) |
| | | replicationServer.shutdown(); |
| | | } |
| | | |
| | | /** |
| | | * This class allows to creater reader thread. |
| | | * They continuously reads messages from a changelog broker until |
| | | * They continuously reads messages from a replication broker until |
| | | * there is nothing left. |
| | | * They Count the number of received messages. |
| | | */ |
| | | private class BrokerReader extends Thread |
| | | { |
| | | private ChangelogBroker broker; |
| | | private ReplicationBroker broker; |
| | | |
| | | /** |
| | | * Creates a new Stress Test Reader |
| | | * @param broker |
| | | */ |
| | | public BrokerReader(ChangelogBroker broker) |
| | | public BrokerReader(ReplicationBroker broker) |
| | | { |
| | | this.broker = broker; |
| | | } |
| | |
| | | |
| | | /** |
| | | * This class allows to create writer thread that can |
| | | * be used as producers for the Changelog stress tests. |
| | | * be used as producers for the ReplicationServer stress tests. |
| | | */ |
| | | private class BrokerWriter extends Thread |
| | | { |
| | | int count; |
| | | private ChangelogBroker broker; |
| | | private ReplicationBroker broker; |
| | | ChangeNumberGenerator gen; |
| | | |
| | | public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen, |
| | | public BrokerWriter(ReplicationBroker broker, ChangeNumberGenerator gen, |
| | | int count) |
| | | { |
| | | this.broker = broker; |
| | |
| | | { |
| | | /* |
| | | * Simple loop creating changes and sending them |
| | | * to the changelog server. |
| | | * to the replicationServer. |
| | | */ |
| | | while (count>0) |
| | | { |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.server.Changelog; |
| | | import org.opends.server.replication.server.ChangelogDbEnv; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationDbEnv; |
| | | import org.opends.server.replication.server.DbHandler; |
| | | import org.opends.server.types.DN; |
| | | import org.testng.annotations.Test; |
| | |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // find a free port for the changelog server |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | int changelogPort = socket.getLocalPort(); |
| | | socket.close(); |
| | | |
| | | // configure a Changelog server. |
| | | ChangelogFakeConfiguration conf = |
| | | new ChangelogFakeConfiguration(changelogPort, null, 0, |
| | | // configure a ReplicationServer. |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(changelogPort, null, 0, |
| | | 2, 0, 100, null); |
| | | Changelog changelog = new Changelog(conf); |
| | | ReplicationServer replicationServer = new ReplicationServer(conf); |
| | | |
| | | // create or clean a directory for the dbHandler |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | |
| | | } |
| | | testRoot.mkdirs(); |
| | | |
| | | ChangelogDbEnv dbEnv = new ChangelogDbEnv(path, changelog); |
| | | ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer); |
| | | |
| | | DbHandler handler = |
| | | new DbHandler((short) 1, DN.decode("o=test"), changelog, dbEnv); |
| | | new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv); |
| | | |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0); |
| | | ChangeNumber changeNumber1 = gen.NewChangeNumber(); |
| | |
| | | |
| | | handler.shutdown(); |
| | | dbEnv.shutdown(); |
| | | changelog.shutdown(); |
| | | replicationServer.shutdown(); |
| | | |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | } |