From c90277aee027fd834936b44a621a142cf71de444 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 14 Dec 2007 15:55:42 +0000
Subject: [PATCH] - Partial fix for #1302: on startup servers should wait to catchup before accepting connections - Fix for #795: Changelog fail over unit tests
---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 741 ++++++++++++++-------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java | 477 ++++++++++++++
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 3
opends/src/messages/messages/replication.properties | 16
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java | 684 ++++++++++++++++++++
6 files changed, 1,659 insertions(+), 265 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 07cdeb9..f69a1cf 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@
NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
listening on %s
-NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
- changes that this server has already processed on suffix %s
+NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
+up to date chnages for suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
server should be configured
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
@@ -85,8 +85,8 @@
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
database for base DN %s
NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
- replication server that has seen all the local changes on suffix %s. Going to replay \
- changes
+ replication server that has seen all the local changes on suffix %s. Found %d \
+replications server(s) not up to date. Going to replay changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
server on suffix %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s :
@@ -249,4 +249,12 @@
SEVERE_ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED_99=The replication \
server backend cannot export its entries in LDIF format because the \
export-ldif command must be run as a task
+DEBUG_GOING_TO_SEARCH_FOR_CHANGES_100=The replication server is late \
+regarding our changes: going to send missing ones
+DEBUG_SENDING_CHANGE_101=Sending change number: %s
+DEBUG_CHANGES_SENT_102=All missing changes sent to replication server
+SEVERE_ERR_PUBLISHING_FAKE_OPS_103=Caught exception publishing fake operations \
+for domain %s to replication server %s : %s
+SEVERE_ERR_COMPUTING_FAKE_OPS_104=Caught exception computing fake operations \
+for domain %s for replication server %s : %s
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 94fddf0..049910b 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,6 +25,7 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -41,6 +42,8 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -61,17 +64,16 @@
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
-
/**
* The broker for Multi-master Replication.
*/
public class ReplicationBroker implements InternalSearchListener
{
+
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
-
private boolean shutdown = false;
private Collection<String> servers;
private boolean connected = false;
@@ -95,23 +97,23 @@
private long generationId = -1;
private ReplSessionSecurity replSessionSecurity;
+ // Trick for avoiding a inner class for many parameters return for
+ // performHandshake method.
+ private String tmpReadableServerName = null;
+
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
-
-
/**
* A thread to monitor heartbeats on the session.
*/
private HeartbeatMonitor heartbeatMonitor = null;
-
/**
* The number of times the connection was lost.
*/
private int numLostConnections = 0;
-
/**
* When the broker cannot connect to any replication server
* it log an error and keeps continuing every second.
@@ -121,7 +123,6 @@
* finally succeed to connect.
*/
private boolean connectionError = false;
-
private final Object connectPhaseLock = new Object();
/**
@@ -149,9 +150,9 @@
* @param replSessionSecurity The session security configuration.
*/
public ReplicationBroker(ServerState state, DN baseDn, short serverID,
- int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window, long heartbeatInterval,
- long generationId, ReplSessionSecurity replSessionSecurity)
+ int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+ int maxSendDelay, int window, long heartbeatInterval,
+ long generationId, ReplSessionSecurity replSessionSecurity)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -164,7 +165,7 @@
new TreeSet<FakeOperation>(new FakeOperationComparator());
this.rcvWindow = window;
this.maxRcvWindow = window;
- this.halfRcvWindow = window/2;
+ this.halfRcvWindow = window / 2;
this.heartbeatInterval = heartbeatInterval;
this.protocolVersion = ProtocolVersion.currentVersion();
this.generationId = generationId;
@@ -194,7 +195,6 @@
this.connect();
}
-
/**
* Connect to a ReplicationServer.
*
@@ -202,7 +202,7 @@
*/
private void connect()
{
- ReplServerStartMessage replServerStartMsg = null;
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
// Stop any existing heartbeat monitor from a previous session.
if (heartbeatMonitor != null)
@@ -211,187 +211,121 @@
heartbeatMonitor = null;
}
- // checkState is true for the first loop on all replication servers
- // looking for one already up-to-date.
- // If we found some responding replication servers but none up-to-date
- // then we set check-state to false and do a second loop where the first
- // found will be the one elected and then we will update this replication
- // server.
- boolean checkState = true;
- boolean receivedResponse = true;
-
- // TODO: We are doing here 2 loops opening , closing , reopening session to
- // the same servers .. risk to have 'same server id' erros.
- // Would be better to do only one loop, keeping the best candidate while
- // traversing the list of replication servers to connect to.
- if (servers.size()==1)
- {
- checkState = false;
- }
-
synchronized (connectPhaseLock)
{
- while ((!connected) && (!shutdown) && (receivedResponse))
+ /*
+ * Connect to each replication server and get their ServerState then find
+ * out which one is the best to connect to.
+ */
+ for (String server : servers)
{
- receivedResponse = false;
- for (String server : servers)
- {
- int separator = server.lastIndexOf(':');
- String port = server.substring(separator + 1);
- String hostname = server.substring(0, separator);
+ // Connect to server and get reply message
+ ReplServerStartMessage replServerStartMsg =
+ performHandshake(server, false);
+ tmpReadableServerName = null; // Not needed now
+ // Store reply message in list
+ if (replServerStartMsg != null)
+ {
+ ServerState rsState = replServerStartMsg.getServerState();
+ rsStates.put(server, rsState);
+ }
+ } // for servers
+
+ ReplServerStartMessage replServerStartMsg = null;
+
+ if (rsStates.size() > 0)
+ {
+
+ // At least one server answered, find the best one.
+ String bestServer = computeBestReplicationServer(state, rsStates,
+ serverID, baseDn);
+
+ // Best found, now connect to this one
+ replServerStartMsg = performHandshake(bestServer, true);
+
+ if (replServerStartMsg != null)
+ {
try
{
/*
- * Open a socket connection to the next candidate.
- */
- InetSocketAddress ServerAddr = new InetSocketAddress(
- InetAddress.getByName(hostname), Integer.parseInt(port));
- Socket socket = new Socket();
- socket.setReceiveBufferSize(1000000);
- socket.setTcpNoDelay(true);
- socket.connect(ServerAddr, 500);
- session = replSessionSecurity.createClientSession(server, socket);
- boolean isSslEncryption =
- replSessionSecurity.isSslEncryption(server);
- /*
- * Send our ServerStartMessage.
- */
- ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
- maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow*2, heartbeatInterval, state,
- protocolVersion, generationId, isSslEncryption);
- session.publish(msg);
-
-
- /*
- * Read the ReplServerStartMessage that should come back.
- */
- session.setSoTimeout(1000);
- replServerStartMsg = (ReplServerStartMessage) session.receive();
- receivedResponse = true;
-
- /*
- * We have sent our own protocol version to the replication server.
- * The replication server will use the same one (or an older one
- * if it is an old replication server).
- */
- protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartMsg.getVersion());
- session.setSoTimeout(timeout);
-
- if (!isSslEncryption)
- {
- session.stopEncryption();
- }
-
- /*
* We must not publish changes to a replicationServer that has not
* seen all our previous changes because this could cause some
* other ldap servers to miss those changes.
* Check that the ReplicationServer has seen all our previous
* changes.
- * If not, try another replicationServer.
- * If no other replicationServer has seen all our changes, recover
- * those changes and send them again to any replicationServer.
*/
ChangeNumber replServerMaxChangeNumber =
replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
+
if (replServerMaxChangeNumber == null)
+ {
replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+ }
ChangeNumber ourMaxChangeNumber =
state.getMaxChangeNumber(serverID);
- if ((ourMaxChangeNumber == null) ||
- (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+
+ if ((ourMaxChangeNumber != null) &&
+ (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
- replicationServer = ServerAddr.toString();
- maxSendWindow = replServerStartMsg.getWindowSize();
- connected = true;
- startHeartBeat();
- break;
- }
- else
- {
- if (checkState == true)
+
+ // Replication server is missing some of our changes: let's send
+ // them to him.
+ replayOperations.clear();
+
+ Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+ logError(message);
+
+ /*
+ * Get all the changes that have not been seen by this
+ * replication server and populate the replayOperations
+ * list.
+ */
+ InternalSearchOperation op = seachForChangedEntries(
+ baseDn, replServerMaxChangeNumber, this);
+ if (op.getResultCode() != ResultCode.SUCCESS)
{
- /* This replicationServer is missing some
- * of our changes, we are going to try another server
- * but before log a notice message
+ /*
+ * An error happened trying to search for the updates
+ * This server will start acepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to resynchronize the servers.
*/
- Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
- baseDn.toNormalizedString());
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ } else
+ {
+ for (FakeOperation replayOp : replayOperations)
+ {
+ message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
+ toString());
+ logError(message);
+ session.publish(replayOp.generateMessage());
+ }
+ message = DEBUG_CHANGES_SENT.get();
logError(message);
}
- else
- {
- replayOperations.clear();
-
- // TODO: i18n
- logError(Message.raw("going to search for changes"));
-
- /*
- * Get all the changes that have not been seen by this
- * replicationServer and populate the replayOperations
- * list.
- */
- InternalSearchOperation op = seachForChangedEntries(
- baseDn, replServerMaxChangeNumber, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- /*
- * An error happened trying to search for the updates
- * This server will start acepting again new updates but
- * some inconsistencies will stay between servers.
- * Log an error for the repair tool
- * that will need to resynchronize the servers.
- */
- Message message = ERR_CANNOT_RECOVER_CHANGES.get(
- baseDn.toNormalizedString());
- logError(message);
- replicationServer = ServerAddr.toString();
- maxSendWindow = replServerStartMsg.getWindowSize();
- connected = true;
- startHeartBeat();
- }
- else
- {
- replicationServer = ServerAddr.toString();
- maxSendWindow = replServerStartMsg.getWindowSize();
- connected = true;
- for (FakeOperation replayOp : replayOperations)
- {
- logError(Message.raw("sendingChange")); // TODO: i18n
- session.publish(replayOp.generateMessage());
- }
- startHeartBeat();
- logError(Message.raw("changes sent")); // TODO: i18n
- break;
- }
- }
}
- }
- catch (ConnectException e)
+
+ replicationServer = tmpReadableServerName;
+ maxSendWindow = replServerStartMsg.getWindowSize();
+ connected = true;
+ startHeartBeat();
+ } catch (IOException e)
{
- /*
- * There was no server waiting on this host:port
- * Log a notice and try the next replicationServer in the list
- */
- if (!connectionError )
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
- logError(message);
- }
- }
- catch (Exception e)
- {
- Message message = ERR_EXCEPTION_STARTING_SESSION.get(
- baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
+ Message message = ERR_PUBLISHING_FAKE_OPS.get(
+ baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
logError(message);
- }
- finally
+ } catch (Exception e)
+ {
+ Message message = ERR_COMPUTING_FAKE_OPS.get(
+ baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
+ logError(message);
+ } finally
{
if (connected == false)
{
@@ -402,81 +336,60 @@
session.close();
} catch (IOException e)
{
- // The session was already closed, just ignore.
+ // The session was already closed, just ignore.
}
session = null;
}
}
}
- } // for servers
-
- // We have traversed all the replication servers
-
- if ((!connected) && (checkState == true) && receivedResponse)
- {
- /*
- * We could not find a replicationServer that has seen all the
- * changes that this server has already processed, start again
- * the loop looking for any replicationServer.
- */
- Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
- baseDn.toNormalizedString());
- logError(message);
- checkState = false;
- }
- }
-
- // We have traversed all the replication servers as many times as needed
- // to find one if one is up and running.
+ } // Could perform handshake with best
+ } // Reached some servers
if (connected)
{
- // This server has connected correctly.
// Log a message to let the administrator know that the failure was
// resolved.
- // wakeup all the thread that were waiting on the window
+ // Wakeup all the thread that were waiting on the window
// on the previous connection.
connectionError = false;
if (sendWindow != null)
+ {
sendWindow.release(Integer.MAX_VALUE);
+ }
this.sendWindow = new Semaphore(maxSendWindow);
connectPhaseLock.notify();
if ((replServerStartMsg.getGenerationId() == this.generationId) ||
- (replServerStartMsg.getGenerationId() == -1))
+ (replServerStartMsg.getGenerationId() == -1))
{
Message message =
NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
- baseDn.toString(),
- replicationServer,
- Long.toString(this.generationId));
+ baseDn.toString(),
+ replicationServer,
+ Long.toString(this.generationId));
logError(message);
- }
- else
+ } else
{
Message message =
NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
- baseDn.toString(),
- replicationServer,
- Long.toString(this.generationId),
- Long.toString(replServerStartMsg.getGenerationId()));
+ baseDn.toString(),
+ replicationServer,
+ Long.toString(this.generationId),
+ Long.toString(replServerStartMsg.getGenerationId()));
logError(message);
}
- }
- else
+ } else
{
/*
- * This server could not find any replicationServer
- * It's going to start in degraded mode.
- * Log a message
+ * This server could not find any replicationServer. It's going to start
+ * in degraded mode. Log a message.
*/
if (!connectionError)
{
- checkState = false;
connectionError = true;
connectPhaseLock.notify();
Message message =
- NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
+ NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
logError(message);
}
}
@@ -484,6 +397,315 @@
}
/**
+ * Connect to the provided server performing the handshake (start messages
+ * exchange) and return the reply message from the replication server.
+ *
+ * @param server Server to connect to.
+ * @param keepConnection Do we keep session opened or not after handshake.
+ * @return The ReplServerStartMessage the server replied. Null if could not
+ * get an answer.
+ */
+ public ReplServerStartMessage performHandshake(String server,
+ boolean keepConnection)
+ {
+ ReplServerStartMessage replServerStartMsg = null;
+
+ // Parse server string.
+ int separator = server.lastIndexOf(':');
+ String port = server.substring(separator + 1);
+ String hostname = server.substring(0, separator);
+
+ boolean error = false;
+ try
+ {
+ /*
+ * Open a socket connection to the next candidate.
+ */
+ int intPort = Integer.parseInt(port);
+ InetSocketAddress serverAddr = new InetSocketAddress(
+ InetAddress.getByName(hostname), intPort);
+ tmpReadableServerName = serverAddr.toString();
+ Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
+ socket.setTcpNoDelay(true);
+ socket.connect(serverAddr, 500);
+ session = replSessionSecurity.createClientSession(server, socket);
+ boolean isSslEncryption =
+ replSessionSecurity.isSslEncryption(server);
+ /*
+ * Send our ServerStartMessage.
+ */
+ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+ maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+ halfRcvWindow * 2, heartbeatInterval, state,
+ protocolVersion, generationId, isSslEncryption);
+ session.publish(msg);
+
+ /*
+ * Read the ReplServerStartMessage that should come back.
+ */
+ session.setSoTimeout(1000);
+ replServerStartMsg = (ReplServerStartMessage) session.receive();
+
+ /*
+ * We have sent our own protocol version to the replication server.
+ * The replication server will use the same one (or an older one
+ * if it is an old replication server).
+ */
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ replServerStartMsg.getVersion());
+ session.setSoTimeout(timeout);
+
+ if (!isSslEncryption)
+ {
+ session.stopEncryption();
+ }
+ } catch (ConnectException e)
+ {
+ /*
+ * There was no server waiting on this host:port
+ * Log a notice and try the next replicationServer in the list
+ */
+ if (!connectionError)
+ {
+ // the error message is only logged once to avoid overflowing
+ // the error log
+ Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+ logError(message);
+ }
+ error = true;
+ } catch (Exception e)
+ {
+ Message message = ERR_EXCEPTION_STARTING_SESSION.get(
+ baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
+ logError(message);
+ error = true;
+ }
+
+ // Close session if requested
+ if (!keepConnection || error)
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ } catch (IOException e)
+ {
+ // The session was already closed, just ignore.
+ }
+ session = null;
+ }
+ if (error)
+ {
+ replServerStartMsg = null;
+ } // Be sure to return null.
+ }
+
+ return replServerStartMsg;
+ }
+
+ /**
+ * Returns the replication server that best fits our need so that we can
+ * connect to it.
+ *
+ * Note: this method put as public static for unit testing purpose.
+ *
+ * @param myState The local server state.
+ * @param rsStates The list of available replication servers and their
+ * associated server state.
+ * @param serverId The server id for the suffix we are working for.
+ * @param baseDn The suffix for which we are working for.
+ * @return The computed best replication server.
+ */
+ public static String computeBestReplicationServer(ServerState myState,
+ HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
+ {
+
+ /*
+ * Find replication servers who are up to date (or more up to date than us,
+ * if for instance we failed and restarted, having sent some changes to the
+ * RS but without having time to store our own state) regarding our own
+ * server id. Then, among them, choose the server that is the most up to
+ * date regarding the whole topology.
+ *
+ * If no server is up to date regarding our own server id, find the one who
+ * is the most up to date regarding our server id.
+ */
+
+ // Should never happen (sanity check)
+ if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
+ (baseDn == null))
+ {
+ return null;
+ }
+
+ String bestServer = null;
+ // Servers up to dates with regard to our changes
+ HashMap<String, ServerState> upToDateServers =
+ new HashMap<String, ServerState>();
+ // Servers late with regard to our changes
+ HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
+
+ /*
+ * Start loop to differenciate up to date servers from late ones.
+ */
+ ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
+ if (myChangeNumber == null)
+ {
+ myChangeNumber = new ChangeNumber(0, 0, serverId);
+ }
+ for (String repServer : rsStates.keySet())
+ {
+
+ ServerState rsState = rsStates.get(repServer);
+ ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
+ if (rsChangeNumber == null)
+ {
+ rsChangeNumber = new ChangeNumber(0, 0, serverId);
+ }
+
+ // Store state in right list
+ if (myChangeNumber.olderOrEqual(rsChangeNumber))
+ {
+ upToDateServers.put(repServer, rsState);
+ } else
+ {
+ lateOnes.put(repServer, rsState);
+ }
+ }
+
+ if (upToDateServers.size() > 0)
+ {
+
+ /*
+ * Some up to date servers, among them, choose the one that has the
+ * maximum number of changes to send us. This is the most up to date one
+ * regarding the whole topology. This server is the one which has the less
+ * difference with the topology server state. For comparison, we need to
+ * compute the difference for each server id with the topology server
+ * state.
+ */
+
+ Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
+ upToDateServers.size(),
+ baseDn.toNormalizedString());
+ logError(message);
+
+ /*
+ * First of all, compute the virtual server state for the whole topology,
+ * which is composed of the most up to date change numbers for
+ * each server id in the topology.
+ */
+ ServerState topoState = new ServerState();
+ for (ServerState curState : upToDateServers.values())
+ {
+
+ Iterator<Short> it = curState.iterator();
+ while (it.hasNext())
+ {
+ Short sId = it.next();
+ ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
+ if (curSidCn == null)
+ {
+ curSidCn = new ChangeNumber(0, 0, sId);
+ }
+ // Update topology state
+ topoState.update(curSidCn);
+ }
+ } // For up to date servers
+
+ // Min of the max shifts
+ long minShift = -1L;
+ for (String upServer : upToDateServers.keySet())
+ {
+
+ /*
+ * Compute the maximum difference between the time of a server id's
+ * change number and the time of the matching server id's change
+ * number in the topology server state.
+ *
+ * Note: we could have used the sequence number here instead of the
+ * timestamp, but this would have caused a problem when the sequence
+ * number loops and comes back to 0 (computation would have becomen
+ * meaningless).
+ */
+ long shift = -1L;
+ ServerState curState = upToDateServers.get(upServer);
+ Iterator<Short> it = curState.iterator();
+ while (it.hasNext())
+ {
+ Short sId = it.next();
+ ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
+ if (curSidCn == null)
+ {
+ curSidCn = new ChangeNumber(0, 0, sId);
+ }
+ // Cannot be null as checked at construction time
+ ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
+ // Cannot be negative as topoState computed as being the max CN
+ // for each server id in the topology
+ long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
+ if (tmpShift > shift)
+ {
+ shift = tmpShift;
+ }
+ }
+
+ if ((minShift < 0) // First time in loop
+ || (shift < minShift))
+ {
+ // This sever is even closer to topo state
+ bestServer = upServer;
+ minShift = shift;
+ }
+ } // For up to date servers
+
+ } else
+ {
+ /*
+ * We could not find a replication server that has seen all the
+ * changes that this server has already processed,
+ */
+ // lateOnes cannot be empty
+ Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
+ baseDn.toNormalizedString(), lateOnes.size());
+ logError(message);
+
+ // Min of the shifts
+ long minShift = -1L;
+ for (String lateServer : lateOnes.keySet())
+ {
+
+ /*
+ * Choose the server who is the closest to us regarding our server id
+ * (this is the most up to date regarding our server id).
+ */
+ ServerState curState = lateOnes.get(lateServer);
+ ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
+ if (ourSidCn == null)
+ {
+ ourSidCn = new ChangeNumber(0, 0, serverId);
+ }
+ // Cannot be negative as our Cn for our server id is strictly
+ // greater than those of the servers in late server list
+ long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
+
+ if ((minShift < 0) // First time in loop
+ || (tmpShift < minShift))
+ {
+ // This sever is even closer to topo state
+ bestServer = lateServer;
+ minShift = tmpShift;
+ }
+ } // For late servers
+ }
+
+ return bestServer;
+ }
+
+ /**
* Search for the changes that happened since fromChangeNumber
* based on the historical attribute.
* @param baseDn the base DN
@@ -493,26 +715,26 @@
* @throws Exception when raised.
*/
public static InternalSearchOperation seachForChangedEntries(
- DN baseDn,
- ChangeNumber fromChangeNumber,
- InternalSearchListener resultListener)
- throws Exception
+ DN baseDn,
+ ChangeNumber fromChangeNumber,
+ InternalSearchListener resultListener)
+ throws Exception
{
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
LDAPFilter filter = LDAPFilter.decode(
- "("+ Historical.HISTORICALATTRIBUTENAME +
- ">=dummy:" + fromChangeNumber + ")");
+ "(" + Historical.HISTORICALATTRIBUTENAME +
+ ">=dummy:" + fromChangeNumber + ")");
LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
attrs.add(Historical.HISTORICALATTRIBUTENAME);
attrs.add(Historical.ENTRYUIDNAME);
return conn.processSearch(
- new ASN1OctetString(baseDn.toString()),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, 0, false, filter,
- attrs,
- resultListener);
+ new ASN1OctetString(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs,
+ resultListener);
}
/**
@@ -524,14 +746,13 @@
if (heartbeatInterval > 0)
{
heartbeatMonitor =
- new HeartbeatMonitor("Replication Heartbeat Monitor on " +
- baseDn + " with " + getReplicationServer(),
- session, heartbeatInterval);
+ new HeartbeatMonitor("Replication Heartbeat Monitor on " +
+ baseDn + " with " + getReplicationServer(),
+ session, heartbeatInterval);
heartbeatMonitor.start();
}
}
-
/**
* restart the ReplicationBroker.
*/
@@ -556,7 +777,7 @@
}
} catch (IOException e1)
{
- // ignore
+ // ignore
}
if (failingSession == session)
@@ -572,7 +793,7 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
- baseDn.toNormalizedString(), e.getLocalizedMessage()));
+ baseDn.toNormalizedString(), e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
@@ -583,13 +804,12 @@
Thread.sleep(500);
} catch (InterruptedException e)
{
- // ignore
+ // ignore
}
}
}
}
-
/**
* Publish a message to the other servers.
* @param msg the message to publish
@@ -598,7 +818,7 @@
{
boolean done = false;
- while (!done)
+ while (!done && !shutdown)
{
if (connectionError)
{
@@ -611,7 +831,7 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() Publishing a " +
- " message is not possible due to existing connection error.");
+ " message is not possible due to existing connection error.");
}
return;
@@ -642,9 +862,8 @@
// want to hold off reconnection in case the connection dropped.
credit =
currentWindowSemaphore.tryAcquire(
- (long) 500, TimeUnit.MILLISECONDS);
- }
- else
+ (long) 500, TimeUnit.MILLISECONDS);
+ } else
{
credit = true;
}
@@ -685,24 +904,22 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
- "IO exception raised : " + e.getLocalizedMessage());
+ "IO exception raised : " + e.getLocalizedMessage());
}
}
}
- }
- catch (InterruptedException e)
+ } catch (InterruptedException e)
{
// just loop.
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
- "Interrupted exception raised." + e.getLocalizedMessage());
+ "Interrupted exception raised." + e.getLocalizedMessage());
}
}
}
}
-
/**
* Receive a message.
* This method is not multithread safe and should either always be
@@ -730,8 +947,7 @@
{
WindowMessage windowMsg = (WindowMessage) msg;
sendWindow.release(windowMsg.getNumAck());
- }
- else
+ } else
{
if (msg instanceof UpdateMessage)
{
@@ -752,11 +968,11 @@
if (shutdown == false)
{
Message message =
- NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
+ NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
logError(message);
debugInfo("ReplicationBroker.receive() " + baseDn +
- " Exception raised." + e + e.getLocalizedMessage());
+ " Exception raised." + e + e.getLocalizedMessage());
this.reStart(failingSession);
}
}
@@ -764,7 +980,6 @@
return null;
}
-
/**
* stop the server.
*/
@@ -773,8 +988,10 @@
replicationServer = "stopped";
shutdown = true;
connected = false;
- if (heartbeatMonitor!= null)
+ if (heartbeatMonitor != null)
+ {
heartbeatMonitor.shutdown();
+ }
try
{
if (debugEnabled())
@@ -784,9 +1001,12 @@
}
if (session != null)
+ {
session.close();
+ }
} catch (IOException e)
- {}
+ {
+ }
}
/**
@@ -834,12 +1054,13 @@
{
return replicationServer;
}
+
/**
* {@inheritDoc}
*/
public void handleInternalSearchEntry(
- InternalSearchOperation searchOperation,
- SearchResultEntry searchEntry)
+ InternalSearchOperation searchOperation,
+ SearchResultEntry searchEntry)
{
/*
* Only deal with modify operation so far
@@ -862,10 +1083,10 @@
* {@inheritDoc}
*/
public void handleInternalSearchReference(
- InternalSearchOperation searchOperation,
- SearchResultReference searchReference)
+ InternalSearchOperation searchOperation,
+ SearchResultReference searchReference)
{
- // TODO to be implemented
+ // TODO to be implemented
}
/**
@@ -906,9 +1127,12 @@
public int getCurrentSendWindow()
{
if (connected)
+ {
return sendWindow.availablePermits();
- else
+ } else
+ {
return 0;
+ }
}
/**
@@ -920,7 +1144,6 @@
return numLostConnections;
}
-
/**
* Change some config parameters.
*
@@ -933,8 +1156,8 @@
* @param heartbeatInterval The heartbeat interval.
*/
public void changeConfig(Collection<String> replicationServers,
- int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window, long heartbeatInterval)
+ int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+ int maxSendDelay, int window, long heartbeatInterval)
{
this.servers = replicationServers;
this.maxRcvWindow = window;
@@ -943,9 +1166,9 @@
this.maxReceiveQueue = maxReceiveQueue;
this.maxSendDelay = maxSendDelay;
this.maxSendQueue = maxSendQueue;
- // TODO : Changing those parameters requires to either restart a new
- // session with the replicationServer or renegociate the parameters that
- // were sent in the ServerStart message
+ // TODO : Changing those parameters requires to either restart a new
+ // session with the replicationServer or renegociate the parameters that
+ // were sent in the ServerStart message
}
/**
@@ -968,7 +1191,11 @@
return !connectionError;
}
- private boolean debugEnabled() { return true; }
+ private boolean debugEnabled()
+ {
+ return true;
+ }
+
private static final void debugInfo(String s)
{
logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 397a00a..c1add4c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -133,9 +133,6 @@
// At startup, the listen thread wait on this flag for the connet
// thread to look for other servers in the topology.
- // TODO when a replication server is out of date (has old changes
- // to receive from other servers, the listen thread should not accept
- // connection from ldap servers. (issue 1302)
private boolean connectedInTopology = false;
private final Object connectedInTopologyLock = new Object();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
new file mode 100644
index 0000000..5321783
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -0,0 +1,684 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.util.HashMap;
+import static org.opends.server.replication.plugin.ReplicationBroker.*;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.*;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.types.DN;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test the algorithm for find the best replication server among the configured
+ * ones.
+ */
+public class ComputeBestServerTest extends ReplicationTestCase
+{
+
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+
+ private void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+
+ private void debugInfo(String message, Exception e)
+ {
+ debugInfo(message + stackTraceToSingleLineString(e));
+ }
+
+ /**
+ * Set up the environment.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ @Override
+ public void setUp() throws Exception
+ {
+ // Don't need server context in these tests
+ }
+
+ /**
+ * Clean up the environment.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @AfterClass
+ @Override
+ public void classCleanUp() throws Exception
+ {
+ // Don't need server context in these tests
+ }
+
+ /**
+ * Test with one replication server, nobody has a change number (simulates)
+ * very first connection.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void testNullCNBoth() throws Exception
+ {
+ String testCase = "testNullCNBoth";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+
+ // definitions for server names
+ final String WINNER = "winner";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(0L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with one replication server, only replication server has a non null
+ * changenumber for ds server id
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void testNullCNDS() throws Exception
+ {
+ String testCase = "testNullCNDS";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(0L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with one replication server, only ds server has a non null
+ * changenumber for ds server id but rs has a null one.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void testNullCNRS() throws Exception
+ {
+ String testCase = "testNullCNRS";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+
+ // definitions for server names
+ final String WINNER = "winner";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(0L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with one replication server, up to date.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test1ServerUp() throws Exception
+ {
+ String testCase = "test1ServerUp";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(1L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with 2 replication servers, up to date.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test2ServersUp() throws Exception
+ {
+ String testCase = "test2ServersUp";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+ final String LOOSER1 = "looser1";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(1L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER1, aState);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(2L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with 3 replication servers, up to date.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test3ServersUp() throws Exception
+ {
+ String testCase = "test3ServersUp";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+ final String LOOSER1 = "looser1";
+ final String LOOSER2 = "looser2";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(1L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER1, aState);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(2L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ // State for server 3
+ aState = new ServerState();
+ cn = new ChangeNumber(3L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER2, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with one replication server, late.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test1ServerLate() throws Exception
+ {
+ String testCase = "test1ServerLate";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(0L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(1L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with 2 replication servers, late.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test2ServersLate() throws Exception
+ {
+ String testCase = "test2ServersLate";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+ final String LOOSER1 = "looser1";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(2L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(0L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER1, aState);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(1L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with 3 replication servers, late.
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test3ServersLate() throws Exception
+ {
+ String testCase = "test3ServersLate";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+ // definitions for server names
+ final String WINNER = "winner";
+ final String LOOSER1 = "looser1";
+ final String LOOSER2 = "looser2";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(1L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER1, aState);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(3L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(0L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ // State for server 3
+ aState = new ServerState();
+ cn = new ChangeNumber(2L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER2, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+
+ /**
+ * Test with 6 replication servers, some up, some late, one null
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void test6ServersMixed() throws Exception
+ {
+ String testCase = "test6ServersMixed";
+
+ debugInfo("Starting " + testCase);
+
+ // definitions for server ids
+ short myId1 = 1;
+ short myId2 = 2;
+ short myId3 = 3;
+
+ // definitions for server names
+ final String WINNER = "winner";
+ final String LOOSER1 = "looser1";
+ final String LOOSER2 = "looser2";
+ final String LOOSER3 = "looser3";
+ final String LOOSER4 = "looser4";
+ final String LOOSER5 = "looser5";
+
+ // Create my state
+ ServerState mySt = new ServerState();
+ ChangeNumber cn = new ChangeNumber(5L, 0, myId1);
+ mySt.update(cn);
+ cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
+ mySt.update(cn);
+ cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
+ mySt.update(cn);
+
+ // Create replication servers state list
+ HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+
+ // State for server 1
+ ServerState aState = new ServerState();
+ cn = new ChangeNumber(4L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER1, aState);
+
+ // State for server 2
+ aState = new ServerState();
+ cn = new ChangeNumber(7L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(6L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(5L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER2, aState);
+
+ // State for server 3
+ aState = new ServerState();
+ cn = new ChangeNumber(3L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(10L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER3, aState);
+
+ // State for server 4
+ aState = new ServerState();
+ cn = new ChangeNumber(6L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(6L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(8L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(WINNER, aState);
+
+ // State for server 5 (null one for our serverid)
+ aState = new ServerState();
+ cn = new ChangeNumber(5L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(5L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER4, aState);
+
+ // State for server 6
+ aState = new ServerState();
+ cn = new ChangeNumber(5L, 0, myId1);
+ aState.update(cn);
+ cn = new ChangeNumber(7L, 0, myId2);
+ aState.update(cn);
+ cn = new ChangeNumber(6L, 0, myId3);
+ aState.update(cn);
+ rsStates.put(LOOSER5, aState);
+
+ String bestServer =
+ computeBestReplicationServer(mySt, rsStates, myId1, new DN());
+
+ assertEquals(bestServer, WINNER, "Wrong best replication server.");
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
new file mode 100644
index 0000000..2c8116d
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -0,0 +1,477 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.io.IOException;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.*;
+
+import java.net.ServerSocket;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.DN;
+import org.opends.server.types.ResultCode;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test if the replication domain is able to switch of replication rerver
+ * if there is some replication server failure.
+ */
+@Test(sequential = true)
+public class ReplicationServerFailoverTest extends ReplicationTestCase
+{
+
+ private static final String BASEDN_STRING = "dc=example,dc=com";
+ private static final short DS1_ID = 1;
+ private static final short DS2_ID = 2;
+ private static final short RS1_ID = 11;
+ private static final short RS2_ID = 12;
+ private int rs1Port = -1;
+ private int rs2Port = -1;
+ private ReplicationDomain rd1 = null;
+ private ReplicationDomain rd2 = null;
+ private ReplicationServer rs1 = null;
+ private ReplicationServer rs2 = null;
+
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+
+ private void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+
+ private void debugInfo(String message, Exception e)
+ {
+ debugInfo(message + stackTraceToSingleLineString(e));
+ }
+
+ private void initTest()
+ {
+ rs1Port = -1;
+ rs2Port = -1;
+ rd1 = null;
+ rd2 = null;
+ rs1 = null;
+ rs2 = null;
+ findFreePorts();
+ }
+
+ private void endTest()
+ {
+ if (rd1 != null)
+ {
+ rd1.shutdown();
+ rd1 = null;
+ }
+
+ if (rd2 != null)
+ {
+ rd2.shutdown();
+ rd2 = null;
+ }
+
+ if (rs1 != null)
+ {
+ rs1.shutdown();
+ rs1 = null;
+ }
+
+ if (rs2 != null)
+ {
+ rs2.shutdown();
+ rs2 = null;
+ }
+ rs1Port = -1;
+ rs2Port = -1;
+ }
+
+ /**
+ * Test the failover feature when one RS fails:
+ * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology.
+ * DS1 connected to RS1 (DS1<->RS1)
+ * Both RS are connected together (RS1<->RS2)
+ * RS1 fails, DS1 should be connected to RS2
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test
+ public void testFailOverSingle() throws Exception
+ {
+ String testCase = "testFailOverSingle";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ // Start RS1
+ rs1 = createReplicationServer(RS1_ID, testCase);
+ // Start RS2
+ rs2 = createReplicationServer(RS2_ID, testCase);
+
+ // Start DS1
+ DN baseDn = DN.decode(BASEDN_STRING);
+ rd1 = createReplicationDomain(baseDn, DS1_ID, testCase);
+
+ // DS1 connected to RS1 ?
+ String msg = "Before " + RS1_ID + " failure";
+ checkConnection(DS1_ID, RS1_ID, msg);
+
+ // Simulate RS1 failure
+ rs1.shutdown();
+ // Let time for failover to happen
+ sleep(5000);
+
+ // DS1 connected to RS2 ?
+ msg = "After " + RS1_ID + " failure";
+ checkConnection(DS1_ID, RS2_ID, msg);
+
+ endTest();
+ }
+
+ /**
+ * Test the failover feature when one RS fails:
+ * 2 DS (DS1 and DS2) and 2 RS (RS1 and RS2) in topology.
+ * Each DS connected to its own RS (DS1<->RS1, DS2<->RS2)
+ * Both RS are connected together (RS1<->RS2)
+ * RS1 fails, DS1 and DS2 should be both connected to RS2
+ * RS1 comes back (no change)
+ * RS2 fails, DS1 and DS2 should be both connected to RS1
+ *
+ * @throws Exception If a problem occured
+ */
+ @Test(enabled = false)
+ // This test to be run in standalone, not in precommit
+ // because the timing is important as we restart servers after they fail
+ // and thus cannot warrenty that the recovering server is the right one if
+ // the sleep time is not enough with regard to thread scheduling in heavy
+ // precommit environment
+ public void testFailOverMulti() throws Exception
+ {
+ String testCase = "testFailOverMulti";
+
+ debugInfo("Starting " + testCase);
+
+ initTest();
+
+ // Start RS1
+ rs1 = createReplicationServer(RS1_ID, testCase);
+ // Start RS2
+ rs2 = createReplicationServer(RS2_ID, testCase);
+
+ // Start DS1
+ DN baseDn = DN.decode(BASEDN_STRING);
+ rd1 = createReplicationDomain(baseDn, DS1_ID, testCase);
+ // Start DS2
+ rd2 = createReplicationDomain(baseDn, DS2_ID, testCase);
+
+ // DS1 connected to RS1 ?
+ String msg = "Before " + RS1_ID + " failure";
+ checkConnection(DS1_ID, RS1_ID, msg);
+ // DS2 connected to RS2 ?
+ checkConnection(DS2_ID, RS2_ID, msg);
+
+ // Simulate RS1 failure
+ rs1.shutdown();
+ // Let time for failover to happen
+ sleep(5000);
+
+ // DS1 connected to RS2 ?
+ msg = "After " + RS1_ID + " failure";
+ checkConnection(DS1_ID, RS2_ID, msg);
+ // DS2 connected to RS2 ?
+ checkConnection(DS2_ID, RS2_ID, msg);
+
+ // Restart RS1
+ rs1 = createReplicationServer(RS1_ID, testCase);
+ // Let time for RS1 to restart
+ sleep(5000);
+
+ // DS1 connected to RS2 ?
+ msg = "Before " + RS2_ID + " failure";
+ checkConnection(DS1_ID, RS2_ID, msg);
+ // DS2 connected to RS2 ?
+ checkConnection(DS2_ID, RS2_ID, msg);
+
+ // Simulate RS2 failure
+ rs2.shutdown();
+ // Let time for failover to happen
+ sleep(5000);
+
+ // DS1 connected to RS1 ?
+ msg = "After " + RS2_ID + " failure";
+ checkConnection(DS1_ID, RS1_ID, msg);
+ // DS2 connected to RS1 ?
+ checkConnection(DS2_ID, RS1_ID, msg);
+
+ // Restart RS2
+ rs2 = createReplicationServer(RS2_ID, testCase);
+ // Let time for RS2 to restart
+ sleep(5000);
+
+ // DS1 connected to RS1 ?
+ msg = "After " + RS2_ID + " restart";
+ checkConnection(DS1_ID, RS1_ID, msg);
+ // DS2 connected to RS1 ?
+ checkConnection(DS2_ID, RS1_ID, msg);
+
+ endTest();
+ }
+
+ private void sleep(long time)
+ {
+ try
+ {
+ Thread.sleep(time);
+ } catch (InterruptedException ex)
+ {
+ fail("Error sleeping " + stackTraceToSingleLineString(ex));
+ }
+ }
+
+ /**
+ * Check connection of the provided replication domain to the provided
+ * replication server.
+ */
+ private void checkConnection(short dsId, short rsId, String msg)
+ {
+
+ int rsPort = -1;
+ ReplicationDomain rd = null;
+ if (dsId == DS1_ID)
+ {
+ rd = rd1;
+ } else if (dsId == DS2_ID)
+ {
+ rd = rd2;
+ } else
+ {
+ fail("Unknown replication domain server id.");
+ }
+
+ if (rsId == RS1_ID)
+ {
+ rsPort = rs1Port;
+ } else if (rsId == RS2_ID)
+ {
+ rsPort = rs2Port;
+ } else
+ {
+ fail("Unknown replication server id.");
+ }
+
+ // Connected ?
+ assertEquals(rd.isConnected(), true,
+ "Replication domain " + dsId +
+ " is not connected to a replication server (" + msg + ")");
+ // Right port ?
+ String serverStr = rd.getReplicationServer();
+ int index = serverStr.lastIndexOf(':');
+ if ((index == -1) || (index >= serverStr.length()))
+ fail("Enable to find port number in: " + serverStr);
+ String rdPortStr = serverStr.substring(index + 1);
+ int rdPort = -1;
+ try
+ {
+ rdPort = (new Integer(rdPortStr)).intValue();
+ } catch (Exception e)
+ {
+ fail("Enable to get an int from: " + rdPortStr);
+ }
+ assertEquals(rdPort, rsPort,
+ "Replication domain " + dsId +
+ " is not connected to right replication server port (" +
+ rdPort + ") was expecting " + rsPort +
+ " (" + msg + ")");
+ }
+
+ /**
+ * Find needed free TCP ports.
+ */
+ private void findFreePorts()
+ {
+ try
+ {
+ ServerSocket socket1 = TestCaseUtils.bindFreePort();
+ ServerSocket socket2 = TestCaseUtils.bindFreePort();
+ rs1Port = socket1.getLocalPort();
+ rs2Port = socket2.getLocalPort();
+ socket1.close();
+ socket2.close();
+ } catch (IOException e)
+ {
+ fail("Unable to determinate some free ports " +
+ stackTraceToSingleLineString(e));
+ }
+ }
+
+ /**
+ * Creates a new ReplicationServer.
+ */
+ private ReplicationServer createReplicationServer(short serverId,
+ String suffix)
+ {
+ SortedSet<String> replServers = new TreeSet<String>();
+ try
+ {
+ int port = -1;
+ if (serverId == RS1_ID)
+ {
+ port = rs1Port;
+ replServers.add("localhost:" + rs2Port);
+ } else if (serverId == RS2_ID)
+ {
+ port = rs2Port;
+ replServers.add("localhost:" + rs1Port);
+ } else
+ {
+ fail("Unknown replication server id.");
+ }
+
+ String dir = "genid" + serverId + suffix + "Db";
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
+ replServers);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ return replicationServer;
+
+ } catch (Exception e)
+ {
+ fail("createReplicationServer " + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new ReplicationDomain.
+ */
+ private ReplicationDomain createReplicationDomain(DN baseDn, short serverId,
+ String suffix)
+ {
+
+ SortedSet<String> replServers = new TreeSet<String>();
+ try
+ {
+ if (serverId == DS1_ID)
+ {
+ replServers.add("localhost:" + rs1Port);
+ } else if (serverId == DS2_ID)
+ {
+ replServers.add("localhost:" + rs2Port);
+ } else
+ {
+ fail("Unknown replication domain server id.");
+ }
+
+ DomainFakeCfg domainConf =
+ new DomainFakeCfg(baseDn, serverId, replServers);
+ //domainConf.setHeartbeatInterval(500);
+ ReplicationDomain replicationDomain =
+ MultimasterReplication.createNewDomain(domainConf);
+
+ // Add other server (doing that after connection insure we connect to
+ // the right server)
+ // WARNING: only works because for the moment, applying changes to conf
+ // does not force reconnection in replication domain
+ // when it is coded, the reconnect may 1 of both servers and we can not
+ // guaranty anymore that we reach the server we want at the beginning.
+ if (serverId == DS1_ID)
+ {
+ replServers.add("localhost:" + rs2Port);
+ } else if (serverId == DS2_ID)
+ {
+ replServers.add("localhost:" + rs1Port);
+ } else
+ {
+ fail("Unknown replication domain server id.");
+ }
+ domainConf = new DomainFakeCfg(baseDn, serverId, replServers);
+ ConfigChangeResult chgRes =
+ replicationDomain.applyConfigurationChange(domainConf);
+ if ((chgRes == null) ||
+ (!chgRes.getResultCode().equals(ResultCode.SUCCESS)))
+ {
+ fail("Could not change replication domain config" +
+ " (add some replication servers).");
+ }
+
+ return replicationDomain;
+
+ } catch (Exception e)
+ {
+ fail("createReplicationDomain " + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Set up the environment.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ // In case we need to extend
+ }
+
+ /**
+ * Clean up the environment.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @AfterClass
+ @Override
+ public void classCleanUp() throws Exception
+ {
+ super.classCleanUp();
+ // In case we need it extend
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index fc5180d..210d849 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -785,7 +785,8 @@
}
else
{
- fail("ReplicationServer transmission failed: no expected message class.");
+ fail("ReplicationServer transmission failed: no expected message" +
+ " class: " + msg2);
break;
}
}
--
Gitblit v1.10.0