From 11859d9a6e466bab4ab73e1e46d092c6052acf68 Mon Sep 17 00:00:00 2001
From: coulbeck <coulbeck@localhost>
Date: Fri, 02 Feb 2007 21:50:10 +0000
Subject: [PATCH] These changes are for issue 787: LDAP server need to detect failure of changelog servers
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java | 40 ++
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 8
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java | 9
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java | 4
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java | 21 +
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java | 38 ++
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java | 66 ++++
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java | 44 +++
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java | 64 ++++
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java | 85 ++++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java | 95 ++++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java | 36 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java | 171 ++++++++++++
opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java | 139 +++++++++
opendj-sdk/opends/resource/schema/02-config.ldif | 6
opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java | 6
16 files changed, 784 insertions(+), 48 deletions(-)
diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index e2f2eef..b7605d5 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1038,6 +1038,10 @@
NAME 'ds-cfg-group-implementation-enabled'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.305
+ NAME 'ds-cfg-heartbeat-interval'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
@@ -1296,7 +1300,7 @@
$ ds-cfg-synchronization-dn )
MAY ( cn $ ds-cfg-receive-status $ ds-cfg-max-receive-queue $
ds-cfg-max-receive-delay $ ds-cfg-max-send-queue $ ds-cfg-max-send-delay $
- ds-cfg-window-size )
+ ds-cfg-window-size $ ds-cfg-heartbeat-interval )
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.59
NAME 'ds-cfg-length-based-password-validator' SUP ds-cfg-password-validator
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index bac26ee..1b61fec 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -389,10 +389,10 @@
}
/**
- * This method manage the connection with the other LDAP servers
- * it periodically that this changelog server is correctly connected
- * to all the other changelog servers and if not attempt to
- * do the connection.
+ * This method manages the connection with the other changelog servers.
+ * It periodically checks that this changelog server is indeed connected
+ * to all the other changelog servers and if not attempts to
+ * make the connection.
*/
private void runConnect()
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
index 6c74065..27a6be2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/ServerHandler.java
@@ -61,6 +61,7 @@
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
import org.opends.server.synchronization.protocol.WindowMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.util.TimeThread;
/**
@@ -108,6 +109,17 @@
// be stopped from sending messsages.
private int saturationCount = 0;
+ /**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval = 0;
+
+ /**
+ * The thread that will send heartbeats.
+ */
+ HeartbeatThread heartbeatThread = null;
+
private static Map<ChangeNumber, ChangelogAckMessageList>
changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
@@ -173,6 +185,7 @@
maxReceiveQueue = receivedMsg.getMaxReceiveQueue();
maxSendDelay = receivedMsg.getMaxSendDelay();
maxSendQueue = receivedMsg.getMaxSendQueue();
+ heartbeatInterval = receivedMsg.getHeartbeatInterval();
if (maxReceiveQueue > 0)
restartReceiveQueue = (maxReceiveQueue > 1000 ?
@@ -199,6 +212,12 @@
maxSendDelay);
else
restartSendDelay = 0;
+
+ if (heartbeatInterval < 0)
+ {
+ heartbeatInterval = 0;
+ }
+
serverIsLDAPserver = true;
changelogCache = changelog.getChangelogCache(this.baseDn);
@@ -256,6 +275,16 @@
reader.start();
writer.start();
+
+ // Create a thread to send heartbeat messages.
+ if (heartbeatInterval > 0)
+ {
+ heartbeatThread = new HeartbeatThread("Synchronization Heartbeat",
+ session, heartbeatInterval);
+ heartbeatThread.start();
+ }
+
+
DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
DirectoryServer.registerMonitorProvider(this);
}
@@ -853,6 +882,12 @@
msgQueue.notifyAll();
}
+ // Stop the heartbeat thread.
+ if (heartbeatThread != null)
+ {
+ heartbeatThread.shutdown();
+ }
+
DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
}
@@ -1225,4 +1260,13 @@
{
sendWindow.release(windowMsg.getNumAck());
}
+
+ /**
+ * Get our heartbeat interval.
+ * @return Our heartbeat interval.
+ */
+ public long getHeartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
index 5a7d784..5239d75 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/ChangelogBroker.java
@@ -94,6 +94,24 @@
private int timeout = 0;
/**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval = 0;
+
+
+ /**
+ * A thread to monitor heartbeats on the session.
+ */
+ private HeartbeatMonitor heartbeatMonitor = null;
+
+ /**
+ * The number of times the connection was lost.
+ */
+ private int numLostConnections = 0;
+
+
+ /**
* Creates a new Changelog Broker for a particular SynchronizationDomain.
*
* @param state The ServerState that should be used by this broker
@@ -110,10 +128,12 @@
* the changelog server.
* @param maxSendDelay The maximum send delay to use on the changelog server.
* @param window The size of the send and receive window to use.
+ * @param heartbeatInterval The interval between heartbeats requested of the
+ * changelog server, or zero if no heartbeats are requested.
*/
public ChangelogBroker(ServerState state, DN baseDn, short serverID,
int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window)
+ int maxSendDelay, int window, long heartbeatInterval)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -127,6 +147,7 @@
this.rcvWindow = window;
this.maxRcvWindow = window;
this.halfRcvWindow = window/2;
+ this.heartbeatInterval = heartbeatInterval;
}
/**
@@ -157,7 +178,7 @@
/**
- * Connect the Changelog server to other servers.
+ * Connect to a Changelog server.
*
* @throws NumberFormatException address was invalid
* @throws IOException error during connection phase
@@ -166,6 +187,13 @@
{
ChangelogStartMessage startMsg;
+ // Stop any existing heartbeat monitor from a previous session.
+ if (heartbeatMonitor != null)
+ {
+ heartbeatMonitor.shutdown();
+ heartbeatMonitor = null;
+ }
+
boolean checkState = true;
while( !connected)
{
@@ -191,9 +219,9 @@
/*
* Send our ServerStartMessage.
*/
- ServerStartMessage msg = new ServerStartMessage( serverID, baseDn,
+ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow*2, state);
+ halfRcvWindow*2, heartbeatInterval, state);
session.publish(msg);
@@ -369,6 +397,15 @@
}
}
}
+
+ // Start a heartbeat monitor thread.
+ if (heartbeatInterval > 0)
+ {
+ heartbeatMonitor =
+ new HeartbeatMonitor("Synchronization Heartbeat Monitor", session,
+ heartbeatInterval);
+ heartbeatMonitor.start();
+ }
}
@@ -379,6 +416,8 @@
*/
private void reStart(ProtocolSession failingSession)
{
+ numLostConnections++;
+
try
{
failingSession.close();
@@ -445,7 +484,7 @@
/**
* Receive a message.
* @return the received message
- * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+ * @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
*/
public SynchronizationMessage receive() throws SocketTimeoutException
@@ -474,13 +513,11 @@
}
return msg;
}
+ } catch (SocketTimeoutException e)
+ {
+ throw e;
} catch (Exception e)
{
- if (e instanceof SocketTimeoutException)
- {
- SocketTimeoutException e1 = (SocketTimeoutException) e;
- throw e1;
- }
if (shutdown == false)
{
synchronized (lock)
@@ -631,4 +668,13 @@
else
return 0;
}
+
+ /**
+ * Get the number of times the connection was lost.
+ * @return The number of times the connection was lost.
+ */
+ public int getNumLostConnections()
+ {
+ return numLostConnections;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
new file mode 100644
index 0000000..2065217
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/HeartbeatMonitor.java
@@ -0,0 +1,139 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization.plugin;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.synchronization.protocol.ProtocolSession;
+import static org.opends.server.loggers.Debug.debugMessage;
+import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION;
+import static org.opends.server.types.DebugLogSeverity.INFO;
+
+import java.io.IOException;
+
+/**
+ * This class implements a thread to monitor heartbeat messages from the
+ * synchronization server. Each broker runs one of these threads.
+ */
+public class HeartbeatMonitor extends DirectoryThread
+{
+ /**
+ * The fully-qualified name of this class for debugging purposes.
+ */
+ private static final String CLASS_NAME =
+ "org.opends.server.synchronization.plugin.HeartbeatMonitor";
+
+
+ /**
+ * The session on which heartbeats are to be monitored.
+ */
+ private ProtocolSession session;
+
+
+ /**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval;
+
+
+ /**
+ * Set this to stop the thread.
+ */
+ private boolean shutdown = false;
+
+
+ /**
+ * Create a heartbeat monitor thread.
+ * @param threadName The name of the heartbeat thread.
+ * @param session The session on which heartbeats are to be monitored.
+ * @param heartbeatInterval The expected interval between heartbeats in
+ * milliseconds.
+ */
+ public HeartbeatMonitor(String threadName, ProtocolSession session,
+ long heartbeatInterval)
+ {
+ super(threadName);
+ this.session = session;
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ /**
+ * Call this method to stop the thread.
+ */
+ public void shutdown()
+ {
+ shutdown = true;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+ "Heartbeat monitor is starting, expected interval is "
+ + heartbeatInterval);
+ try
+ {
+ while (!shutdown)
+ {
+ long now = System.currentTimeMillis();
+ long lastReceiveTime = session.getLastReceiveTime();
+ if (now > lastReceiveTime + 2 * heartbeatInterval)
+ {
+ // Heartbeat is well overdue so the server is assumed to be dead.
+ debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+ "Heartbeat monitor is closing the broker " +
+ "session because it could not detect a " +
+ "heartbeat.");
+ session.close();
+ break;
+ }
+ try
+ {
+ Thread.sleep(heartbeatInterval);
+ }
+ catch (InterruptedException e)
+ {
+ // That's OK.
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ // Hope that's OK.
+ }
+ finally
+ {
+ debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+ "Heartbeat monitor is exiting.");
+ }
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 6bb188d..1369bed 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -27,6 +27,12 @@
package org.opends.server.synchronization.plugin;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.util.ServerConstants.
+ TIME_UNIT_MILLISECONDS_ABBR;
+import static org.opends.server.util.ServerConstants.
+ TIME_UNIT_MILLISECONDS_FULL;
+import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
+import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
import static org.opends.server.synchronization.common.LogMessages.*;
import static org.opends.server.synchronization.plugin.Historical.*;
import static org.opends.server.synchronization.protocol.OperationContext.*;
@@ -40,6 +46,7 @@
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -52,6 +59,7 @@
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
+import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -123,6 +131,12 @@
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
+ /**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval = 0;
+
private short serverId;
private BooleanConfigAttribute receiveStatusStub;
@@ -151,6 +165,7 @@
static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue";
static String MAX_SEND_DELAY = "ds-cfg-max-send-delay";
static String WINDOW_SIZE = "ds-cfg-window-size";
+ static String HEARTBEAT_INTERVAL = "ds-cfg-heartbeat-interval";
private static final StringConfigAttribute changelogStub =
new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
@@ -165,6 +180,25 @@
true, false, false);
/**
+ * The set of time units that will be used for expressing the heartbeat
+ * interval.
+ */
+ private static final LinkedHashMap<String,Double> timeUnits =
+ new LinkedHashMap<String,Double>();
+
+
+
+ static
+ {
+ timeUnits.put(TIME_UNIT_MILLISECONDS_ABBR, 1D);
+ timeUnits.put(TIME_UNIT_MILLISECONDS_FULL, 1D);
+ timeUnits.put(TIME_UNIT_SECONDS_ABBR, 1000D);
+ timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
+ }
+
+
+
+ /**
* Creates a new SynchronizationDomain using configuration from configEntry.
*
* @param configEntry The ConfigEntry to use to read the configuration of this
@@ -302,6 +336,24 @@
configAttributes.add(windowAttr);
}
+ IntegerWithUnitConfigAttribute heartbeatStub =
+ new IntegerWithUnitConfigAttribute(HEARTBEAT_INTERVAL,
+ "heartbeat interval",
+ false, timeUnits, true, 0, false, 0);
+ IntegerWithUnitConfigAttribute heartbeatAttr =
+ (IntegerWithUnitConfigAttribute)
+ configEntry.getConfigAttribute(heartbeatStub);
+ if (heartbeatAttr == null)
+ {
+ // Attribute is not present : use the default value
+ heartbeatInterval = 1000;
+ }
+ else
+ {
+ heartbeatInterval = heartbeatAttr.activeCalculatedValue();
+ configAttributes.add(heartbeatAttr);
+ }
+
configDn = configEntry.getDN();
DirectoryServer.registerConfigurableComponent(this);
@@ -315,7 +367,8 @@
try
{
broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
- maxReceiveDelay, maxSendQueue, maxSendDelay, window);
+ maxReceiveDelay, maxSendQueue, maxSendDelay, window,
+ heartbeatInterval);
synchronized (broker)
{
broker.start(changelogServers);
@@ -1762,4 +1815,13 @@
{
return broker.getCurrentSendWindow();
}
+
+ /**
+ * Get the number of times the synchronization connection was lost.
+ * @return The number of times the synchronization connection was lost.
+ */
+ public int getNumLostConnections()
+ {
+ return broker.getNumLostConnections();
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
index 1b8ae4b..5a1dc30 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationMonitor.java
@@ -97,6 +97,10 @@
attr = new Attribute("connected-to", domain.getChangelogServer());
attributes.add(attr);
+ /* get number of lost connections */
+ addMonitorData(attributes, "lost-connections",
+ domain.getNumLostConnections());
+
/* get number of received updates */
addMonitorData(attributes, "received-updates", domain.getNumRcvdUpdates());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java
new file mode 100644
index 0000000..a6d3cc4
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatMessage.java
@@ -0,0 +1,85 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization.protocol;
+
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is sent at regular intervals by the synchronization server
+ * when it is sending no other messages. It allows the directory server to
+ * detect a problem sooner when a synchronization server has crashed or has
+ * been isolated from the network.
+ */
+public class HeartbeatMessage extends SynchronizationMessage
+{
+ /**
+ * Create a new HeartbeatMessage.
+ *
+ */
+ public HeartbeatMessage()
+ {
+ }
+
+ /**
+ * Creates a new heartbeat message from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the message.
+ * @throws java.util.zip.DataFormatException If the byte array does not
+ * contain a valid encoded form of the message.
+ */
+ public HeartbeatMessage(byte[] in) throws DataFormatException
+ {
+ /* The heartbeat message is encoded in the form :
+ * <msg-type>
+ */
+
+ /* first byte is the type */
+ if (in.length != 1 || in[0] != MSG_TYPE_HEARTBEAT)
+ throw new DataFormatException("Input is not a valid Heartbeat Message.");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ /*
+ * The heartbeat message contains:
+ * <msg-type>
+ */
+ int length = 1;
+ byte[] resultByteArray = new byte[length];
+
+ /* put the message type */
+ resultByteArray[0] = MSG_TYPE_HEARTBEAT;
+
+ return resultByteArray;
+ }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java
new file mode 100644
index 0000000..1121dad
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/HeartbeatThread.java
@@ -0,0 +1,171 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.synchronization.protocol;
+
+import org.opends.server.api.DirectoryThread;
+import static org.opends.server.loggers.Debug.debugMessage;
+import static org.opends.server.types.DebugLogCategory.SYNCHRONIZATION;
+import static org.opends.server.types.DebugLogSeverity.INFO;
+import static org.opends.server.types.DebugLogSeverity.VERBOSE;
+
+import java.io.IOException;
+
+/**
+ * This thread publishes a heartbeat message on a given protocol session at
+ * regular intervals when there are no other synchronization messages being
+ * published.
+ */
+public class HeartbeatThread extends DirectoryThread
+{
+ /**
+ * The fully-qualified name of this class for debugging purposes.
+ */
+ private static final String CLASS_NAME =
+ "org.opends.server.synchronization.plugin.HeartbeatThread";
+
+
+ /**
+ * For test purposes only to simulate loss of heartbeats.
+ */
+ static private boolean heartbeatsDisabled = false;
+
+ /**
+ * The session on which heartbeats are to be sent.
+ */
+ private ProtocolSession session;
+
+
+ /**
+ * The time in milliseconds between heartbeats.
+ */
+ private long heartbeatInterval;
+
+
+ /**
+ * Set this to stop the thread.
+ */
+ private boolean shutdown = false;
+
+
+ /**
+ * Create a heartbeat thread.
+ * @param threadName The name of the heartbeat thread.
+ * @param session The session on which heartbeats are to be sent.
+ * @param heartbeatInterval The desired interval between heartbeats in
+ * milliseconds.
+ */
+ public HeartbeatThread(String threadName, ProtocolSession session,
+ long heartbeatInterval)
+ {
+ super(threadName);
+ this.session = session;
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ try
+ {
+ debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+ "Heartbeat thread is starting, interval is " +
+ heartbeatInterval);
+ HeartbeatMessage heartbeatMessage = new HeartbeatMessage();
+
+ while (!shutdown)
+ {
+ long now = System.currentTimeMillis();
+ debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
+ "Heartbeat thread awoke at " + now +
+ ", last message was sent at " +
+ session.getLastPublishTime());
+
+ if (now > session.getLastPublishTime() + heartbeatInterval)
+ {
+ if (!heartbeatsDisabled)
+ {
+ debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
+ "Heartbeat sent at " + now);
+ session.publish(heartbeatMessage);
+ }
+ }
+
+ try
+ {
+ long sleepTime = session.getLastPublishTime() +
+ heartbeatInterval - now;
+ if (sleepTime <= 0)
+ {
+ sleepTime = heartbeatInterval;
+ }
+
+ debugMessage(SYNCHRONIZATION, VERBOSE, CLASS_NAME, "run",
+ "Heartbeat thread sleeping for " + sleepTime);
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException e)
+ {
+ // Keep looping.
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+ "Heartbeat thread could not send a heartbeat.");
+ // This will be caught in another thread.
+ }
+ finally
+ {
+ debugMessage(SYNCHRONIZATION, INFO, CLASS_NAME, "run",
+ "Heartbeat thread is exiting.");
+ }
+ }
+
+
+ /**
+ * Call this method to stop the thread.
+ */
+ public void shutdown()
+ {
+ shutdown = true;
+ }
+
+
+ /**
+ * For testing purposes only to simulate loss of heartbeats.
+ * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
+ */
+ public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
+ {
+ HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
index 7da612d..947243c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ProtocolSession.java
@@ -43,8 +43,7 @@
/**
* This method is called when the session with the remote must be closed.
- * It must
- * This object won't be used anymore after this method is called.
+ * This object won't be used anymore after this method is called.
*
* @throws IOException If an error happen during the close process.
*/
@@ -103,4 +102,22 @@
* such as a TCP error.
*/
public abstract void setSoTimeout(int timeout) throws SocketException;
+
+
+
+ /**
+ * Gets the time the last synchronization message was published on this
+ * session.
+ * @return The timestamp in milliseconds of the last message published.
+ */
+ public abstract long getLastPublishTime();
+
+
+
+ /**
+ * Gets the time the last synchronization message was received on this
+ * session.
+ * @return The timestamp in milliseconds of the last message received.
+ */
+ public abstract long getLastReceiveTime();
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
index 48b6c74..97cf84c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/ServerStartMessage.java
@@ -57,6 +57,12 @@
private ServerState serverState = null;
/**
+ * The time in milliseconds between heartbeats from the synchronization
+ * server. Zero means heartbeats are off.
+ */
+ private long heartbeatInterval = 0;
+
+ /**
* Create a new ServerStartMessage.
*
* @param serverId The serverId of the server for which the ServerStartMessage
@@ -67,11 +73,13 @@
* @param maxSendDelay The max Send Delay from this server.
* @param maxSendQueue The max send Queue from this server.
* @param windowSize The window size used by this server.
+ * @param heartbeatInterval The requested heartbeat interval.
* @param serverState The state of this server.
*/
public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
int maxReceiveQueue, int maxSendDelay,
int maxSendQueue, int windowSize,
+ long heartbeatInterval,
ServerState serverState)
{
this.serverId = serverId;
@@ -80,8 +88,10 @@
this.maxReceiveQueue = maxReceiveQueue;
this.maxSendDelay = maxSendDelay;
this.maxSendQueue = maxSendQueue;
- this.serverState = serverState;
this.windowSize = windowSize;
+ this.heartbeatInterval = heartbeatInterval;
+
+ this.serverState = serverState;
try
{
@@ -105,7 +115,7 @@
{
/* The ServerStartMessage is encoded in the form :
* <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
- * <maxSendDelay><maxSendQueue><window><ServerState>
+ * <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
*/
try
{
@@ -173,6 +183,13 @@
pos += length +1;
/*
+ * read the heartbeatInterval
+ */
+ length = getNextLength(in, pos);
+ heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ /*
* read the ServerState
*/
serverState = new ServerState(in, pos, in.length-1);
@@ -269,7 +286,7 @@
/*
* ServerStartMessage contains.
* <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
- * <maxSendDelay><maxSendQueue><windowsize><ServerState>
+ * <maxSendDelay><maxSendQueue><windowsize><heartbeatInterval><ServerState>
*/
try {
byte[] byteDn = baseDn.getBytes("UTF-8");
@@ -285,6 +302,8 @@
String.valueOf(maxSendQueue).getBytes("UTF-8");
byte[] byteWindowSize =
String.valueOf(windowSize).getBytes("UTF-8");
+ byte[] byteHeartbeatInterval =
+ String.valueOf(heartbeatInterval).getBytes("UTF-8");
byte[] byteServerState = serverState.getBytes();
int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
@@ -294,6 +313,7 @@
byteMaxSendDelay.length + 1 +
byteMaxSendQueue.length + 1 +
byteWindowSize.length + 1 +
+ byteHeartbeatInterval.length + 1 +
byteServerState.length + 1;
byte[] resultByteArray = new byte[length];
@@ -318,6 +338,8 @@
pos = addByteArray(byteWindowSize, resultByteArray, pos);
+ pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
+
pos = addByteArray(byteServerState, resultByteArray, pos);
return resultByteArray;
@@ -337,4 +359,16 @@
{
return windowSize;
}
+
+ /**
+ * Get the heartbeat interval requested by the ldap server that created the
+ * message.
+ *
+ * @return The heartbeat interval requested by the ldap server that created
+ * the message.
+ */
+ public long getHeartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
index 08f8285..3b1185f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SocketSession.java
@@ -50,6 +50,18 @@
byte[] rcvLengthBuf = new byte[8];
/**
+ * The time the last message published to this session.
+ */
+ private long lastPublishTime = 0;
+
+
+ /**
+ * The time the last message was received on this session.
+ */
+ private long lastReceiveTime = 0;
+
+
+ /**
* Creates a new SocketSession based on the provided socket.
*
* @param socket The Socket on which the SocketSession will be based.
@@ -87,6 +99,8 @@
output.write(sendLengthBuf);
output.write(buffer);
output.flush();
+
+ lastPublishTime = System.currentTimeMillis();
}
/**
@@ -102,9 +116,13 @@
{
int read = input.read(rcvLengthBuf, length, 8-length);
if (read == -1)
+ {
throw new IOException("no more data");
+ }
else
+ {
length += read;
+ }
}
int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
@@ -114,7 +132,11 @@
length = 0;
byte[] buffer = new byte[totalLength];
while (length < totalLength)
+ {
length += input.read(buffer, length, totalLength - length);
+ }
+
+ lastReceiveTime = System.currentTimeMillis();
return SynchronizationMessage.generateMsg(buffer);
}
catch (OutOfMemoryError e)
@@ -127,6 +149,22 @@
/**
* {@inheritDoc}
*/
+ public long getLastPublishTime()
+ {
+ return lastPublishTime;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public long getLastReceiveTime()
+ {
+ return lastReceiveTime;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
public String getRemoteAddress()
{
return socket.getInetAddress().getHostAddress();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
index 77dcc1b..8aab0f5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/protocol/SynchronizationMessage.java
@@ -46,6 +46,7 @@
static final byte MSG_TYPE_SERVER_START = 6;
static final byte MSG_TYPE_CHANGELOG_START = 7;
static final byte MSG_TYPE_WINDOW = 8;
+ static final byte MSG_TYPE_HEARTBEAT = 9;
/**
* Return the byte[] representation of this message.
@@ -57,6 +58,8 @@
* MSG_TYPE_ACK
* MSG_TYPE_SERVER_START
* MSG_TYPE_CHANGELOG_START
+ * MSG_TYPE_WINDOW
+ * MSG_TYPE_HEARTBEAT
*
* @return the byte[] representation of this message.
*/
@@ -101,6 +104,9 @@
case MSG_TYPE_WINDOW:
msg = new WindowMessage(buffer);
break;
+ case MSG_TYPE_HEARTBEAT:
+ msg = new HeartbeatMessage(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
index 1c59158..69c8323 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationTestCase.java
@@ -120,7 +120,7 @@
if (emptyOldChanges)
state.loadState();
ChangelogBroker broker = new ChangelogBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size);
+ state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
broker.start(servers);
@@ -155,7 +155,7 @@
throws Exception, SocketException
{
ChangelogBroker broker = new ChangelogBroker(
- state, baseDn, serverId, 0, 0, 0, 0, window_size);
+ state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
broker.start(servers);
@@ -164,7 +164,7 @@
return broker;
}
-
+
/**
* Open a changelog session with flow control to the local Changelog server.
*
@@ -179,7 +179,8 @@
if (emptyOldChanges)
state.loadState();
ChangelogBroker broker = new ChangelogBroker(
- state, baseDn, serverId, maxRcvQueue, 0, maxSendQueue, 0, window_size);
+ state, baseDn, serverId, maxRcvQueue, 0,
+ maxSendQueue, 0, window_size, 0);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
broker.start(servers);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
index 45652ce..6da10d0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/UpdateOperationTest.java
@@ -46,6 +46,7 @@
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyMsg;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
+import org.opends.server.synchronization.protocol.HeartbeatThread;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -311,6 +312,100 @@
}
/**
+ * Tests whether the synchronization provider fails over when it loses
+ * the heartbeat from the synchronization server.
+ */
+ @Test(groups = "slow")
+ public void lostHeartbeatFailover() throws Exception
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting synchronization test : lostHeartbeatFailover" , 1);
+
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+ /*
+ * Open a session to the changelog server using the broker API.
+ * This must use a different serverId to that of the directory server.
+ */
+ ChangelogBroker broker =
+ openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
+
+
+ /*
+ * Create a Change number generator to generate new changenumbers
+ * when we need to send operation messages to the changelog server.
+ */
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
+
+
+ // Create and publish an update message to add an entry.
+ AddMsg addMsg = new AddMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID,
+ baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ broker.publish(addMsg);
+
+ entryList.add(personWithUUIDEntry.getDN());
+ Entry resultEntry;
+
+ // Check that the entry has been created in the directory server.
+ resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, true);
+ assertNotNull(resultEntry,
+ "The ADD synchronization message was not replayed");
+
+ // Send a first modify operation message.
+ List<Modification> mods = generatemods("telephonenumber", "01 02 45");
+ ModifyMsg modMsg = new ModifyMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN(), mods,
+ user1entryUUID);
+ broker.publish(modMsg);
+
+ // Check that the modify has been replayed.
+ boolean found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+ "telephonenumber", "01 02 45", 10000, true);
+ if (!found)
+ {
+ fail("The first modification was not replayed.");
+ }
+
+ // Simulate loss of heartbeats.
+ HeartbeatThread.setHeartbeatsDisabled(true);
+ Thread.sleep(3000);
+ HeartbeatThread.setHeartbeatsDisabled(false);
+
+ // Send a second modify operation message.
+ mods = generatemods("description", "Description was changed");
+ modMsg = new ModifyMsg(gen.NewChangeNumber(),
+ personWithUUIDEntry.getDN(), mods,
+ user1entryUUID);
+ broker.publish(modMsg);
+
+ // Check that the modify has been replayed.
+ found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
+ "description", "Description was changed",
+ 10000, true);
+ if (!found)
+ {
+ fail("The second modification was not replayed.");
+ }
+
+ // Delete the entries to clean the database.
+ DeleteMsg delMsg =
+ new DeleteMsg(personWithUUIDEntry.getDN().toString(),
+ gen.NewChangeNumber(), user1entryUUID);
+ broker.publish(delMsg);
+ resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false);
+
+ // Check that the delete operation has been applied.
+ assertNull(resultEntry,
+ "The DELETE synchronization message was not replayed");
+ broker.stop();
+ }
+
+ /**
* Tests the naming conflict resolution code.
* In this test, the local server act both as an LDAP server and
* a changelog server that are inter-connected.
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
index 01a0c91..68ea221 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/protocol/SynchronizationMsgTest.java
@@ -36,7 +36,6 @@
import org.testng.annotations.Test;
import static org.testng.Assert.*;
-import org.opends.server.api.ClientConnection;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -48,20 +47,9 @@
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ServerState;
import org.opends.server.synchronization.plugin.PendingChange;
-import org.opends.server.synchronization.protocol.AckMessage;
-import org.opends.server.synchronization.protocol.AddMsg;
-import org.opends.server.synchronization.protocol.ChangelogStartMessage;
-import org.opends.server.synchronization.protocol.DeleteMsg;
-import org.opends.server.synchronization.protocol.ModifyDNMsg;
-import org.opends.server.synchronization.protocol.ModifyMsg;
-import org.opends.server.synchronization.protocol.ServerStartMessage;
-import org.opends.server.synchronization.protocol.SynchronizationMessage;
-import org.opends.server.synchronization.protocol.UpdateMessage;
-import org.opends.server.synchronization.protocol.WindowMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
-import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
@@ -187,7 +175,7 @@
throws Exception
{
DN dn = DN.decode(rawdn);
- ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");;
+ ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");
// Check uuid
assertEquals("fakeuniqueid", msg.getUniqueId());
@@ -221,7 +209,6 @@
/**
* Build some data for the DeleteMsg test below.
- * @throws DirectoryException
*/
@DataProvider(name = "deleteEncodeDecode")
public Object[][] createDelData() {
@@ -383,7 +370,7 @@
//Create an Add operation and generate and Add msg from it
DN dn = DN.decode(rawDN);
- addOp = new AddOperation((ClientConnection) connection,
+ addOp = new AddOperation(connection,
(long) 1, 1, null, dn, objectClassList, userAttList, opList);
OperationContext opCtx = new AddContext(cn, "thisIsaUniqueID",
"parentUniqueId");
@@ -404,7 +391,6 @@
/**
* Build some data for the AckMsg test below.
- * @throws DirectoryException
*/
@DataProvider(name = "ackMsg")
public Object[][] createAckData() {
@@ -451,7 +437,7 @@
// Check that retrieved CN is OK
msg2 = (AckMessage) SynchronizationMessage.generateMsg(msg1.getBytes());
}
-
+
@DataProvider(name="serverStart")
public Object [][] createServerStartMessageTestData() throws Exception
{
@@ -469,16 +455,20 @@
{
state.update(new ChangeNumber((long)1, 1,(short)1));
ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
- window, window, window, window, window, state);
+ window, window, window, window, window, window, state);
ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+ assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
+ assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
+ assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay());
+ assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue());
assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
- assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+ assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
newMsg.getServerState().getMaxChangeNumber((short)1));
}
-
+
@DataProvider(name="changelogStart")
public Object [][] createChangelogStartMessageTestData() throws Exception
{
@@ -486,7 +476,7 @@
ServerState state = new ServerState();
return new Object [][] { {(short)1, baseDN, 100, "localhost:8989", state} };
}
-
+
/**
* Test that changelogStartMessage encoding and decoding works
* by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
@@ -496,7 +486,7 @@
String url, ServerState state) throws Exception
{
state.update(new ChangeNumber((long)1, 1,(short)1));
- ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
+ ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
url, baseDN, window, state);
ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
@@ -506,7 +496,7 @@
assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
newMsg.getServerState().getMaxChangeNumber((short)1));
}
-
+
/**
* Test that WindowMessageTest encoding and decoding works
* by checking that : msg == new WindowMessageTest(msg.getBytes()).
--
Gitblit v1.10.0