From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS, we introduce:
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java | 6
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java | 89 +--
opends/src/server/org/opends/server/replication/common/DSInfo.java | 18
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 8
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 18
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java | 6
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 8
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 17
opends/src/messages/messages/replication.properties | 2
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 92 +++
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 378 +++++++++++-----
opends/resource/schema/02-config.ldif | 8
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 21
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 27
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 2
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 57 --
opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 8
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java | 55 ++
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 236 ++++++++-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 203 ++++----
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java | 18
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 6
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml | 23 +
opends/src/server/org/opends/server/replication/common/RSInfo.java | 30 +
28 files changed, 935 insertions(+), 417 deletions(-)
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 158ad3b..148b8d1 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -2458,6 +2458,11 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.605
+ NAME 'ds-cfg-monitoring-period'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler'
SUP top
@@ -3137,7 +3142,8 @@
ds-cfg-group-id $
ds-cfg-assured-timeout $
ds-cfg-degraded-status-threshold $
- ds-cfg-weight)
+ ds-cfg-weight $
+ ds-cfg-monitoring-period)
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
NAME 'ds-backup-directory'
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
index 6c91def..57e8b50 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -296,4 +296,27 @@
</ldap:attribute>
</adm:profile>
</adm:property>
+ <adm:property name="monitoring-period" mandatory="false">
+ <adm:synopsis>
+ The period between sending of monitoring messages.
+ </adm:synopsis>
+ <adm:description>
+ Defines the amount of milliseconds the replication server will wait before
+ sending new monitoring messages to its peers (replication servers and
+ directory servers).
+ </adm:description>
+ <adm:default-behavior>
+ <adm:defined>
+ <adm:value>3000ms</adm:value>
+ </adm:defined>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:duration base-unit="ms" lower-limit="1000" />
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-monitoring-period</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
</adm:managed-object>
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index da7c3de..801eca7 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -170,7 +170,7 @@
UTF-8. This is required to be able to encode the changes in the database. \
This replication server will now shutdown
SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \
- suffix %s but was not able to connect to any Replication Server
+ suffix %s but was not able to connect to any Replication Server
NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
for domain %s with replication server id %s %s - local server id is %s - data \
generation is %s
diff --git a/opends/src/server/org/opends/server/replication/common/DSInfo.java b/opends/src/server/org/opends/server/replication/common/DSInfo.java
index 1e58428..aeb1835 100644
--- a/opends/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -247,23 +247,23 @@
StringBuffer sb = new StringBuffer();
sb.append("DS id: ");
sb.append(dsId);
- sb.append(" RS id: ");
+ sb.append(" ; RS id: ");
sb.append(rsId);
- sb.append(" Generation id: ");
+ sb.append(" ; Generation id: ");
sb.append(generationId);
- sb.append(" Status: ");
+ sb.append(" ; Status: ");
sb.append(status);
- sb.append(" Assured replication: ");
+ sb.append(" ; Assured replication: ");
sb.append(assuredFlag);
- sb.append(" Assured mode: ");
+ sb.append(" ; Assured mode: ");
sb.append(assuredMode);
- sb.append(" Safe data level: ");
+ sb.append(" ; Safe data level: ");
sb.append(safeDataLevel);
- sb.append(" Group id: ");
+ sb.append(" ; Group id: ");
sb.append(groupId);
- sb.append(" Referral URLs: ");
+ sb.append(" ; Referral URLs: ");
sb.append(refUrls);
- sb.append(" ECL Include: ");
+ sb.append(" ; ECL Include: ");
sb.append(eclIncludes);
return sb.toString();
}
diff --git a/opends/src/server/org/opends/server/replication/common/RSInfo.java b/opends/src/server/org/opends/server/replication/common/RSInfo.java
index 5b6d9e2..577ffd3 100644
--- a/opends/src/server/org/opends/server/replication/common/RSInfo.java
+++ b/opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -40,6 +40,11 @@
private long generationId = -1;
// Group id of the RS
private byte groupId = (byte) -1;
+ // The weight of the RS
+ // It is important to keep the default value to 1 so that it is used as
+ // default value for a RS using protocol V3: this default value vill be used
+ // in algorithms that use weight
+ private int weight = 1;
/**
* Creates a new instance of RSInfo with every given info.
@@ -47,12 +52,14 @@
* @param id The RS id
* @param generationId The generation id the RS is using
* @param groupId RS group id
+ * @param weight RS weight
*/
- public RSInfo(int id, long generationId, byte groupId)
+ public RSInfo(int id, long generationId, byte groupId, int weight)
{
this.id = id;
this.generationId = generationId;
this.groupId = groupId;
+ this.weight = weight;
}
/**
@@ -83,6 +90,16 @@
}
/**
+ * Get the RS weight.
+ * @return The RS weight
+ */
+ public int getWeight()
+ {
+ return weight;
+ }
+
+
+ /**
* Test if the passed object is equal to this one.
* @param obj The object to test
* @return True if both objects are equal
@@ -99,7 +116,8 @@
RSInfo rsInfo = (RSInfo) obj;
return ((id == rsInfo.getId()) &&
(generationId == rsInfo.getGenerationId()) &&
- (groupId == rsInfo.getGroupId()));
+ (groupId == rsInfo.getGroupId()) &&
+ (weight == rsInfo.getWeight()));
} else
{
return false;
@@ -117,6 +135,7 @@
hash = 37 * hash + this.id;
hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
hash = 37 * hash + this.groupId;
+ hash = 37 * hash + this.weight;
return hash;
}
@@ -130,11 +149,12 @@
StringBuffer sb = new StringBuffer();
sb.append("Id: ");
sb.append(id);
- sb.append(" Generation id: ");
+ sb.append(" ; Generation id: ");
sb.append(generationId);
- sb.append(" Group id: ");
+ sb.append(" ; Group id: ");
sb.append(groupId);
+ sb.append(" ; Weight: ");
+ sb.append(weight);
return sb.toString();
}
-
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index e3f6e48..138b2b1 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -93,6 +93,24 @@
}
/**
+ * Sets the sender ID.
+ * @param senderID The sender ID.
+ */
+ public void setSenderID(int senderID)
+ {
+ this.senderID = senderID;
+ }
+
+ /**
+ * Sets the destination.
+ * @param destination The destination.
+ */
+ public void setDestination(int destination)
+ {
+ this.destination = destination;
+ }
+
+ /**
* Sets the state of the replication server.
* @param state The state.
*/
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index db05464..0924254 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -31,7 +31,7 @@
/**
* This message is part of the replication protocol.
- * RS1 sends a MonitorRequestMsg to RS2 to requests its monitoring
+ * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
* informations.
* When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
* MonitorMessage.
diff --git a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
index 5b38558..8b3172a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -29,7 +29,7 @@
/**
* This is an abstract class of messages of the replication protocol
* for message that needs to contain information about the server that
- * send them and the destination servers to whitch they should be sent.
+ * send them and the destination servers to which they should be sent.
*/
public abstract class RoutableMsg extends ReplicationMsg
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index f357409..85a4d1d 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -167,6 +167,7 @@
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
+ // Put ECL includes
Set<String> attrs = dsInfo.getEclIncludes();
oStream.write(attrs.size());
for (String attr : attrs)
@@ -192,8 +193,15 @@
oStream.write(String.valueOf(rsInfo.getGenerationId()).
getBytes("UTF-8"));
oStream.write(0);
- // Put DS group id
+ // Put RS group id
oStream.write(rsInfo.getGroupId());
+
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // Put RS weight
+ oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
+ oStream.write(0);
+ }
}
return oStream.toByteArray();
@@ -332,23 +340,30 @@
int length = getNextLength(in, pos);
String serverIdString = new String(in, pos, length, "UTF-8");
int id = Integer.valueOf(serverIdString);
- pos +=
- length + 1;
+ pos += length + 1;
/* Read the generation id */
length = getNextLength(in, pos);
long generationId =
Long.valueOf(new String(in, pos, length,
"UTF-8"));
- pos +=
- length + 1;
+ pos += length + 1;
/* Read RS group id */
byte groupId = in[pos++];
+ int weight = 1;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ /* Read RS weight */
+ length = getNextLength(in, pos);
+ weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
+ }
+
/* Now create RSInfo and store it in list */
- RSInfo rsInfo = new RSInfo(id, generationId, groupId);
+ RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight);
rsList.add(rsInfo);
nRsInfo--;
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index cbc6bad..5053f42 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -309,8 +309,7 @@
try
{
- MonitorData md;
- md = replicationServerDomain.computeMonitorData();
+ MonitorData md = replicationServerDomain.computeMonitorData();
// Oldest missing update
Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
@@ -538,7 +537,7 @@
return;
}
- // Send our own TopologyMsg to remote RS
+ // Send our own TopologyMsg to remote DS
TopologyMsg outTopoMsg = sendTopoToRemoteDS();
logStartSessionHandshake(inStartSessionMsg, outTopoMsg);
@@ -572,6 +571,9 @@
// Create the status analyzer for the domain if not already started
createStatusAnalyzer();
+ // Create the monitoring publisher for the domain if not already started
+ createMonitoringPublisher();
+
registerIntoDomain();
super.finalizeStart();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index b4403da..71f4386 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -101,6 +99,7 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
+import java.util.Collections;
/**
* ReplicationServer Listener.
@@ -173,6 +172,10 @@
// the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
private int degradedStatusThreshold = 5000;
+ // Number of milliseconds to wait before sending new monitoring messages.
+ // If value is 0, monitoring publisher is disabled
+ private long monitoringPublisherPeriod = 3000;
+
// The handler of the draft change numbers database, the database used to
// store the relation between a draft change number ('seqnum') and the
// associated cookie.
@@ -211,6 +214,13 @@
private int weight = 1;
/**
+ * Holds the list of all replication servers instantiated in this VM.
+ * This allows to perform clean up of the RS databases in unit tests.
+ */
+ private static List<ReplicationServer> allInstances =
+ new ArrayList<ReplicationServer>();
+
+ /**
* Creates a new Replication server using the provided configuration entry.
*
* @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
groupId = (byte)configuration.getGroupId();
assuredTimeout = configuration.getAssuredTimeout();
degradedStatusThreshold = configuration.getDegradedStatusThreshold();
+ monitoringPublisherPeriod = configuration.getMonitoringPeriod();
replSessionSecurity = new ReplSessionSecurity();
initialize(replicationPort);
@@ -274,8 +285,20 @@
DirectoryServer.registerImportTaskListener(this);
localPorts.add(replicationPort);
+
+ // Keep track of this new instance
+ allInstances.add(this);
}
+ /**
+ * Get the list of every replication servers instantiated in the current VM.
+ * @return The list of every replication servers instantiated in the current
+ * VM.
+ */
+ public static List<ReplicationServer> getAllInstances()
+ {
+ return allInstances;
+ }
/**
* The run method for the Listen thread.
@@ -850,7 +873,9 @@
dbEnv.shutdown();
}
-}
+ // Remove this instance from the global instance list
+ allInstances.remove(this);
+ }
/**
@@ -1028,6 +1053,32 @@
}
}
+ // Update period value for monitoring publishers (stop them if requested
+ // value is 0)
+ if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+ {
+ long oldMonitoringPeriod = monitoringPublisherPeriod;
+ monitoringPublisherPeriod = configuration.getMonitoringPeriod();
+ for(ReplicationServerDomain rsd : baseDNs.values())
+ {
+ if (monitoringPublisherPeriod == 0L)
+ {
+ // Requested to stop monitoring publishers
+ rsd.stopMonitoringPublisher();
+ } else if (rsd.isRunningMonitoringPublisher())
+ {
+ // Update the threshold value for this running monitoring publisher
+ rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
+ } else if (oldMonitoringPeriod == 0L)
+ {
+ // Requested to start monitoring publishers with provided period value
+ if ( (rsd.getConnectedDSs().size() > 0) ||
+ (rsd.getConnectedRSs().size() > 0) )
+ rsd.startMonitoringPublisher();
+ }
+ }
+ }
+
// Changed the group id ?
byte newGroupId = (byte)configuration.getGroupId();
if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
if (weight != configuration.getWeight())
{
weight = configuration.getWeight();
- // TODO: send new TopologyMsg
+ // Broadcast the new weight the the whole topology. This will make some
+ // DSs reconnect (if needed) to other RSs according to the new weight of
+ // this RS.
+ broadcastConfigChange();
}
if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
}
/**
+ * Broadcast a configuration change that just happened to the whole topology
+ * by sending a TopologyMsg to every entity in the topology.
+ */
+ private void broadcastConfigChange()
+ {
+ for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+ {
+ replicationServerDomain.buildAndSendTopoInfoToDSs(null);
+ replicationServerDomain.buildAndSendTopoInfoToRSs();
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
}
/**
+ * Get the monitoring publisher period value.
+ * @return the monitoring publisher period value.
+ */
+ public long getMonitoringPublisherPeriod()
+ {
+ return monitoringPublisherPeriod;
+ }
+
+ /**
* Compute the list of replication servers that are not any
* more connected to this Replication Server and stop the
* corresponding handlers.
@@ -1411,12 +1487,80 @@
/* The date of the last time they have been elaborated */
private long monitorDataLastBuildDate = 0;
- /* Search op on monitor data is processed by a worker thread.
- * Requests are sent to the other RS,and responses are received by the
- * listener threads.
- * The worker thread is awoke on this semaphore, or on timeout.
+ /**
+ * This uniquely identifies a server (handler) in the cross-domain topology.
+ * Represents an identifier of a handler (in the whole RS) we have to wait a
+ * monitoring message from before answering to a monitor request.
*/
- Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+ public static class GlobalServerId {
+
+ private int serverId = -1;
+ private String baseDn = null;
+
+ /**
+ * Constructor for a global server id.
+ * @param baseDn The dn of the RSD owning the handler.
+ * @param serverId The handler id in the matching RSD.
+ */
+ public GlobalServerId(String baseDn, int serverId) {
+ this.baseDn = baseDn;
+ this.serverId = serverId;
+ }
+
+ /**
+ * Get the server handler id.
+ * @return the serverId
+ */
+ public int getServerId()
+ {
+ return serverId;
+ }
+
+ /**
+ * Get the base dn.
+ * @return the baseDn
+ */
+ public String getBaseDn()
+ {
+ return baseDn;
+ }
+
+ /**
+ * Get the hascode.
+ * @return The hashcode.
+ */
+ @Override
+ public int hashCode()
+ {
+ int hash = 7;
+ hash = 43 * hash + this.serverId;
+ hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
+ return hash;
+ }
+
+ /**
+ * Tests if the passed global server handler id represents the same server
+ * handler as this one.
+ * @param obj The object to test.
+ * @return True if both identifiers are the same.
+ */
+ public boolean equals(Object obj) {
+ if ( (obj == null) || (obj instanceof GlobalServerId))
+ return false;
+
+ GlobalServerId globalServerId = (GlobalServerId)obj;
+ return ( globalServerId.baseDn.equals(baseDn) &&
+ (globalServerId.serverId == serverId) );
+ }
+ }
+
+ /**
+ * This gives the list of server handlers we are willing to wait monitoring
+ * message from. Each time a monitoring message is received by a server
+ * handler, the matching server handler id is retired from the list. When the
+ * list is empty, we received all expected monitoring messages.
+ */
+ private List<GlobalServerId> expectedMonitoringMsg = null;
/**
* Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
*
* @throws DirectoryException If the computation cannot be achieved.
*/
- public void computeMonitorData() throws DirectoryException
+ public synchronized void computeMonitorData() throws DirectoryException
{
if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
{
@@ -1440,15 +1584,17 @@
return;
}
- remoteMonitorResponsesSemaphore.drainPermits();
- int count = 0;
+ // Initialize the list of server handlers we expect monitoring messages from
+ expectedMonitoringMsg =
+ Collections.synchronizedList(new ArrayList<GlobalServerId>());
+
for (ReplicationServerDomain domain : baseDNs.values())
{
- count += domain.initializeMonitorData();
+ domain.initializeMonitorData(expectedMonitoringMsg);
}
// Wait for responses
- waitMonitorDataResponses(count);
+ waitMonitorDataResponses();
for (ReplicationServerDomain domain : baseDNs.values())
{
@@ -1457,38 +1603,51 @@
}
/**
- * Wait for the expected count of received MonitorMsg.
- * @param expectedResponses The number of expected answers.
+ * Wait for the expected received MonitorMsg.
* @throws DirectoryException When an error occurs.
*/
- private void waitMonitorDataResponses(int expectedResponses)
+ private void waitMonitorDataResponses()
throws DirectoryException
{
try
{
if (debugEnabled())
TRACER.debugInfo(
- "In " + getMonitorInstanceName() + " baseDn=" +
- " waiting for " + expectedResponses + " expected monitor messages");
+ "In " + getMonitorInstanceName() +
+ " waiting for " + expectedMonitoringMsg.size() +
+ " expected monitor messages");
- boolean allPermitsAcquired =
- remoteMonitorResponsesSemaphore.tryAcquire(
- expectedResponses,
- (long) 5000, TimeUnit.MILLISECONDS);
-
- if (!allPermitsAcquired)
+ // Wait up to 5 seconds for every expected monitoring message to come
+ // back.
+ boolean allReceived = false;
+ long startTime = TimeThread.getTime();
+ long curTime = startTime;
+ int maxTime = 5000;
+ while ( (curTime - startTime) < maxTime )
{
- monitorDataLastBuildDate = TimeThread.getTime();
+ // Have every expected monitoring messages arrived ?
+ if (expectedMonitoringMsg.size() == 0)
+ {
+ // Ok break the loop
+ allReceived = true;
+ break;
+ }
+ Thread.sleep(100);
+ curTime = TimeThread.getTime();
+ }
+
+ monitorDataLastBuildDate = TimeThread.getTime();
+
+ if (!allReceived)
+ {
logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
- // let's go on in best effort even with limited data received.
+ // let's go on in best effort even with limited data received.
} else
{
- monitorDataLastBuildDate = TimeThread.getTime();
if (debugEnabled())
TRACER.debugInfo(
- "In " + getMonitorInstanceName() + " baseDn=" +
- " Successfully received all " + expectedResponses +
- " expected monitor messages");
+ "In " + getMonitorInstanceName() +
+ " Successfully received all expected monitor messages");
}
} catch (Exception e)
{
@@ -1499,11 +1658,18 @@
/**
* This should be called by each ReplicationServerDomain that receives
- * a response to a monitor request message.
+ * a response to a monitor request message. This may also be called when a
+ * monitoring message is coming from a RS whose monitoring publisher thread
+ * sent it. As monitoring messages (sent because of monitoring request or
+ * because of monitoring publisher) have the same content, this is also ok
+ * to mark ok the server when the monitoring message coms from a monitoring
+ * publisher thread.
+ * @param globalServerId The server handler that is receiving the
+ * monitoring message.
*/
- public void responseReceived()
+ public void responseReceived(GlobalServerId globalServerId)
{
- remoteMonitorResponsesSemaphore.release();
+ expectedMonitoringMsg.remove(globalServerId);
}
@@ -1513,7 +1679,7 @@
*/
public void responseReceivedAll()
{
- remoteMonitorResponsesSemaphore.notifyAll();
+ expectedMonitoringMsg.clear();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4ef072c..ab1da75 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
+import org.opends.server.replication.server.
+ ReplicationServer.GlobalServerId;
/**
* This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
// late or not
private StatusAnalyzer statusAnalyzer = null;
+ // The monitoring publisher that periodically sends monitoring messages to the
+ // topology
+ private MonitoringPublisher monitoringPublisher = null;
+
/*
* The following map contains one balanced tree for each replica ID
* to which we are currently publishing
@@ -1066,6 +1072,17 @@
// Try doing job anyway...
}
+ // Stop useless monitoring publisher if no more RS or DS in domain
+ if ( (directoryServers.size() + replicationServers.size() )== 1)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " +
+ replicationServer.getMonitorInstanceName() +
+ " remote server " + handler.getMonitorInstanceName() + " is " +
+ "the last RS/DS to be stopped: stopping monitoring publisher");
+ stopMonitoringPublisher();
+ }
+
if (handler.isReplicationServer())
{
if (replicationServers.containsValue(handler))
@@ -1082,44 +1099,39 @@
buildAndSendTopoInfoToDSs(null);
}
}
- } else
+ } else if (directoryServers.containsValue(handler))
{
- if (directoryServers.containsValue(handler))
+ // If this is the last DS for the domain,
+ // shutdown the status analyzer
+ if (directoryServers.size() == 1)
{
- // If this is the last DS for the domain,
- // shutdown the status analyzer
- if (directoryServers.size() == 1)
- {
- if (debugEnabled())
- TRACER.debugInfo("In " +
- replicationServer.getMonitorInstanceName() +
- " remote server " + handler.getMonitorInstanceName() +
+ if (debugEnabled())
+ TRACER.debugInfo("In " +
+ replicationServer.getMonitorInstanceName() +
+ " remote server " + handler.getMonitorInstanceName() +
" is the last DS to be stopped: stopping status analyzer");
- stopStatusAnalyzer();
- }
+ stopStatusAnalyzer();
+ }
- unregisterServerHandler(handler);
- handler.shutdown();
+ unregisterServerHandler(handler);
+ handler.shutdown();
- // Check if generation id has to be reset
- mayResetGenerationId();
+ // Check if generation id has to be reset
+ mayResetGenerationId();
+ if (!shutdown)
+ {
// Update the remote replication servers with our list
// of connected LDAP servers
- if (!shutdown)
- {
- buildAndSendTopoInfoToRSs();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
- }
+ buildAndSendTopoInfoToRSs();
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
}
- else if (otherHandlers.contains(handler))
- {
- unRegisterHandler(handler);
- handler.shutdown();
- }
+ } else if (otherHandlers.contains(handler))
+ {
+ unRegisterHandler(handler);
+ handler.shutdown();
}
-
}
catch(Exception e)
{
@@ -1581,99 +1593,51 @@
// in the topology.
if (senderHandler.isDataServer())
{
- MonitorMsg returnMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID());
+ // Monitoring information requested by a DS
+ MonitorMsg monitorMsg =
+ createGlobalTopologyMonitorMsg(msg.getDestination(),
+ msg.getsenderID());
- try
+ if (monitorMsg != null)
{
- returnMsg.setReplServerDbState(getDbServerState());
- // Update the information we have about all servers
- // in the topology.
- MonitorData md = computeMonitorData();
-
- // Add the informations about the Replicas currently in
- // the topology.
- Iterator<Integer> it = md.ldapIterator();
- while (it.hasNext())
+ try
{
- int replicaId = it.next();
- returnMsg.setServerState(
- replicaId, md.getLDAPServerState(replicaId),
- md.getApproxFirstMissingDate(replicaId), true);
- }
-
- // Add the informations about the Replication Servers
- // currently in the topology.
- it = md.rsIterator();
- while (it.hasNext())
+ senderHandler.send(monitorMsg);
+ } catch (IOException e)
{
- int replicaId = it.next();
- returnMsg.setServerState(
- replicaId, md.getRSStates(replicaId),
- md.getRSApproxFirstMissingDate(replicaId), false);
+ // the connection was closed.
}
}
- catch (DirectoryException e)
- {
- // If we can't compute the Monitor Information, send
- // back an empty message.
- }
- try
- {
- senderHandler.send(returnMsg);
- } catch (IOException e)
- {
- // the connection was closed.
- }
return;
- }
-
- MonitorMsg monitorMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID());
-
- // Populate for each connected LDAP Server
- // from the states stored in the serverHandler.
- // - the server state
- // - the older missing change
- for (DataServerHandler lsh : this.directoryServers.values())
+ } else
{
- monitorMsg.setServerState(
- lsh.getServerId(),
- lsh.getServerState(),
- lsh.getApproxFirstMissingDate(),
- true);
- }
+ // Monitoring information requested by a RS
+ MonitorMsg monitorMsg =
+ createLocalTopologyMonitorMsg(msg.getDestination(),
+ msg.getsenderID());
- // Same for the connected RS
- for (ReplicationServerHandler rsh : this.replicationServers.values())
- {
- monitorMsg.setServerState(
- rsh.getServerId(),
- rsh.getServerState(),
- rsh.getApproxFirstMissingDate(),
- false);
- }
-
- // Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerDbState(this.getDbServerState());
-
-
- try
- {
- senderHandler.send(monitorMsg);
- } catch (Exception e)
- {
- // We log the error. The requestor will detect a timeout or
- // any other failure on the connection.
- logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
- Integer.toString((msg.getDestination()))));
+ if (monitorMsg != null)
+ {
+ try
+ {
+ senderHandler.send(monitorMsg);
+ } catch (Exception e)
+ {
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+ Integer.toString((msg.getDestination()))));
+ }
+ }
}
} else if (msg instanceof MonitorMsg)
{
MonitorMsg monitorMsg =
(MonitorMsg) msg;
- receivesMonitorDataResponse(monitorMsg);
+ GlobalServerId globalServerId =
+ new GlobalServerId(baseDn, senderHandler.getServerId());
+ receivesMonitorDataResponse(monitorMsg, globalServerId);
} else
{
logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
}
/**
+ * Creates a new monitor message including monitoring information for the
+ * whole topology.
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ * @return The newly created and filled MonitorMsg. Null if a problem occurred
+ * during message creation.
+ */
+ public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
+ {
+ MonitorMsg returnMsg =
+ new MonitorMsg(sender, destination);
+
+ try
+ {
+ returnMsg.setReplServerDbState(getDbServerState());
+ // Update the information we have about all servers
+ // in the topology.
+ MonitorData md = computeMonitorData();
+
+ // Add the informations about the Replicas currently in
+ // the topology.
+ Iterator<Integer> it = md.ldapIterator();
+ while (it.hasNext())
+ {
+ int replicaId = it.next();
+ returnMsg.setServerState(
+ replicaId, md.getLDAPServerState(replicaId),
+ md.getApproxFirstMissingDate(replicaId), true);
+ }
+
+ // Add the informations about the Replication Servers
+ // currently in the topology.
+ it = md.rsIterator();
+ while (it.hasNext())
+ {
+ int replicaId = it.next();
+ returnMsg.setServerState(
+ replicaId, md.getRSStates(replicaId),
+ md.getRSApproxFirstMissingDate(replicaId), false);
+ }
+ }
+ catch (DirectoryException e)
+ {
+ // If we can't compute the Monitor Information, send
+ // back an empty message.
+ }
+ return returnMsg;
+ }
+
+ /**
+ * Creates a new monitor message including monitoring information for the
+ * topology directly connected to this RS. This includes information for:
+ * - local RS
+ * - all direct DSs
+ * - all direct RSs
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ * @return The newly created and filled MonitorMsg. Null if a problem occurred
+ * during message creation.
+ */
+ public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+ {
+ MonitorMsg monitorMsg = null;
+
+ try {
+
+ // Lock domain as we need to go through connected servers list
+ lock();
+
+ monitorMsg = new MonitorMsg(sender, destination);
+
+
+ // Populate for each connected LDAP Server
+ // from the states stored in the serverHandler.
+ // - the server state
+ // - the older missing change
+ for (DataServerHandler lsh : this.directoryServers.values())
+ {
+ monitorMsg.setServerState(
+ lsh.getServerId(),
+ lsh.getServerState(),
+ lsh.getApproxFirstMissingDate(),
+ true);
+ }
+
+ // Same for the connected RS
+ for (ReplicationServerHandler rsh : this.replicationServers.values())
+ {
+ monitorMsg.setServerState(
+ rsh.getServerId(),
+ rsh.getServerState(),
+ rsh.getApproxFirstMissingDate(),
+ false);
+ }
+
+ // Populate the RS state in the msg from the DbState
+ monitorMsg.setReplServerDbState(this.getDbServerState());
+ } catch(InterruptedException e)
+ {
+ // At lock, too bad...
+ } finally
+ {
+ if (hasLock())
+ release();
+ }
+
+ return monitorMsg;
+ }
+
+ /**
* Shutdown this ReplicationServerDomain.
*/
public void shutdown()
@@ -1831,8 +1905,7 @@
/**
* Send a TopologyMsg to all the connected directory servers in order to
- * let.
- * them know the topology (every known DSs and RSs)
+ * let them know the topology (every known DSs and RSs).
* @param notThisOne If not null, the topology message will not be sent to
* this passed server.
*/
@@ -1931,10 +2004,11 @@
dsInfos.add(serverHandler.toDSInfo());
}
- // Create info for us (local RS)
+ // Create info for the local RS
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- generationId, replicationServer.getGroupId());
+ generationId, replicationServer.getGroupId(),
+ replicationServer.getWeight());
rsInfos.add(localRSInfo);
return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
// Add our own info (local RS)
RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- generationId, replicationServer.getGroupId());
+ generationId, replicationServer.getGroupId(),
+ replicationServer.getWeight());
rsInfos.add(localRSInfo);
// Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
* Start collecting global monitoring information for this
* ReplicationServerDomain.
*
- * @return The number of response that should come back.
+ * @param expectedMonitoringMsg The list of server handler we have to wait a
+ * monitoring message from. Will be filled as necessary by this method.
*
* @throws DirectoryException In case the monitoring information could
* not be collected.
*/
- int initializeMonitorData() throws DirectoryException
+ void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
+ throws DirectoryException
{
synchronized (monitorDataLock)
{
@@ -2539,7 +2616,7 @@
}
// Send the request for remote monitor data to the
- return sendMonitorDataRequest();
+ sendMonitorDataRequest(expectedMonitoringMsg);
}
/**
@@ -2566,22 +2643,25 @@
/**
* Sends a MonitorRequest message to all connected RS.
- * @return the number of requests sent.
+ * @param expectedMonitoringMsg The list of server handler we have to wait a
+ * monitoring message from. Will be filled as necessary by this method.
* @throws DirectoryException when a problem occurs.
*/
- protected int sendMonitorDataRequest()
+ protected void sendMonitorDataRequest(
+ List<GlobalServerId> expectedMonitoringMsg)
throws DirectoryException
{
- int sent = 0;
try
{
for (ServerHandler rs : replicationServers.values())
{
+ int serverId = rs.getServerId();
MonitorRequestMsg msg =
new MonitorRequestMsg(this.replicationServer.getServerId(),
- rs.getServerId());
+ serverId);
rs.send(msg);
- sent++;
+ // Store the fact that we expect a MonitoringMsg back from this server
+ expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
}
} catch (Exception e)
{
@@ -2590,7 +2670,6 @@
throw new DirectoryException(ResultCode.OTHER,
message, e);
}
- return sent;
}
/**
@@ -2598,8 +2677,10 @@
* and stores the data received.
*
* @param msg The message to be processed.
+ * @param globalServerHandlerId server handler that is receiving the message.
*/
- public void receivesMonitorDataResponse(MonitorMsg msg)
+ private void receivesMonitorDataResponse(MonitorMsg msg,
+ GlobalServerId globalServerId)
{
try
{
@@ -2677,7 +2758,7 @@
// Decreases the number of expected responses and potentially
// wakes up the waiting requestor thread.
- replicationServer.responseReceived();
+ replicationServer.responseReceived(globalServerId);
} catch (Exception e)
{
@@ -2832,6 +2913,57 @@
}
/**
+ * Starts the monitoring publisher for the domain.
+ */
+ public void startMonitoringPublisher()
+ {
+ if (monitoringPublisher == null)
+ {
+ long period =
+ replicationServer.getMonitoringPublisherPeriod();
+ if (period > 0) // 0 means no monitoring publisher
+ {
+ monitoringPublisher = new MonitoringPublisher(this, period);
+ monitoringPublisher.start();
+ }
+ }
+ }
+
+ /**
+ * Stops the monitoring publisher for the domain.
+ */
+ public void stopMonitoringPublisher()
+ {
+ if (monitoringPublisher != null)
+ {
+ monitoringPublisher.shutdown();
+ monitoringPublisher.waitForShutdown();
+ monitoringPublisher = null;
+ }
+ }
+
+ /**
+ * Tests if the monitoring publisher for this domain is running.
+ * @return True if the monitoring publisher is running, false otherwise.
+ */
+ public boolean isRunningMonitoringPublisher()
+ {
+ return (monitoringPublisher != null);
+ }
+
+ /**
+ * Update the monitoring publisher with the new period value.
+ * @param period The new period value.
+ */
+ public void updateMonitoringPublisher(long period)
+ {
+ if (monitoringPublisher != null)
+ {
+ monitoringPublisher.setPeriod(period);
+ }
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 91015f3..42b255c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -240,6 +240,9 @@
logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
+ // Create the monitoring publisher for the domain if not already started
+ createMonitoringPublisher();
+
// FIXME: i think this should be done for all protocol version !!
// not only those > V1
registerIntoDomain();
@@ -408,6 +411,10 @@
// other servers.
}
+
+ // Create the monitoring publisher for the domain if not already started
+ createMonitoringPublisher();
+
registerIntoDomain();
// Process TopologyMsg sent by remote RS: store matching new info
@@ -497,7 +504,18 @@
// Remote RS sent his topo msg
TopologyMsg inTopoMsg = (TopologyMsg) msg;
- // CONNECTION WITH A RS
+ // Store remore RS weight if it has one
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // List should only contain RS info for sender
+ RSInfo rsInfo = inTopoMsg.getRsList().get(0);
+ weight = rsInfo.getWeight();
+ }
+ else
+ {
+ // Remote RS uses protocol version prior to 4 : use default value for
+ // weight: 1
+ }
// if the remote RS and the local RS have the same genID
// then it's ok and nothing else to do
@@ -646,6 +664,7 @@
RSInfo rsInfo = rsInfos.get(0);
generationId = rsInfo.getGenerationId();
groupId = rsInfo.getGroupId();
+ weight = rsInfo.getWeight();
/**
* Store info for DSs connected to the peer RS
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 22adad3..a0ebd0c 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -244,6 +244,10 @@
*/
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
+ /**
+ * Weight of this remote server.
+ */
+ protected int weight = 1;
/**
* Creates a new server handler instance with the provided socket.
@@ -1215,12 +1219,23 @@
*/
public RSInfo toRSInfo()
{
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+ RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight);
return rsInfo;
}
/**
+ * Starts the monitoring publisher for the domain if not already started.
+ */
+ protected void createMonitoringPublisher()
+ {
+ if (!replicationServerDomain.isRunningMonitoringPublisher())
+ {
+ replicationServerDomain.startMonitoringPublisher();
+ }
+ }
+
+ /**
* Performs any processing periodic processing that may be desired to update
* the information associated with this monitor. Note that best-effort
* attempts will be made to ensure that calls to this method come
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 983cffa..86e858b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
private ReplicationDomain domain = null;
/**
+ * This object is used as a conditional event to be notified about
+ * the reception of monitor information from the Replication Server.
+ */
+ private final MutableBoolean monitorResponse = new MutableBoolean(false);
+
+ /**
+ * A Map containing the ServerStates of all the replicas in the topology
+ * as seen by the ReplicationServer the last time it was polled or the last
+ * time it published monitoring information.
+ */
+ private HashMap<Integer, ServerState> replicaStates =
+ new HashMap<Integer, ServerState>();
+
+ /**
+ * A Map containing the ServerStates of all the replication servers in the
+ * topology as seen by the ReplicationServer the last time it was polled or
+ * the last time it published monitoring information.
+ */
+ private HashMap<Integer, ServerState> rsStates =
+ new HashMap<Integer, ServerState>();
+
+ /**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
*/
@@ -1918,6 +1943,37 @@
// Try to find a suitable RS
this.reStart(failingSession);
}
+ else if (msg instanceof MonitorMsg)
+ {
+ // This is the response to a MonitorRequest that was sent earlier or
+ // the regular message of the monitoring publisher of the RS.
+
+ // Extract and store replicas ServerStates
+ replicaStates = new HashMap<Integer, ServerState>();
+ MonitorMsg monitorMsg = (MonitorMsg) msg;
+ Iterator<Integer> it = monitorMsg.ldapIterator();
+ while (it.hasNext())
+ {
+ int srvId = it.next();
+ replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
+ }
+
+ // Notify the sender that the response was received.
+ synchronized (monitorResponse)
+ {
+ monitorResponse.set(true);
+ monitorResponse.notify();
+ }
+
+ // Extract and store replication servers ServerStates
+ rsStates = new HashMap<Integer, ServerState>();
+ it = monitorMsg.rsIterator();
+ while (it.hasNext())
+ {
+ int srvId = it.next();
+ rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+ }
+ }
else
{
return msg;
@@ -1949,6 +2005,40 @@
}
/**
+ * Gets the States of all the Replicas currently in the
+ * Topology.
+ * When this method is called, a Monitoring message will be sent
+ * to the Replication Server to which this domain is currently connected
+ * so that it computes a table containing information about
+ * all Directory Servers in the topology.
+ * This Computation involves communications will all the servers
+ * currently connected and
+ *
+ * @return The States of all Replicas in the topology (except us)
+ */
+ public Map<Integer, ServerState> getReplicaStates()
+ {
+ monitorResponse.set(false);
+
+ // publish Monitor Request Message to the Replication Server
+ publish(new MonitorRequestMsg(serverId, getRsServerId()));
+
+ // wait for Response up to 10 seconds.
+ try
+ {
+ synchronized (monitorResponse)
+ {
+ if (monitorResponse.get() == false)
+ {
+ monitorResponse.wait(10000);
+ }
+ }
+ } catch (InterruptedException e)
+ {}
+ return replicaStates;
+ }
+
+ /**
* This method allows to do the necessary computing for the window
* management after treatment by the worker threads.
*
@@ -2440,7 +2530,7 @@
{
ctHeartbeatPublisherThread =
new CTHeartbeatPublisherThread(
- "Replication CN Heartbeat Thread started for " +
+ "Replication CN Heartbeat sender for " +
baseDn + " with " + getReplicationServer(),
session, changeTimeHeartbeatSendInterval, serverId);
ctHeartbeatPublisherThread.start();
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 992fe7c..1f8a7e1 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -65,7 +65,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -79,7 +78,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -92,8 +90,6 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.MonitorMsg;
-import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -306,20 +302,6 @@
*/
private final ChangeNumberGenerator generator;
- /**
- * This object is used as a conditional event to be notified about
- * the reception of monitor information from the Replication Server.
- */
- private final MutableBoolean monitorResponse = new MutableBoolean(false);
-
-
- /**
- * A Map containing of the ServerStates of all the replicas in the topology
- * as seen by the ReplicationServer the last time it was polled.
- */
- private HashMap<Integer, ServerState> replicaStates =
- new HashMap<Integer, ServerState>();
-
Set<String> cfgEclIncludes = new HashSet<String>();
Set<String> eClIncludes = new HashSet<String>();
@@ -586,24 +568,7 @@
*/
public Map<Integer, ServerState> getReplicaStates()
{
- monitorResponse.set(false);
-
- // publish Monitor Request Message to the Replication Server
- broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
-
- // wait for Response up to 10 seconds.
- try
- {
- synchronized (monitorResponse)
- {
- if (monitorResponse.get() == false)
- {
- monitorResponse.wait(10000);
- }
- }
- } catch (InterruptedException e)
- {}
- return replicaStates;
+ return broker.getReplicaStates();
}
/**
@@ -834,26 +799,6 @@
update = (UpdateMsg) msg;
generator.adjust(update.getChangeNumber());
}
- else if (msg instanceof MonitorMsg)
- {
- // This is the response to a MonitorRequest that was sent earlier
- // build the replicaStates Map.
- replicaStates = new HashMap<Integer, ServerState>();
- MonitorMsg monitorMsg = (MonitorMsg) msg;
- Iterator<Integer> it = monitorMsg.ldapIterator();
- while (it.hasNext())
- {
- int serverId = it.next();
- replicaStates.put(
- serverId, monitorMsg.getLDAPServerState(serverId));
- }
- // Notify the sender that the response was received.
- synchronized (monitorResponse)
- {
- monitorResponse.set(true);
- monitorResponse.notify();
- }
- }
}
catch (SocketTimeoutException e)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index d43eb5d..bdcbebc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -27,7 +27,6 @@
package org.opends.server.replication;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
-import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
@@ -37,7 +36,6 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import static org.opends.server.loggers.ErrorLogger.logError;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
@@ -56,13 +54,9 @@
import java.util.SortedSet;
import java.util.TreeSet;
-import org.opends.messages.Category;
-import org.opends.messages.Message;
-import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.Backend;
import org.opends.server.api.ConnectionHandler;
-import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.MemoryBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.controls.ExternalChangelogRequestControl;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index 964e464..7dd2c41 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -784,6 +784,8 @@
catch(SocketTimeoutException e)
{
// This is the expected result
+ // Note that timeout should be lower than RS montoring publisher period
+ // so that timeout occurs
}
//===========================================================
@@ -889,49 +891,21 @@
// Broker 2 and 3 should receive 1 change status message to order them
// to enter the bad gen id status
- try
+ ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2,
+ ChangeStatusMsg.class.getName());
+ if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
{
- ReplicationMsg msg = broker2.receive();
- if (!(msg instanceof ChangeStatusMsg))
- {
- fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
- " to enter the bad gen id status"
- + msg);
- }
- ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
- if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
- {
- fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
- " to enter the bad gen id status"
- + msg);
- }
+ fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
+ " to enter the bad gen id status"
+ + csMsg);
}
- catch(SocketTimeoutException se)
+ csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3,
+ ChangeStatusMsg.class.getName());
+ if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
{
- fail("DS2 is expected to receive 1 ChangeStatusMsg to enter the " +
- "bad gen id status.");
- }
- try
- {
- ReplicationMsg msg = broker3.receive();
- if (!(msg instanceof ChangeStatusMsg))
- {
- fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
- " to enter the bad gen id status"
- + msg);
- }
- ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
- if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
- {
- fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
- " to enter the bad gen id status"
- + msg);
- }
- }
- catch(SocketTimeoutException se)
- {
- fail("DS3 is expected to receive 1 ChangeStatusMsg to enter the " +
- "bad gen id status.");
+ fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
+ " to enter the bad gen id status"
+ + csMsg);
}
debugInfo("DS1 root entry must contain the new gen ID");
@@ -988,7 +962,8 @@
debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it.");
- broker2.publish(createAddMsg());
+ AddMsg emsg = (AddMsg)createAddMsg();
+ broker2.publish(emsg);
// Updates count in RS1 must stay unchanged = to 1
Thread.sleep(500);
@@ -1060,8 +1035,30 @@
isDegradedDueToGenerationId(server3ID),
"Expecting that DS3 is not in bad gen id from RS1");
+ debugInfo("Verify that DS2 receives the add message stored in RS1 DB");
+ try
+ {
+ ReplicationMsg msg = broker2.receive();
+ assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
+ }
+ catch(SocketTimeoutException e)
+ {
+ fail("The msg stored in RS1 DB is expected to be received by DS2)");
+ }
+
+ debugInfo("Verify that DS3 receives the add message stored in RS1 DB");
+ try
+ {
+ ReplicationMsg msg = broker3.receive();
+ assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
+ }
+ catch(SocketTimeoutException e)
+ {
+ fail("The msg stored in RS1 DB is expected to be received by DS3)");
+ }
+
debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it.");
- AddMsg emsg = (AddMsg)createAddMsg();
+ emsg = (AddMsg)createAddMsg();
broker2.publish(emsg);
Thread.sleep(500);
@@ -1105,7 +1102,7 @@
* The following test focus on:
* - genId checking across multiple starting RS (replication servers)
* - genId setting propagation from one RS to the others
- * - genId reset propagation from one RS to the others
+ * - genId reset propagation from one RS to the others
*/
@Test(enabled=false)
public void testMultiRS() throws Exception
@@ -1190,7 +1187,7 @@
assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId);
assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId);
- debugInfo("Connecting broker2 to replServer1 with a bad genId");
+ debugInfo("Connecting broker3 to replServer1 with a bad genId");
try
{
long badgenId = 1;
@@ -1215,7 +1212,7 @@
debugInfo("Connecting DS to replServer1.");
connectServer1ToChangelog(changelog1ID);
- Thread.sleep(1000);
+ Thread.sleep(3000);
debugInfo("Adding reset task to DS.");
@@ -1373,7 +1370,7 @@
/**
* Loop opening sessions to the Replication Server
- * to check that it handle correctly deconnection and reconnection.
+ * to check that it handle correctly disconnection and reconnection.
*/
@Test(enabled=false, groups="slow")
public void testLoop() throws Exception
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 9e1aae1..161679c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -65,9 +64,10 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.PersistentServerState;
-import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.schema.DirectoryStringSyntax;
@@ -260,34 +260,6 @@
broker.setSoTimeout(timeout);
checkConnection(30, broker, port); // give some time to the broker to connect
// to the replicationServer.
- if (emptyOldChanges)
- {
- /*
- * loop receiving update until there is nothing left
- * to make sure that message from previous tests have been consumed.
- */
- try
- {
- while (true)
- {
- ReplicationMsg rMsg = broker.receive();
- if (rMsg instanceof ErrorMsg)
- {
- ErrorMsg eMsg = (ErrorMsg)rMsg;
- logError(new MessageBuilder(
- "ReplicationTestCase/openReplicationSession ").append(
- " received ErrorMessage when emptying old changes ").append(
- eMsg.getDetails()).toMessage());
- }
- }
- }
- catch (Exception e)
- {
- logError(new MessageBuilder(
- "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
- .append(" when emptying old changes").toMessage());
- }
- }
return broker;
}
@@ -313,32 +285,6 @@
broker.setSoTimeout(timeout);
checkConnection(30, broker, port); // give some time to the broker to connect
// to the replicationServer.
- if (emptyOldChanges)
- {
- // loop receiving update until there is nothing left
- // to make sure that message from previous tests have been consumed.
- try
- {
- while (true)
- {
- ReplicationMsg rMsg = broker.receive();
- if (rMsg instanceof ErrorMsg)
- {
- ErrorMsg eMsg = (ErrorMsg)rMsg;
- logError(new MessageBuilder(
- "ReplicationTestCase/openReplicationSession ").append(
- " received ErrorMessage when emptying old changes ").append(
- eMsg.getDetails()).toMessage());
- }
- }
- }
- catch (Exception e)
- {
- logError(new MessageBuilder(
- "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
- .append(" when emptying old changes").toMessage());
- }
- }
return broker;
}
*/
@@ -435,17 +381,6 @@
boolean emptyOldChanges)
throws Exception, SocketException
{
- return openReplicationSession(baseDn, serverId, window_size,
- port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
- getGenerationId(baseDn));
- }
-
- protected ReplicationBroker openReplicationSession(
- final DN baseDn, int serverId, int window_size,
- int port, int timeout, int maxSendQueue, int maxRcvQueue,
- boolean emptyOldChanges, long generationId)
- throws Exception, SocketException
- {
ServerState state = new ServerState();
if (emptyOldChanges)
@@ -453,37 +388,13 @@
ReplicationBroker broker = new ReplicationBroker(null,
state, baseDn.toNormalizedString(), serverId, window_size,
- generationId, 0, getReplSessionSecurity(), (byte)1, 500);
+ getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500);
ArrayList<String> servers = new ArrayList<String>(1);
servers.add("localhost:" + port);
broker.start(servers);
checkConnection(30, broker, port);
if (timeout != 0)
broker.setSoTimeout(timeout);
- if (emptyOldChanges)
- {
- /*
- * loop receiving update until there is nothing left
- * to make sure that message from previous tests have been consumed.
- */
- try
- {
- while (true)
- {
- ReplicationMsg rMsg = broker.receive();
- if (rMsg instanceof ErrorMsg)
- {
- ErrorMsg eMsg = (ErrorMsg)rMsg;
- logError(new MessageBuilder(
- "ReplicationTestCase/openReplicationSession ").append(
- " received ErrorMessage when emptying old changes ").append(
- eMsg.getDetails()).toMessage());
- }
- }
- }
- catch (Exception e)
- { }
- }
return broker;
}
@@ -575,11 +486,14 @@
logError(Message.raw(Category.SYNC, Severity.NOTICE,
" ##### Calling ReplicationTestCase.classCleanUp ##### "));
+ // Clean RS databases
+ cleanUpReplicationServersDB();
+
cleanConfigEntries();
- configEntryList = null;
+ configEntryList = new LinkedList<DN>();
cleanRealEntries();
- entryList = null;
+ entryList = new LinkedList<DN>();
// Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
// (in case our test created some emtries in it)
@@ -631,6 +545,10 @@
assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)",
"Found unexpected replication server config left");
+ // Be sure that no replication server instance is left
+ List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
+ assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
+
// Check for config entries for replication domain
assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
"Found unexpected replication domain config left");
@@ -648,6 +566,17 @@
}
/**
+ * Cleanup databases of the currently instantiated replication servers in the
+ * VM
+ */
+ protected void cleanUpReplicationServersDB() {
+
+ for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
+ rs.clearDb();
+ }
+ }
+
+ /**
* Performs a search on the config backend with the specified filter.
* Fails if a config entry is found.
* @param filter The filter to apply for the search
@@ -1266,4 +1195,90 @@
// done
}
}
+
+ /**
+ * Wait for the arrival of a specific message type on the provided session
+ * before going in timeout and failing.
+ * @param session Session from which we should receive the message.
+ * @param msgType Class of the message we are waiting for.
+ * @return The expected message if it comes in time or fails (assertion).
+ */
+ protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
+
+ ReplicationMsg replMsg = null;
+
+ int timeOut = 5000; // 5 seconds max to wait for the desired message
+ long startTime = System.currentTimeMillis();
+ long curTime = startTime;
+ int nMsg = 0;
+ while ((curTime - startTime) <= timeOut)
+ {
+ try
+ {
+ replMsg = session.receive();
+ } catch (Exception ex)
+ {
+ fail("Exception waiting for " + msgType + " message : " +
+ ex.getClass().getName() + " : " + ex.getMessage());
+ }
+ // Get message type
+ String rcvMsgType = replMsg.getClass().getName();
+ if (rcvMsgType.equals(msgType))
+ {
+ // Ok, got it, let's return the expected message
+ return replMsg;
+ }
+ TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
+ nMsg++;
+ curTime = System.currentTimeMillis();
+ }
+ // Timeout
+ fail("Failed to receive an expected " + msgType +
+ " message after 5 seconds : also received " + nMsg +
+ " other messages during wait time.");
+ return null;
+ }
+
+ /**
+ * Wait for the arrival of a specific message type on the provided broker
+ * before going in timeout and failing.
+ * @param broker Broker from which we should receive the message.
+ * @param msgType Class of the message we are waiting for.
+ * @return The expected message if it comes in time or fails (assertion).
+ */
+ protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
+
+ ReplicationMsg replMsg = null;
+
+ int timeOut = 5000; // 5 seconds max to wait for the desired message
+ long startTime = System.currentTimeMillis();
+ long curTime = startTime;
+ int nMsg = 0;
+ while ((curTime - startTime) <= timeOut)
+ {
+ try
+ {
+ replMsg = broker.receive();
+ } catch (Exception ex)
+ {
+ fail("Exception waiting for " + msgType + " message : " +
+ ex.getClass().getName() + " : " + ex.getMessage());
+ }
+ // Get message type
+ String rcvMsgType = replMsg.getClass().getName();
+ if (rcvMsgType.equals(msgType))
+ {
+ // Ok, got it, let's return the expected message
+ return replMsg;
+ }
+ TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
+ nMsg++;
+ curTime = System.currentTimeMillis();
+ }
+ // Timeout
+ fail("Failed to receive an expected " + msgType +
+ " message after 5 seconds : also received " + nMsg +
+ " other messages during wait time.");
+ return null;
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
index 4a4cf4f..51ce7ad 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -128,6 +128,8 @@
logError(Message.raw(Category.SYNC, Severity.NOTICE,
"Starting replication test : pushSchemaChange "));
+ cleanUpReplicationServersDB();
+
final DN baseDn = DN.decode("cn=schema");
ReplicationBroker broker =
@@ -216,6 +218,8 @@
logError(Message.raw(Category.SYNC, Severity.NOTICE,
"Starting replication test : replaySchemaChange "));
+ cleanUpReplicationServersDB();
+
final DN baseDn = DN.decode("cn=schema");
ReplicationBroker broker =
@@ -253,6 +257,8 @@
logError(Message.raw(Category.SYNC, Severity.NOTICE,
"Starting replication test : pushSchemaFilesChange "));
+ cleanUpReplicationServersDB();
+
final DN baseDn = DN.decode("cn=schema");
ReplicationBroker broker =
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index 884fa51..e98164b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -67,6 +67,7 @@
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.*;
+import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -300,6 +301,9 @@
logError(Message.raw(Category.SYNC, Severity.INFORMATION,
"Starting synchronization test : toggleReceiveStatus"));
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
/*
@@ -379,6 +383,9 @@
logError(Message.raw(Category.SYNC, Severity.INFORMATION,
"Starting replication test : lostHeartbeatFailover"));
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
/*
@@ -483,6 +490,9 @@
DirectoryServer.getAttributeType("entryuuid");
String monitorAttr = "resolved-modify-conflicts";
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
/*
* Open a session to the replicationServer using the broker API.
* This must use a different serverId to that of the directory server.
@@ -610,6 +620,9 @@
String resolvedMonitorAttr = "resolved-naming-conflicts";
String unresolvedMonitorAttr = "unresolved-naming-conflicts";
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
/*
* Open a session to the replicationServer using the ReplicationServer broker API.
* This must use a serverId different from the LDAP server ID
@@ -1302,6 +1315,18 @@
return new Object[][] { { false }, {true} };
}
+ private void cleanupTest() {
+ try
+ {
+ classCleanUp();
+ setUp();
+ } catch (Exception e)
+ {
+ fail("Test cleanup failed: " + e.getClass().getName() + " : " +
+ e.getMessage() + " : " + StaticUtils.stackTraceToSingleLineString(e));
+ }
+ }
+
/**
* Tests done using directly the ReplicationBroker interface.
*/
@@ -1312,6 +1337,9 @@
Category.SYNC, Severity.INFORMATION,
"Starting replication test : updateOperations " + assured));
+ // Cleanup from previous run
+ cleanupTest();
+
final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
ReplicationBroker broker =
@@ -1341,15 +1369,15 @@
// Check if the client has received the msg
ReplicationMsg msg = broker.receive();
assertTrue(msg instanceof AddMsg,
- "The received replication message is not an ADD msg");
+ "The received replication message is not an ADD msg : " + msg);
AddMsg addMsg = (AddMsg) msg;
Operation receivedOp = addMsg.createOperation(connection);
assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
- "The received replication message is not an ADD msg");
+ "The received replication message is not an ADD msg : " + addMsg);
assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
- "The received ADD replication message is not for the excepted DN");
+ "The received ADD replication message is not for the excepted DN : " + addMsg);
}
// Modify the entry
@@ -1364,12 +1392,12 @@
// See if the client has received the msg
ReplicationMsg msg = broker.receive();
assertTrue(msg instanceof ModifyMsg,
- "The received replication message is not a MODIFY msg");
+ "The received replication message is not a MODIFY msg : " + msg);
ModifyMsg modMsg = (ModifyMsg) msg;
modMsg.createOperation(connection);
assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received MODIFY replication message is not for the excepted DN");
+ "The received MODIFY replication message is not for the excepted DN : " + modMsg);
// Modify the entry DN
DN newDN = DN.decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING) ;
@@ -1387,12 +1415,12 @@
// See if the client has received the msg
msg = broker.receive();
assertTrue(msg instanceof ModifyDNMsg,
- "The received replication message is not a MODIFY DN msg");
+ "The received replication message is not a MODIFY DN msg : " + msg);
ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
moddnMsg.createOperation(connection);
assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
- "The received MODIFY_DN message is not for the excepted DN");
+ "The received MODIFY_DN message is not for the excepted DN : " + moddnMsg);
// Delete the entry
DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
@@ -1406,12 +1434,12 @@
// See if the client has received the msg
msg = broker.receive();
assertTrue(msg instanceof DeleteMsg,
- "The received replication message is not a MODIFY DN msg");
+ "The received replication message is not a MODIFY DN msg : " + msg);
DeleteMsg delMsg = (DeleteMsg) msg;
delMsg.createOperation(connection);
assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
.decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING)) == 0,
- "The received DELETE message is not for the excepted DN");
+ "The received DELETE message is not for the excepted DN : " + delMsg);
/*
* Now check that when we send message to the ReplicationServer
@@ -1512,6 +1540,9 @@
logError(Message.raw(Category.SYNC, Severity.INFORMATION,
"Starting replication test : deleteNoSuchObject"));
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
DN dn = DN.decode("cn=No Such Object,ou=People," + TEST_ROOT_DN_STRING);
DeleteOperationBasis op =
new DeleteOperationBasis(connection,
@@ -1535,6 +1566,9 @@
final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
Thread.sleep(2000);
ReplicationBroker broker =
openReplicationSession(baseDn, 11, 100, replServerPort, 1000, true);
@@ -1675,6 +1709,9 @@
final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
+ // Clean replication server database from previous run
+ cleanUpReplicationServersDB();
+
/*
* Open a session to the replicationServer using the broker API.
* This must use a different serverId to that of the directory server.
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 7529f60..7ad45ce 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -501,7 +501,7 @@
// Send topo view
List<RSInfo> rsList = new ArrayList<RSInfo>();
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+ RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
rsList.add(rsInfo);
TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
rsList);
@@ -719,7 +719,7 @@
}
/**
- * Read the coming seaf read mode updates and send back acks with errors
+ * Read the coming safe read mode updates and send back acks with errors
*/
private void executeSafeReadManyErrorsScenario()
{
@@ -1058,7 +1058,7 @@
}
/**
- * Tests parameters sent in session handshake an updates, when not using
+ * Tests parameters sent in session handshake and updates, when not using
* assured replication
*/
@Test
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 2744026..da567bc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -836,7 +836,7 @@
fail("Unknown replication server id.");
}
- return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId);
+ return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index a293fbe..b937ea1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -1092,11 +1092,11 @@
dsList4.add(dsInfo2);
dsList4.add(dsInfo1);
- RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
+ RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
- RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
+ RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
- RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
+ RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
List<RSInfo> rsList1 = new ArrayList<RSInfo>();
rsList1.add(rsInfo1);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 10f188b..dc67c82 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -1026,13 +1026,13 @@
dsList4.add(dsInfo2);
dsList4.add(dsInfo1);
- RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
+ RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
- RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
+ RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
- RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
+ RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
- RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98);
+ RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1);
List<RSInfo> rsList1 = new ArrayList<RSInfo>();
rsList1.add(rsInfo1);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index cafdede..f95550a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -595,6 +594,9 @@
ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
replServers, groupId, assuredTimeout, 5000);
+ // No monitoring publisher to not interfer with some SocketTimeoutException
+ // expected at some points in these tests
+ conf.setMonitoringPeriod(0L);
ReplicationServer replicationServer = new ReplicationServer(conf);
return replicationServer;
@@ -908,7 +910,7 @@
ReplicationMsg replMsg = session.receive();
if (replMsg instanceof ErrorMsg)
{
- // Support for connection done with bad gen id : we receive an error
+ // Support for connection done with bad gen id : we receive an error
// message that we must throw away before reading our ack.
replMsg = session.receive();
}
@@ -967,7 +969,7 @@
}
// Send our topo mesg
- RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+ RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
rsInfos.add(rsInfo);
TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
index d277405..50c6031 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -63,8 +63,11 @@
// The weight of the server
private int weight = 1;
+ // The monitoring publisher period
+ private long monitoringPeriod = 3000;
+
/**
- * Constructor without goup id, assured info and weight
+ * Constructor without group id, assured info and weight
*/
public ReplServerFakeConfiguration(
int port, String dirName, int purgeDelay, int serverId,
@@ -254,4 +257,17 @@
return weight;
}
+ public long getMonitoringPeriod()
+ {
+ return monitoringPeriod;
+ }
+
+ /**
+ * @param monitoringPeriod the monitoringPeriod to set
+ */
+ public void setMonitoringPeriod(long monitoringPeriod)
+ {
+ this.monitoringPeriod = monitoringPeriod;
+ }
+
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index a24799f..56801e1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -74,7 +74,6 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
@@ -1003,7 +1002,7 @@
ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
session.publish(msg);
- // Read the Replication Server state from the ReplServerStartMsg that
+ // Read the Replication Server state from the ReplServerStartDSMsg that
// comes back.
ReplServerStartDSMsg replStartDSMsg =
(ReplServerStartDSMsg) session.receive();
@@ -1079,7 +1078,8 @@
// check that this did not change the window by sending a probe again.
session.publish(new WindowProbeMsg());
- windowMsg = (WindowMsg) session.receive();
+ // We may receive some MonitoringMsg so use filter method
+ windowMsg = (WindowMsg)waitForSpecificMsg(session, WindowMsg.class.getName());
assertEquals(serverwindow, windowMsg.getNumAck());
debugInfo("Ending windowProbeTest");
}
--
Gitblit v1.10.0