These changes are for issue 787:
LDAP server needs to detect failure of changelog servers
The synchronization server sends a regular heartbeat message when the session is idle and there are no synchronization updates flowing. The broker attempts to re-establish a connection to the same or alternative sync server when it detects loss of heartbeats.
3 files added
13 files modified
| | |
| | | NAME 'ds-cfg-group-implementation-enabled' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | attributeTypes: ( 1.3.6.1.4.1.26027.1.1.305 |
| | | NAME 'ds-cfg-heartbeat-interval' |
| | | SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 |
| | | NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL |
| | | MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled ) |
| | |
| | | $ ds-cfg-synchronization-dn ) |
| | | MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $ |
| | | ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $ |
| | | ds-cfg-window-size ) |
| | | ds-cfg-window-size $ ds-cfg-heartbeat-interval ) |
| | | X-ORIGIN 'OpenDS Directory Server' ) |
| | | objectClasses: ( 1.3.6.1.4.1.26027.1.2.59 |
| | | NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator |
| | |
| | | } |
| | | |
| | | /** |
| | | * This method manage the connection with the other LDAP servers |
| | | * it periodically that this changelog server is correctly connected |
| | | * to all the other changelog servers and if not attempt to |
| | | * do the connection. |
| | | * This method manages the connection with the other changelog servers. |
| | | * It periodically checks that this changelog server is indeed connected |
| | | * to all the other changelog servers and if not attempts to |
| | | * make the connection. |
| | | */ |
| | | private void runConnect() |
| | | { |
| | |
| | | import org.opends.server.synchronization.protocol.SynchronizationMessage; |
| | | import org.opends.server.synchronization.protocol.UpdateMessage; |
| | | import org.opends.server.synchronization.protocol.WindowMessage; |
| | | import org.opends.server.synchronization.protocol.HeartbeatThread; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | /** |
| | |
| | | // be stopped from sending messages. |
| | | private int saturationCount = 0; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the synchronization |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | |
| | | /** |
| | | * The thread that will send heartbeats. |
| | | */ |
| | | HeartbeatThread heartbeatThread = null; |
| | | |
| | | private static Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | | |
| | |
| | | maxReceiveQueue = receivedMsg.getMaxReceiveQueue(); |
| | | maxSendDelay = receivedMsg.getMaxSendDelay(); |
| | | maxSendQueue = receivedMsg.getMaxSendQueue(); |
| | | heartbeatInterval = receivedMsg.getHeartbeatInterval(); |
| | | |
| | | if (maxReceiveQueue > 0) |
| | | restartReceiveQueue = (maxReceiveQueue > 1000 ? |
| | |
| | | maxSendDelay); |
| | | else |
| | | restartSendDelay = 0; |
| | | |
| | | if (heartbeatInterval < 0) |
| | | { |
| | | heartbeatInterval = 0; |
| | | } |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatThread = new HeartbeatThread("Synchronization Heartbeat", |
| | | session, heartbeatInterval); |
| | | heartbeatThread.start(); |
| | | } |
| | | |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | |
| | | msgQueue.notifyAll(); |
| | | } |
| | | |
| | | // Stop the heartbeat thread. |
| | | if (heartbeatThread != null) |
| | | { |
| | | heartbeatThread.shutdown(); |
| | | } |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); |
| | | } |
| | | |
| | |
| | | { |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | |
| | | /** |
| | | * Get the heartbeat interval in use for this handler. |
| | | * |
| | | * @return The time in milliseconds between heartbeats, or zero if |
| | | * heartbeats are off. |
| | | */ |
| | | public long getHeartbeatInterval() |
| | | { |
| | | return heartbeatInterval; |
| | | } |
| | | } |
| | |
| | | private int timeout = 0; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the synchronization |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | |
| | | |
| | | /** |
| | | * A thread to monitor heartbeats on the session. |
| | | */ |
| | | private HeartbeatMonitor heartbeatMonitor = null; |
| | | |
| | | /** |
| | | * The number of times the connection was lost. |
| | | */ |
| | | private int numLostConnections = 0; |
| | | |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular SynchronizationDomain. |
| | | * |
| | | * @param state The ServerState that should be used by this broker |
| | |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param heartbeatInterval The interval between heartbeats requested of the |
| | | * changelog server, or zero if no heartbeats are requested. |
| | | */ |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay, int window) |
| | | int maxSendDelay, int window, long heartbeatInterval) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window/2; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | |
| | | /** |
| | | * Connect the Changelog server to other servers. |
| | | * Connect to a Changelog server. |
| | | * |
| | | * @throws NumberFormatException address was invalid |
| | | * @throws IOException error during connection phase |
| | |
| | | { |
| | | ChangelogStartMessage startMsg; |
| | | |
| | | // Stop any existing heartbeat monitor from a previous session. |
| | | if (heartbeatMonitor != null) |
| | | { |
| | | heartbeatMonitor.shutdown(); |
| | | heartbeatMonitor = null; |
| | | } |
| | | |
| | | boolean checkState = true; |
| | | while( !connected) |
| | | { |
| | |
| | | /* |
| | | * Send our ServerStartMessage. |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage( serverID, baseDn, |
| | | ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | halfRcvWindow*2, state); |
| | | halfRcvWindow*2, heartbeatInterval, state); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Start a heartbeat monitor thread. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = |
| | | new HeartbeatMonitor("Synchronization Heartbeat Monitor", session, |
| | | heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | */ |
| | | private void reStart(ProtocolSession failingSession) |
| | | { |
| | | numLostConnections++; |
| | | |
| | | try |
| | | { |
| | | failingSession.close(); |
| | |
| | | /** |
| | | * Receive a message. |
| | | * @return the received message |
| | | * @throws SocketTimeoutException if the tiemout set by setSoTimeout |
| | | * @throws SocketTimeoutException if the timeout set by setSoTimeout |
| | | * has expired |
| | | */ |
| | | public SynchronizationMessage receive() throws SocketTimeoutException |
| | |
| | | } |
| | | return msg; |
| | | } |
| | | } catch (SocketTimeoutException e) |
| | | { |
| | | throw e; |
| | | } catch (Exception e) |
| | | { |
| | | if (e instanceof SocketTimeoutException) |
| | | { |
| | | SocketTimeoutException e1 = (SocketTimeoutException) e; |
| | | throw e1; |
| | | } |
| | | if (shutdown == false) |
| | | { |
| | | synchronized (lock) |
| | |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of times the connection was lost. The counter is |
| | | * incremented each time this broker restarts after a failing session. |
| | | * |
| | | * @return The number of times the connection was lost. |
| | | */ |
| | | public int getNumLostConnections() |
| | | { |
| | | return numLostConnections; |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2007 Sun Microsystems, Inc. |
| | | */ |
| | | |
| | | package org.opends.server.synchronization.plugin; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.synchronization.protocol.ProtocolSession; |
| | | import static org.opends.server.loggers.Debug.debugMessage; |
| | | import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION; |
| | | import static org.opends.server.types.DebugLogSeverity.INFO; |
| | | |
| | | import java.io.IOException; |
| | | |
| | | /** |
| | | * This class implements a thread to monitor heartbeat messages from the |
| | | * synchronization server. Each broker runs one of these threads. |
| | | */ |
| | | public class HeartbeatMonitor extends DirectoryThread |
| | | { |
| | | /** |
| | | * The fully-qualified name of this class for debugging purposes. |
| | | */ |
| | | private static final String CLASS_NAME = |
| | | "org.opends.server.synchronization.plugin.HeartbeatMonitor"; |
| | | |
| | | |
| | | /** |
| | | * The session on which heartbeats are to be monitored. |
| | | */ |
| | | private final ProtocolSession session; |
| | | |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the synchronization |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private final long heartbeatInterval; |
| | | |
| | | |
| | | /** |
| | | * Set this to stop the thread. It is written by the thread that calls |
| | | * shutdown() and read by the monitor thread, so it must be volatile for |
| | | * the change to be guaranteed visible to the monitoring loop. |
| | | */ |
| | | private volatile boolean shutdown = false; |
| | | |
| | | |
| | | /** |
| | | * Create a heartbeat monitor thread. |
| | | * @param threadName The name of the heartbeat thread. |
| | | * @param session The session on which heartbeats are to be monitored. |
| | | * @param heartbeatInterval The expected interval between heartbeats in |
| | | * milliseconds. |
| | | */ |
| | | public HeartbeatMonitor(String threadName, ProtocolSession session, |
| | | long heartbeatInterval) |
| | | { |
| | | super(threadName); |
| | | this.session = session; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | } |
| | | |
| | | /** |
| | | * Call this method to stop the thread. The thread is also interrupted so |
| | | * that it does not linger in its sleep for up to one heartbeat interval. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | shutdown = true; |
| | | interrupt(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Polls the session once per heartbeat interval and closes it when nothing |
| | | * has been received for more than two intervals. The loop ends when the |
| | | * session has been closed or when shutdown() has been called. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat monitor is starting, expected interval is " |
| | | + heartbeatInterval); |
| | | try |
| | | { |
| | | while (!shutdown) |
| | | { |
| | | long now = System.currentTimeMillis(); |
| | | long lastReceiveTime = session.getLastReceiveTime(); |
| | | if (now > lastReceiveTime + 2 * heartbeatInterval) |
| | | { |
| | | // Heartbeat is well overdue so the server is assumed to be dead. |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat monitor is closing the broker " + |
| | | "session because it could not detect a " + |
| | | "heartbeat."); |
| | | session.close(); |
| | | break; |
| | | } |
| | | try |
| | | { |
| | | Thread.sleep(heartbeatInterval); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // shutdown() interrupts the sleep; the loop condition re-checks |
| | | // the shutdown flag, so there is nothing more to do here. |
| | | } |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // The session is being abandoned anyway, so a failure to close it is |
| | | // not fatal, but record it rather than dropping it silently. |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat monitor could not close the broker " + |
| | | "session: " + e); |
| | | } |
| | | finally |
| | | { |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat monitor is exiting."); |
| | | } |
| | | } |
| | | } |
| | |
| | | package org.opends.server.synchronization.plugin; |
| | | |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.util.ServerConstants. |
| | | TIME_UNIT_MILLISECONDS_ABBR; |
| | | import static org.opends.server.util.ServerConstants. |
| | | TIME_UNIT_MILLISECONDS_FULL; |
| | | import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR; |
| | | import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL; |
| | | import static org.opends.server.synchronization.common.LogMessages.*; |
| | | import static org.opends.server.synchronization.plugin.Historical.*; |
| | | import static org.opends.server.synchronization.protocol.OperationContext.*; |
| | |
| | | import java.util.List; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | |
| | | import org.opends.server.config.DNConfigAttribute; |
| | | import org.opends.server.config.IntegerConfigAttribute; |
| | | import org.opends.server.config.StringConfigAttribute; |
| | | import org.opends.server.config.IntegerWithUnitConfigAttribute; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the synchronization |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | |
| | | private short serverId; |
| | | |
| | | private BooleanConfigAttribute receiveStatusStub; |
| | |
| | | static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue"; |
| | | static String MAX_SEND_DELAY = "ds-cfg-max-send-delay"; |
| | | static String WINDOW_SIZE = "ds-cfg-window-size"; |
| | | static String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval"; |
| | | |
| | | private static final StringConfigAttribute changelogStub = |
| | | new StringConfigAttribute(CHANGELOG_SERVER_ATTR, |
| | |
| | | true, false, false); |
| | | |
| | | /** |
| | | * The set of time units that will be used for expressing the heartbeat |
| | | * interval. |
| | | */ |
| | | private static final LinkedHashMap<String,Double> timeUnits = |
| | | new LinkedHashMap<String,Double>(); |
| | | |
| | | |
| | | |
| | | // Populate the table of accepted unit names for the heartbeat interval. |
| | | // Each name maps to a multiplier: 1 for the millisecond names and 1000 |
| | | // for the second names, presumably so that the calculated value is |
| | | // expressed in milliseconds (confirm against |
| | | // IntegerWithUnitConfigAttribute). |
| | | static |
| | | { |
| | | timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D); |
| | | timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D); |
| | | timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D); |
| | | timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Creates a new SynchronizationDomain using configuration from configEntry. |
| | | * |
| | | * @param configEntry The ConfigEntry to use to read the configuration of this |
| | |
| | | configAttributes.add(windowAttr); |
| | | } |
| | | |
| | | IntegerWithUnitConfigAttribute heartbeatStub = |
| | | new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL, |
| | | "heartbeat interval", |
| | | false, timeUnits, true, 0, false, 0); |
| | | IntegerWithUnitConfigAttribute heartbeatAttr = |
| | | (IntegerWithUnitConfigAttribute) |
| | | configEntry.getConfigAttribute(heartbeatStub); |
| | | if (heartbeatAttr == null) |
| | | { |
| | | // Attribute is not present : use the default value |
| | | heartbeatInterval = 1000; |
| | | } |
| | | else |
| | | { |
| | | heartbeatInterval = heartbeatAttr.activeCalculatedValue(); |
| | | configAttributes.add(heartbeatAttr); |
| | | } |
| | | |
| | | configDn = configEntry.getDN(); |
| | | DirectoryServer.registerConfigurableComponent(this); |
| | | |
| | |
| | | try |
| | | { |
| | | broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window); |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval); |
| | | synchronized (broker) |
| | | { |
| | | broker.start(changelogServers); |
| | |
| | | { |
| | | return broker.getCurrentSendWindow(); |
| | | } |
| | | |
| | | /** |
| | | * Get the number of times the synchronization connection was lost. |
| | | * The value is obtained from the changelog broker of this domain. |
| | | * |
| | | * @return The number of times the synchronization connection was lost. |
| | | */ |
| | | public int getNumLostConnections() |
| | | { |
| | | return broker.getNumLostConnections(); |
| | | } |
| | | } |
| | |
| | | attr = new Attribute("connected-to", domain.getChangelogServer()); |
| | | attributes.add(attr); |
| | | |
| | | /* get number of lost connections */ |
| | | addMonitorData(attributes, "lost-connections", |
| | | domain.getNumLostConnections()); |
| | | |
| | | /* get number of received updates */ |
| | | addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates()); |
| | | |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2007 Sun Microsystems, Inc. |
| | | */ |
| | | |
| | | package org.opends.server.synchronization.protocol; |
| | | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | /** |
| | | * This message is sent at regular intervals by the synchronization server |
| | | * when it is sending no other messages. It allows the directory server to |
| | | * detect a problem sooner when a synchronization server has crashed or has |
| | | * been isolated from the network. |
| | | */ |
| | | public class HeartbeatMessage extends SynchronizationMessage |
| | | { |
| | | /** |
| | | * Builds a heartbeat message. The message carries no payload. |
| | | */ |
| | | public HeartbeatMessage() |
| | | { |
| | | } |
| | | |
| | | /** |
| | | * Decodes a heartbeat message. The encoded form is exactly one byte, the |
| | | * heartbeat message type. |
| | | * |
| | | * @param in The byte array containing the encoded form of the message. |
| | | * @throws java.util.zip.DataFormatException If the byte array does not |
| | | * contain a valid encoded form of the message. |
| | | */ |
| | | public HeartbeatMessage(byte[] in) throws DataFormatException |
| | | { |
| | | // Valid input is a single byte holding the heartbeat message type. |
| | | boolean wellFormed = (in.length == 1) && (in[0] == MSG_TYPE_HEARTBEAT); |
| | | if (!wellFormed) |
| | | { |
| | | throw new DataFormatException("Input is not a valid Heartbeat Message."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | // The encoded heartbeat is the message type byte and nothing else. |
| | | return new byte[] { MSG_TYPE_HEARTBEAT }; |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2007 Sun Microsystems, Inc. |
| | | */ |
| | | |
| | | package org.opends.server.synchronization.protocol; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import static org.opends.server.loggers.Debug.debugMessage; |
| | | import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION; |
| | | import static org.opends.server.types.DebugLogSeverity.INFO; |
| | | import static org.opends.server.types.DebugLogSeverity.VERBOSE; |
| | | |
| | | import java.io.IOException; |
| | | |
| | | /** |
| | | * This thread publishes a heartbeat message on a given protocol session at |
| | | * regular intervals when there are no other synchronization messages being |
| | | * published. |
| | | */ |
| | | public class HeartbeatThread extends DirectoryThread |
| | | { |
| | | /** |
| | | * The fully-qualified name of this class for debugging purposes. |
| | | */ |
| | | private static final String CLASS_NAME = |
| | | "org.opends.server.synchronization.plugin.HeartbeatThread"; |
| | | |
| | | |
| | | /** |
| | | * For test purposes only to simulate loss of heartbeats. |
| | | */ |
| | | static private boolean heartbeatsDisabled = false; |
| | | |
| | | /** |
| | | * The session on which heartbeats are to be sent. |
| | | */ |
| | | private ProtocolSession session; |
| | | |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats. |
| | | */ |
| | | private long heartbeatInterval; |
| | | |
| | | |
| | | /** |
| | | * Set this to stop the thread. |
| | | */ |
| | | private boolean shutdown = false; |
| | | |
| | | |
| | | /** |
| | | * Create a heartbeat thread. |
| | | * @param threadName The name of the heartbeat thread. |
| | | * @param session The session on which heartbeats are to be sent. |
| | | * @param heartbeatInterval The desired interval between heartbeats in |
| | | * milliseconds. |
| | | */ |
| | | public HeartbeatThread(String threadName, ProtocolSession session, |
| | | long heartbeatInterval) |
| | | { |
| | | super(threadName); |
| | | this.session = session; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | try |
| | | { |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat thread is starting, interval is " + |
| | | heartbeatInterval); |
| | | HeartbeatMessage heartbeatMessage = new HeartbeatMessage(); |
| | | |
| | | while (!shutdown) |
| | | { |
| | | long now = System.currentTimeMillis(); |
| | | debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run", |
| | | "Heartbeat thread awoke at " + now + |
| | | ", last message was sent at " + |
| | | session.getLastPublishTime()); |
| | | |
| | | if (now > session.getLastPublishTime() + heartbeatInterval) |
| | | { |
| | | if (!heartbeatsDisabled) |
| | | { |
| | | debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run", |
| | | "Heartbeat sent at " + now); |
| | | session.publish(heartbeatMessage); |
| | | } |
| | | } |
| | | |
| | | try |
| | | { |
| | | long sleepTime = session.getLastPublishTime() + |
| | | heartbeatInterval - now; |
| | | if (sleepTime <= 0) |
| | | { |
| | | sleepTime = heartbeatInterval; |
| | | } |
| | | |
| | | debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run", |
| | | "Heartbeat thread sleeping for " + sleepTime); |
| | | Thread.sleep(sleepTime); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Keep looping. |
| | | } |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat thread could not send a heartbeat."); |
| | | // This will be caught in another thread. |
| | | } |
| | | finally |
| | | { |
| | | debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run", |
| | | "Heartbeat thread is exiting."); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Call this method to stop the thread. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | shutdown = true; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * For testing purposes only to simulate loss of heartbeats. |
| | | * @param heartbeatsDisabled Set true to prevent heartbeats from being sent. |
| | | */ |
| | | public static void setHeartbeatsDisabled(boolean heartbeatsDisabled) |
| | | { |
| | | HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled; |
| | | } |
| | | } |
| | |
| | | |
| | | /** |
| | | * This method is called when the session with the remote must be closed. |
| | | * It must |
| | | * This object won't be used anymore after this method is called. |
| | | * This object won't be used anymore after this method is called. |
| | | * |
| | | * @throws IOException If an error happens during the close process. |
| | | */ |
| | |
| | | * such as a TCP error. |
| | | */ |
| | | public abstract void setSoTimeout(int timeout) throws SocketException; |
| | | |
| | | |
| | | |
| | | /** |
| | | * Gets the time the last synchronization message was published on this |
| | | * session. |
| | | * @return The timestamp in milliseconds of the last message published. |
| | | */ |
| | | public abstract long getLastPublishTime(); |
| | | |
| | | |
| | | |
| | | /** |
| | | * Gets the time the last synchronization message was received on this |
| | | * session. |
| | | * @return The timestamp in milliseconds of the last message received. |
| | | */ |
| | | public abstract long getLastReceiveTime(); |
| | | } |
| | |
| | | private ServerState serverState = null; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the synchronization |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | |
| | | /** |
| | | * Create a new ServerStartMessage. |
| | | * |
| | | * @param serverId The serverId of the server for which the ServerStartMessage |
| | |
| | | * @param maxSendDelay The max Send Delay from this server. |
| | | * @param maxSendQueue The max send Queue from this server. |
| | | * @param windowSize The window size used by this server. |
| | | * @param heartbeatInterval The requested heartbeat interval. |
| | | * @param serverState The state of this server. |
| | | */ |
| | | public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay, |
| | | int maxReceiveQueue, int maxSendDelay, |
| | | int maxSendQueue, int windowSize, |
| | | long heartbeatInterval, |
| | | ServerState serverState) |
| | | { |
| | | this.serverId = serverId; |
| | |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | | this.maxSendDelay = maxSendDelay; |
| | | this.maxSendQueue = maxSendQueue; |
| | | this.serverState = serverState; |
| | | this.windowSize = windowSize; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | |
| | | this.serverState = serverState; |
| | | |
| | | try |
| | | { |
| | |
| | | { |
| | | /* The ServerStartMessage is encoded in the form : |
| | | * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><window><ServerState> |
| | | * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState> |
| | | */ |
| | | try |
| | | { |
| | |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the heartbeatInterval |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | |
| | | /* |
| | | * ServerStartMessage contains. |
| | | * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><windowsize><ServerState> |
| | | * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | |
| | | String.valueOf(maxSendQueue).getBytes("UTF-8"); |
| | | byte[] byteWindowSize = |
| | | String.valueOf(windowSize).getBytes("UTF-8"); |
| | | byte[] byteHeartbeatInterval = |
| | | String.valueOf(heartbeatInterval).getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | |
| | | byteMaxSendDelay.length + 1 + |
| | | byteMaxSendQueue.length + 1 + |
| | | byteWindowSize.length + 1 + |
| | | byteHeartbeatInterval.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | |
| | | |
| | | pos = addByteArray(byteWindowSize, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | |
| | | { |
| | | return windowSize; |
| | | } |
| | | |
| | | /** |
| | | * Get the heartbeat interval requested by the ldap server that created the |
| | | * message. |
| | | * |
| | | * @return The heartbeat interval, in milliseconds, requested by the ldap |
| | | * server that created the message. Zero means heartbeats are off. |
| | | */ |
| | | public long getHeartbeatInterval() |
| | | { |
| | | return heartbeatInterval; |
| | | } |
| | | } |
| | |
| | | byte[] rcvLengthBuf = new byte[8]; |
| | | |
| | | /** |
| | | * The time the last message was published to this session. |
| | | */ |
| | | private long lastPublishTime = 0; |
| | | |
| | | |
| | | /** |
| | | * The time the last message was received on this session. |
| | | */ |
| | | private long lastReceiveTime = 0; |
| | | |
| | | |
| | | /** |
| | | * Creates a new SocketSession based on the provided socket. |
| | | * |
| | | * @param socket The Socket on which the SocketSession will be based. |
| | |
| | | output.write(sendLengthBuf); |
| | | output.write(buffer); |
| | | output.flush(); |
| | | |
| | | lastPublishTime = System.currentTimeMillis(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | int read = input.read(rcvLengthBuf, length, 8-length); |
| | | if (read == -1) |
| | | { |
| | | throw new IOException("no more data"); |
| | | } |
| | | else |
| | | { |
| | | length += read; |
| | | } |
| | | } |
| | | |
| | | int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); |
| | |
| | | length = 0; |
| | | byte[] buffer = new byte[totalLength]; |
| | | // Read the message body; one read may return fewer bytes than requested. |
| | | while (length < totalLength) |
| | | { |
| | | int bodyRead = input.read(buffer, length, totalLength - length); |
| | | if (bodyRead == -1) |
| | | { |
| | | // End of stream in the middle of a message: report it as an |
| | | // IOException instead of adding -1 to the offset, which would |
| | | // move the offset backwards and end in an unchecked |
| | | // IndexOutOfBoundsException on a later read. |
| | | throw new IOException("no more data"); |
| | | } |
| | | length += bodyRead; |
| | | } |
| | | |
| | | lastReceiveTime = System.currentTimeMillis(); |
| | | return SynchronizationMessage.generateMsg(buffer); |
| | | } |
| | | catch (OutOfMemoryError e) |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | * |
| | | * For this session the value is the System.currentTimeMillis() reading |
| | | * stored in lastPublishTime, which is 0 until a message has been |
| | | * published. |
| | | */ |
| | | public long getLastPublishTime() |
| | | { |
| | | return lastPublishTime; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * |
| | | * For this session the value is the System.currentTimeMillis() reading |
| | | * stored in lastReceiveTime, which is 0 until a message has been |
| | | * received. |
| | | */ |
| | | public long getLastReceiveTime() |
| | | { |
| | | return lastReceiveTime; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public String getRemoteAddress() |
| | | { |
| | | return socket.getInetAddress().getHostAddress(); |
| | |
| | | static final byte MSG_TYPE_SERVER_START = 6; |
| | | static final byte MSG_TYPE_CHANGELOG_START = 7; |
| | | static final byte MSG_TYPE_WINDOW = 8; |
| | | static final byte MSG_TYPE_HEARTBEAT = 9; |
| | | |
| | | /** |
| | | * Return the byte[] representation of this message. |
| | |
| | | * MSG_TYPE_ACK |
| | | * MSG_TYPE_SERVER_START |
| | | * MSG_TYPE_CHANGELOG_START |
| | | * MSG_TYPE_WINDOW |
| | | * MSG_TYPE_HEARTBEAT |
| | | * |
| | | * @return the byte[] representation of this message. |
| | | */ |
| | |
| | | case MSG_TYPE_WINDOW: |
| | | msg = new WindowMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_HEARTBEAT: |
| | | msg = new HeartbeatMessage(buffer); |
| | | break; |
| | | default: |
| | | throw new DataFormatException("received message with unknown type"); |
| | | } |
| | |
| | | if (emptyOldChanges) |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker( |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size); |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | |
| | | throws Exception, SocketException |
| | | { |
| | | ChangelogBroker broker = new ChangelogBroker( |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size); |
| | | state, baseDn, serverId, 0, 0, 0, 0, window_size, 0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | |
| | | |
| | | return broker; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Open a changelog session with flow control to the local Changelog server. |
| | | * |
| | |
| | | if (emptyOldChanges) |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker( |
| | | state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size); |
| | | state, baseDn, serverId, maxRcvQueue, 0, |
| | | maxSendQueue, 0, window_size, 0); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | |
| | | import org.opends.server.synchronization.protocol.ModifyDNMsg; |
| | | import org.opends.server.synchronization.protocol.ModifyMsg; |
| | | import org.opends.server.synchronization.protocol.SynchronizationMessage; |
| | | import org.opends.server.synchronization.protocol.HeartbeatThread; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Tests whether the synchronization provider fails over when it loses |
| | | * the heartbeat from the synchronization server. |
| | | * |
| | | * The test replays an add and a modify through a broker session, |
| | | * suppresses heartbeats for a few seconds, and then checks that a second |
| | | * modify and a delete are still replayed afterwards. |
| | | * |
| | | * @throws Exception If the test fails unexpectedly. |
| | | */ |
| | | @Test(groups = "slow") |
| | | public void lostHeartbeatFailover() throws Exception |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting synchronization test : lostHeartbeatFailover" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | |
| | | /* |
| | | * Open a session to the changelog server using the broker API. |
| | | * This must use a different serverId to that of the directory server. |
| | | */ |
| | | ChangelogBroker broker = |
| | | openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true); |
| | | |
| | | // Always stop the broker, even when an assertion fails, so that a |
| | | // failing run does not leave a session open for the following tests. |
| | | try |
| | | { |
| | | /* |
| | | * Create a Change number generator to generate new changenumbers |
| | | * when we need to send operation messages to the changelog server. |
| | | */ |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0); |
| | | |
| | | // Create and publish an update message to add an entry. |
| | | AddMsg addMsg = new AddMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN().toString(), |
| | | user1entryUUID, |
| | | baseUUID, |
| | | personWithUUIDEntry.getObjectClassAttribute(), |
| | | personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); |
| | | broker.publish(addMsg); |
| | | |
| | | entryList.add(personWithUUIDEntry.getDN()); |
| | | Entry resultEntry; |
| | | |
| | | // Check that the entry has been created in the directory server. |
| | | resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, true); |
| | | assertNotNull(resultEntry, |
| | | "The ADD synchronization message was not replayed"); |
| | | |
| | | // Send a first modify operation message. |
| | | List<Modification> mods = generatemods("telephonenumber", "01 02 45"); |
| | | ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN(), mods, |
| | | user1entryUUID); |
| | | broker.publish(modMsg); |
| | | |
| | | // Check that the modify has been replayed. |
| | | boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), |
| | | "telephonenumber", "01 02 45", 10000, true); |
| | | if (!found) |
| | | { |
| | | fail("The first modification was not replayed."); |
| | | } |
| | | |
| | | // Simulate loss of heartbeats. The flag is static, so it is reset in |
| | | // a finally block to keep an interrupted sleep from leaving heartbeats |
| | | // disabled for every later test. |
| | | HeartbeatThread.setHeartbeatsDisabled(true); |
| | | try |
| | | { |
| | | Thread.sleep(3000); |
| | | } |
| | | finally |
| | | { |
| | | HeartbeatThread.setHeartbeatsDisabled(false); |
| | | } |
| | | |
| | | // Send a second modify operation message. |
| | | mods = generatemods("description", "Description was changed"); |
| | | modMsg = new ModifyMsg(gen.NewChangeNumber(), |
| | | personWithUUIDEntry.getDN(), mods, |
| | | user1entryUUID); |
| | | broker.publish(modMsg); |
| | | |
| | | // Check that the modify has been replayed. |
| | | found = checkEntryHasAttribute(personWithUUIDEntry.getDN(), |
| | | "description", "Description was changed", |
| | | 10000, true); |
| | | if (!found) |
| | | { |
| | | fail("The second modification was not replayed."); |
| | | } |
| | | |
| | | // Delete the entries to clean the database. |
| | | DeleteMsg delMsg = |
| | | new DeleteMsg(personWithUUIDEntry.getDN().toString(), |
| | | gen.NewChangeNumber(), user1entryUUID); |
| | | broker.publish(delMsg); |
| | | resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false); |
| | | |
| | | // Check that the delete operation has been applied. |
| | | assertNull(resultEntry, |
| | | "The DELETE synchronization message was not replayed"); |
| | | } |
| | | finally |
| | | { |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tests the naming conflict resolution code. |
| | | * In this test, the local server acts both as an LDAP server and |
| | | * a changelog server that are inter-connected. |
| | |
| | | import org.testng.annotations.Test; |
| | | import static org.testng.Assert.*; |
| | | |
| | | import org.opends.server.api.ClientConnection; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | |
| | | import org.opends.server.synchronization.common.ChangeNumber; |
| | | import org.opends.server.synchronization.common.ServerState; |
| | | import org.opends.server.synchronization.plugin.PendingChange; |
| | | import org.opends.server.synchronization.protocol.AckMessage; |
| | | import org.opends.server.synchronization.protocol.AddMsg; |
| | | import org.opends.server.synchronization.protocol.ChangelogStartMessage; |
| | | import org.opends.server.synchronization.protocol.DeleteMsg; |
| | | import org.opends.server.synchronization.protocol.ModifyDNMsg; |
| | | import org.opends.server.synchronization.protocol.ModifyMsg; |
| | | import org.opends.server.synchronization.protocol.ServerStartMessage; |
| | | import org.opends.server.synchronization.protocol.SynchronizationMessage; |
| | | import org.opends.server.synchronization.protocol.UpdateMessage; |
| | | import org.opends.server.synchronization.protocol.WindowMessage; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | |
| | | throws Exception |
| | | { |
| | | DN dn = DN.decode(rawdn); |
| | | ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");; |
| | | ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid"); |
| | | |
| | | // Check uuid |
| | | assertEquals("fakeuniqueid", msg.getUniqueId()); |
| | |
| | | |
| | | /** |
| | | * Build some data for the DeleteMsg test below. |
| | | * @throws DirectoryException |
| | | */ |
| | | @DataProvider(name = "deleteEncodeDecode") |
| | | public Object[][] createDelData() { |
| | |
| | | //Create an Add operation and generate and Add msg from it |
| | | DN dn = DN.decode(rawDN); |
| | | |
| | | addOp = new AddOperation((ClientConnection) connection, |
| | | addOp = new AddOperation(connection, |
| | | (long) 1, 1, null, dn, objectClassList, userAttList, opList); |
| | | OperationContext opCtx = new AddContext(cn, "thisIsaUniqueID", |
| | | "parentUniqueId"); |
| | |
| | | |
| | | /** |
| | | * Build some data for the AckMsg test below. |
| | | * @throws DirectoryException |
| | | */ |
| | | @DataProvider(name = "ackMsg") |
| | | public Object[][] createAckData() { |
| | |
| | | // Check that retrieved CN is OK |
| | | msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes()); |
| | | } |
| | | |
| | | |
| | | @DataProvider(name="serverStart") |
| | | public Object [][] createServerStartMessageTestData() throws Exception |
| | | { |
| | |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ServerStartMessage msg = new ServerStartMessage(serverId, baseDN, |
| | | window, window, window, window, window, state); |
| | | window, window, window, window, window, window, state); |
| | | ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); |
| | | assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay()); |
| | | assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue()); |
| | | assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay()); |
| | | assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval()); |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | } |
| | | |
| | | |
| | | @DataProvider(name="changelogStart") |
| | | public Object [][] createChangelogStartMessageTestData() throws Exception |
| | | { |
| | |
| | | ServerState state = new ServerState(); |
| | | return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} }; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Test that changelogStartMessage encoding and decoding works |
| | | * by checking that : msg == new ChangelogStartMessage(msg.getBytes()). |
| | |
| | | String url, ServerState state) throws Exception |
| | | { |
| | | state.update(new ChangeNumber((long)1, 1,(short)1)); |
| | | ChangelogStartMessage msg = new ChangelogStartMessage(serverId, |
| | | ChangelogStartMessage msg = new ChangelogStartMessage(serverId, |
| | | url, baseDN, window, state); |
| | | ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes()); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | |
| | | assertEquals(msg.getServerState().getMaxChangeNumber((short)1), |
| | | newMsg.getServerState().getMaxChangeNumber((short)1)); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Test that WindowMessage encoding and decoding works |
| | | * by checking that : msg == new WindowMessage(msg.getBytes()). |