From c02dd7f87e9ba574f06e5cc1eb36ebeb76b9f446 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 08 Oct 2009 16:02:17 +0000
Subject: [PATCH] - Addition of ReplServerStartDSMsg now sent to a DS connecting to a RS in handshake phase instead of a ReplServerStartMsg. ReplServerStartDSMsg contains same thing as ReplServerStartMsg but also contains
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java | 7
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 68 +
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java | 127 +++
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java | 70 ++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 108 +-
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 30
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 29
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 39 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java | 27
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 31
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java | 18
opendj-sdk/opends/src/messages/messages/replication.properties | 23
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java | 8
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java | 408 ++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 34
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 23
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 9
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 467 ++++++++++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 120 ++
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java | 2
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 13
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java | 21
opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml | 35
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 79 ++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 2
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java | 141 +++
opendj-sdk/opends/resource/schema/02-config.ldif | 8
36 files changed, 1,655 insertions(+), 347 deletions(-)
diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index 8f0c851..f1f179d 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -2453,6 +2453,11 @@
NAME 'ds-cfg-ecl-include'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.603
+ NAME 'ds-cfg-weight'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler'
SUP top
@@ -3130,7 +3135,8 @@
ds-cfg-replication-purge-delay $
ds-cfg-group-id $
ds-cfg-assured-timeout $
- ds-cfg-degraded-status-threshold)
+ ds-cfg-degraded-status-threshold $
+ ds-cfg-weight)
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
NAME 'ds-backup-directory'
diff --git a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
index 812bad9..6c91def 100644
--- a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
+++ b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -216,11 +216,11 @@
</adm:property>
<adm:property name="assured-timeout" mandatory="false">
<adm:synopsis>
- The timeout value when waiting for assured mode acknowledgements.
+ The timeout value when waiting for assured mode acknowledgments.
</adm:synopsis>
<adm:description>
Defines the amount of milliseconds the replication server will wait for
- assured acknowledgements (in either Safe Data or Safe Read assured sub
+ assured acknowledgments (in either Safe Data or Safe Read assured sub
modes) before forgetting them and answer to the entity that sent an update
and is waiting for acknowledgment.
</adm:description>
@@ -265,4 +265,35 @@
</ldap:attribute>
</adm:profile>
</adm:property>
+ <adm:property name="weight" mandatory="false">
+ <adm:synopsis>
+ The weight of the replication server.
+ </adm:synopsis>
+ <adm:description>
+ The weight affected to the replication server.
+ Each replication server of the topology has a weight. When combined
+ together, the weights of the replication servers of a same group can be
+ translated to a percentage that determines the quantity of directory
+ servers of the topology that should be connected to a replication server.
+ For instance imagine a topology with 3 replication servers (with the same
+ group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
+ RS1 should have 25% of the directory servers connected in the topology,
+ RS2 25%, and RS3 50%. This may be useful if the replication servers of the
+ topology have a different power and one wants to spread the load between
+ the replication servers according to their power.
+ </adm:description>
+ <adm:default-behavior>
+ <adm:defined>
+ <adm:value>1</adm:value>
+ </adm:defined>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:integer lower-limit="0"></adm:integer>
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-weight</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
</adm:managed-object>
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index ba4ca5b..71e4264 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -73,7 +73,6 @@
base dn : %s
MILD_ERR_ERROR_SEARCHING_RUV_15=Error %s when searching for server state %s : \
%s base dn : %s
-NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server %s
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
listening on %s
NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
@@ -175,12 +174,13 @@
NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
for domain %s with replication server id %s %s - local server id is %s - data \
generation is %s
-NOTICE_DISCONNECTED_FROM_CHANGELOG_63=The connection to Replication Server %s \
- %s has been dropped by the Replication Server for %s in local server id %s
+NOTICE_REPLICATION_SERVER_PROPERLY_DISCONNECTED_63=Replication server %s \
+ %s has properly disconnected for %s in local server id %s. Trying to reconnect \
+ to a suitable replication server
SEVERE_ERR_CHANGELOG_ERROR_SENDING_ERROR_65=An unexpected error occurred \
while sending an Error Message to %s. This connection is going to be closed \
and reopened
-SEVERE_ERR_CHANGELOG_ERROR_SENDING_MSG_66=An unexpected error occurred while \
+SEVERE_ERR_CHANGELOG_ERROR_SENDING_MSG_66=An unexpected error occurred while \
sending a Message to %s. This connection is going to be closed and reopened
MILD_ERR_ERROR_REPLAYING_OPERATION_67=Could not replay operation %s with \
ChangeNumber %s error %s %s
@@ -409,7 +409,7 @@
NOTICE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL_172=The export of \
domain %s from server %s to all other servers of the topology is forbidden as \
the source server has some fractional configuration : only fractional servers \
- in a replicated topology does not makes sense
+ in a replicated topology does not make sense
MILD_ERR_DRAFT_CHANGENUMBER_DATABASE_173=An error occurred when accessing the \
database of the draft change number : %s
SEVERE_ERR_INITIALIZATION_FAILED_NOCONN_174=The initialization failed because \
@@ -423,4 +423,15 @@
NOTICE_ERR_LDIF_IMPORT_FRACTIONAL_DATA_SET_IS_FRACTIONAL_177=The LDIF import \
for importing suffix %s data has been stopped due to fractional configuration \
inconsistency : imported data set has some fractional configuration but not \
- local server
\ No newline at end of file
+ local server
+SEVERE_ERR_DS_DISCONNECTED_DURING_HANDSHAKE_178=Directory server %s was \
+ attempting to connect to replication server %s but has disconnected in \
+ handshake phase
+SEVERE_ERR_RS_DISCONNECTED_DURING_HANDSHAKE_179=Replication server %s was \
+ attempting to connect to replication server %s but has disconnected in \
+ handshake phase
+SEVERE_ERR_REPLICATION_SERVER_BADLY_DISCONNECTED_180=The connection to \
+ replication server %s %s has been unexpectedly dropped by the replication \
+ server for %s in local server id %s
+SEVERE_ERR_SERVER_BADLY_DISCONNECTED_181= %s has badly disconnected from this \
+ replication server %s
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index b950162..898b387 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4472,7 +4472,7 @@
* Verifies that the given string represents a valid source
* from which this server can be initialized.
* @param sourceString The string representing the source
- * @return The source as a short value
+ * @return The source as a integer value
* @throws DirectoryException if the string is not valid
*/
public int decodeSource(String sourceString)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
index fdc2197..52798ad 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -41,7 +41,7 @@
* Creates a message.
*
* @param serverID The sender server of this message.
- * @param i The server or servers targetted by this message.
+ * @param i The server or servers targeted by this message.
*/
public DoneMsg(int serverID, int i)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
index a60b658..518435b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -69,6 +69,11 @@
*/
private boolean shutdown = false;
+ /**
+ * Send StopMsg before session closure or not.
+ */
+ private boolean sendStopBeforeClose = false;
+
/**
* Create a heartbeat monitor thread.
@@ -76,13 +81,16 @@
* @param session The session on which heartbeats are to be monitored.
* @param heartbeatInterval The expected interval between heartbeats received
* (in milliseconds).
+ * @param sendStopBeforeClose Should we send a StopMsg before closing the
+ * session ?
*/
public HeartbeatMonitor(String threadName, ProtocolSession session,
- long heartbeatInterval)
+ long heartbeatInterval, boolean sendStopBeforeClose)
{
super(threadName);
this.session = session;
this.heartbeatInterval = heartbeatInterval;
+ this.sendStopBeforeClose = sendStopBeforeClose;
}
/**
@@ -117,6 +125,17 @@
{
// Heartbeat is well overdue so the server is assumed to be dead.
logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName()));
+ if (sendStopBeforeClose)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch(IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
session.close();
break;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 7b7e01d..e3f6e48 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -82,11 +82,6 @@
SubTopoMonitorData data = new SubTopoMonitorData();
/**
- * The protocolVersion that should be used when serializing this message.
- */
- private final short protocolVersion;
-
- /**
* Creates a new MonitorMsg.
*
* @param sender The sender of this message.
@@ -95,25 +90,8 @@
public MonitorMsg(int sender, int destination)
{
super(sender, destination);
- protocolVersion = ProtocolVersion.getCurrentVersion();
}
-
- /**
- * Creates a new MonitorMsg with a specific protocol version.
- *
- * @param sender The sender of this message.
- * @param destination The destination of this message.
- * @param replicationProtocol The protocol version to use.
- */
- public MonitorMsg(int sender, int destination,
- short replicationProtocol)
- {
- super(sender, destination);
- protocolVersion = replicationProtocol;
- }
-
-
/**
* Sets the state of the replication server.
* @param state The state.
@@ -204,7 +182,6 @@
*/
public MonitorMsg(byte[] in, short version) throws DataFormatException
{
- protocolVersion = ProtocolVersion.getCurrentVersion();
ByteSequenceReader reader = ByteString.wrap(in).asReader();
if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -328,6 +305,17 @@
*/
@Override
public byte[] getBytes()
+ throws UnsupportedEncodingException
+ {
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes(short protocolVersion)
+ throws UnsupportedEncodingException
{
try
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index 47c380e..db05464 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -42,7 +42,7 @@
* Creates a message.
*
* @param serverID The sender server of this message.
- * @param destination The server or servers targetted by this message.
+ * @param destination The server or servers targeted by this message.
*/
public MonitorRequestMsg(int serverID, int destination)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index 4825445..ee31cb6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -116,6 +116,13 @@
*/
public abstract String getRemoteAddress();
+ /**
+ * Retrieve the human readable address of the remote server.
+ *
+ * @return The human readable address of the remote server.
+ */
+ public abstract String getReadableRemoteAddress();
+
/**
* Set a timeout value.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 997267d..b2f095c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -54,9 +54,12 @@
public static final short REPLICATION_PROTOCOL_V3 = 3;
/**
- * 4th version of the replication protocol.
- * Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
- * ECL entry attributes.
+ * The constant for the 4th version of the replication protocol.
+ * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
+ * ECL entry attributes.
+ * - Modified algorithm for choosing a RS to connect to: introduction of a
+ * ReplicationServerDSMsg message.
+ * - Introduction of a StopMsg for proper connections ending.
*/
public static final short REPLICATION_PROTOCOL_V4 = 4;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
new file mode 100644
index 0000000..5c507ec
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -0,0 +1,408 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.ServerState;
+
+/**
+ * Message sent by a replication server to a directory server in reply to the
+ * ServerStartMsg.
+ */
+public class ReplServerStartDSMsg extends StartMsg
+{
+ private int serverId;
+ private String serverURL;
+ private String baseDn = null;
+ private int windowSize;
+ private ServerState serverState;
+
+ /**
+ * Whether to continue using SSL to encrypt messages after the start
+ * messages have been exchanged.
+ */
+ private boolean sslEncryption;
+
+ /**
+ * Threshold value used by the RS to determine if a DS must be put in
+ * degraded status because the number of pending changes for him has crossed
+ * this value. This field is only used by a DS.
+ */
+ private int degradedStatusThreshold = -1;
+
+ /**
+ * The weight affected to the replication server.
+ */
+ private int weight = -1;
+
+ /**
+ * Number of currently connected DS to the replication server.
+ */
+ private int connectedDSNumber = -1;
+
+ /**
+ * Create a ReplServerStartDSMsg.
+ *
+ * @param serverId replication server id
+ * @param serverURL replication server URL
+ * @param baseDn base DN for which the ReplServerStartDSMsg is created.
+ * @param windowSize The window size.
+ * @param serverState our ServerState for this baseDn.
+ * @param protocolVersion The replication protocol version of the creator.
+ * @param generationId The generationId for this server.
+ * @param sslEncryption Whether to continue using SSL to encrypt messages
+ * after the start messages have been exchanged.
+ * @param groupId The group id of the RS
+ * @param degradedStatusThreshold The degraded status threshold
+ * @param weight The weight affected to the replication server.
+ * @param connectedDSNumber Number of currently connected DS to the
+ * replication server.
+ */
+ public ReplServerStartDSMsg(int serverId, String serverURL, String baseDn,
+ int windowSize,
+ ServerState serverState,
+ short protocolVersion,
+ long generationId,
+ boolean sslEncryption,
+ byte groupId,
+ int degradedStatusThreshold,
+ int weight,
+ int connectedDSNumber)
+ {
+ super(protocolVersion, generationId);
+ this.serverId = serverId;
+ this.serverURL = serverURL;
+ if (baseDn != null)
+ this.baseDn = baseDn;
+ else
+ this.baseDn = null;
+ this.windowSize = windowSize;
+ this.serverState = serverState;
+ this.sslEncryption = sslEncryption;
+ this.groupId = groupId;
+ this.degradedStatusThreshold = degradedStatusThreshold;
+ this.weight = weight;
+ this.connectedDSNumber = connectedDSNumber;
+ }
+
+ /**
+ * Creates a new ReplServerStartDSMsg by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the
+ * ReplServerStartDSMsg
+ * @throws DataFormatException If the in does not contain a properly
+ * encoded ReplServerStartDSMsg.
+ */
+ public ReplServerStartDSMsg(byte[] in) throws DataFormatException
+ {
+ byte[] allowedPduTypes = new byte[1];
+ allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START_DS;
+ headerLength = decodeHeader(allowedPduTypes, in);
+
+ try
+ {
+ /* The ReplServerStartDSMsg payload is stored in the form :
+ * <baseDn><serverId><serverURL><windowSize><sslEncryption>
+ * <degradedStatusThreshold><weight><connectedDSNumber>
+ * <serverState>
+ */
+
+ /* first bytes are the header */
+ int pos = headerLength;
+
+ /* read the dn
+ * first calculate the length then construct the string
+ */
+ int length = getNextLength(in, pos);
+ baseDn = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ /*
+ * read the ServerId
+ */
+ length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ serverId = Integer.valueOf(serverIdString);
+ pos += length +1;
+
+ /*
+ * read the ServerURL
+ */
+ length = getNextLength(in, pos);
+ serverURL = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ /*
+ * read the window size
+ */
+ length = getNextLength(in, pos);
+ windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ /*
+ * read the sslEncryption setting
+ */
+ length = getNextLength(in, pos);
+ sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ /**
+ * read the degraded status threshold
+ */
+ length = getNextLength(in, pos);
+ degradedStatusThreshold =
+ Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
+
+ /*
+ * read the weight
+ */
+ length = getNextLength(in, pos);
+ String weightString = new String(in, pos, length, "UTF-8");
+ weight = Integer.valueOf(weightString);
+ pos += length +1;
+
+ /*
+ * read the connected DS number
+ */
+ length = getNextLength(in, pos);
+ String connectedDSNumberString = new String(in, pos, length, "UTF-8");
+ connectedDSNumber = Integer.valueOf(connectedDSNumberString);
+ pos += length +1;
+
+ // Read the ServerState
+ // Caution: ServerState MUST be the last field. Because ServerState can
+ // contain null character (string termination of serverid string ..) it
+ // cannot be decoded using getNextLength() like the other fields. The
+ // only way is to rely on the end of the input buffer : and that forces
+ // the ServerState to be the last. This should be changed and we want to
+ // have more than one ServerState field.
+ serverState = new ServerState(in, pos, in.length - 1);
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Get the Server Id.
+ * @return the server id
+ */
+ public int getServerId()
+ {
+ return this.serverId;
+ }
+
+ /**
+ * Get the server URL.
+ * @return the server URL
+ */
+ public String getServerURL()
+ {
+ return this.serverURL;
+ }
+
+ /**
+ * Get the base DN from this ReplServerStartDSMsg.
+ *
+ * @return the base DN from this ReplServerStartDSMsg.
+ */
+ public String getBaseDn()
+ {
+ return baseDn;
+ }
+
+ /**
+ * Get the serverState.
+ * @return Returns the serverState.
+ */
+ public ServerState getServerState()
+ {
+ return this.serverState;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ throws UnsupportedEncodingException
+ {
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes(short protocolVersion)
+ throws UnsupportedEncodingException
+ {
+ /* The ReplServerStartDSMsg is stored in the form :
+ * <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
+ * <degradedStatusThreshold><weight><connectedDSNumber>
+ * <serverState>
+ */
+
+ byte[] byteDn = baseDn.getBytes("UTF-8");
+ byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
+ byte[] byteServerUrl = serverURL.getBytes("UTF-8");
+ byte[] byteServerState = serverState.getBytes();
+ byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
+ byte[] byteSSLEncryption =
+ String.valueOf(sslEncryption).getBytes("UTF-8");
+ byte[] byteDegradedStatusThreshold =
+ String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
+ byte[] byteWeight =
+ String.valueOf(weight).getBytes("UTF-8");
+ byte[] byteConnectedDSNumber =
+ String.valueOf(connectedDSNumber).getBytes("UTF-8");
+
+ int length = byteDn.length + 1 + byteServerId.length + 1 +
+ byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+ byteSSLEncryption.length + 1 + byteDegradedStatusThreshold.length + 1 +
+ byteWeight.length + 1 + byteConnectedDSNumber.length + 1 +
+ byteServerState.length + 1;
+
+ /* encode the header in a byte[] large enough */
+ byte resultByteArray[] =
+ encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, length, protocolVersion);
+
+ int pos = headerLength;
+
+ /* put the baseDN and a terminating 0 */
+ pos = addByteArray(byteDn, resultByteArray, pos);
+
+ /* put the ServerId */
+ pos = addByteArray(byteServerId, resultByteArray, pos);
+
+ /* put the ServerURL */
+ pos = addByteArray(byteServerUrl, resultByteArray, pos);
+
+ /* put the window size */
+ pos = addByteArray(byteWindowSize, resultByteArray, pos);
+
+ /* put the SSL Encryption setting */
+ pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
+
+ /* put the degraded status threshold */
+ pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
+
+ /* put the weight */
+ pos = addByteArray(byteWeight, resultByteArray, pos);
+
+ /* put the connected DS number */
+ pos = addByteArray(byteConnectedDSNumber, resultByteArray, pos);
+
+ /* put the ServerState */
+ pos = addByteArray(byteServerState, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+
+ /**
+ * get the window size for the server that created this message.
+ *
+ * @return The window size for the server that created this message.
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
+
+ /**
+ * Get the SSL encryption value for the server that created the
+ * message.
+ *
+ * @return The SSL encryption value for the server that created the
+ * message.
+ */
+ public boolean getSSLEncryption()
+ {
+ return sslEncryption;
+ }
+
+ /**
+ * Get the degraded status threshold value.
+ * @return The degraded status threshold value.
+ */
+ public int getDegradedStatusThreshold()
+ {
+ return degradedStatusThreshold;
+ }
+
+ /**
+ * Set the degraded status threshold (For test purpose).
+ * @param degradedStatusThreshold The degraded status threshold to set.
+ */
+ public void setDegradedStatusThreshold(int degradedStatusThreshold)
+ {
+ this.degradedStatusThreshold = degradedStatusThreshold;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ return "ReplServerStartDSMsg content: " +
+ "\nprotocolVersion: " + protocolVersion +
+ "\ngenerationId: " + generationId +
+ "\nbaseDn: " + baseDn +
+ "\ngroupId: " + groupId +
+ "\nserverId: " + serverId +
+ "\nserverState: " + serverState +
+ "\nserverURL: " + serverURL +
+ "\nsslEncryption: " + sslEncryption +
+ "\ndegradedStatusThreshold: " + degradedStatusThreshold +
+ "\nwindowSize: " + windowSize +
+ "\nweight: " + weight +
+ "\nconnectedDSNumber: " + connectedDSNumber;
+ }
+
+ /**
+ * Gets the weight of the replication server.
+ * @return The weight of the replication server.
+ */
+ public int getWeight()
+ {
+ return weight;
+ }
+
+ /**
+ * Gets the number of directory servers connected to the replication server.
+ * @return The number of directory servers connected to the replication
+ * server.
+ */
+ public int getConnectedDSNumber()
+ {
+ return connectedDSNumber;
+ }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index 7c83505..6fce977 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -50,6 +50,14 @@
private boolean sslEncryption;
/**
+ * NOTE: Starting from protocol V4, we introduce a dedicated PDU for answering
+ * to the DS ServerStartMsg. This is the ReplServerStartDSMsg. So the
+ * degradedStatusThreshold value being used only by a DS, it could be removed
+ * from the ReplServerStartMsg PDU. However for a smoothly transition to V4
+ * protocol, we prefer to let this variable also in this PDU but the one
+ * really used is in the ReplServerStartDSMsg PDU. This prevents from having
+ * only RSv3 able to connect to RSv4 as connection initiator.
+ *
* Threshold value used by the RS to determine if a DS must be put in
* degraded status because the number of pending changes for him has crossed
* this value. This field is only used by a DS.
@@ -108,6 +116,15 @@
allowedPduTypes[1] = MSG_TYPE_REPL_SERVER_START_V1;
headerLength = decodeHeader(allowedPduTypes, in);
+ // Protocol version has been read as part of the header:
+ // decode the body according to the protocol version read in the header
+ switch(protocolVersion)
+ {
+ case ProtocolVersion.REPLICATION_PROTOCOL_V1:
+ decodeBody_V1(in, headerLength);
+ return;
+ }
+
try
{
/* The ReplServerStartMsg payload is stored in the form :
@@ -154,22 +171,85 @@
sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
pos += length +1;
- // For easiness (no additional method), simply compare PDU type to
- // know if we have to read new parameters of V2
- if (in[0] == MSG_TYPE_REPL_SERVER_START)
- {
- /**
- * read the degraded status threshold
- */
- length = getNextLength(in, pos);
- degradedStatusThreshold =
- Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- }
+ /**
+ * read the degraded status threshold
+ */
+ length = getNextLength(in, pos);
+ degradedStatusThreshold =
+ Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
// Read the ServerState
// Caution: ServerState MUST be the last field. Because ServerState can
- // contain null character (string termination of sererid string ..) it
+ // contain null character (string termination of serverid string ..) it
+ // cannot be decoded using getNextLength() like the other fields. The
+ // only way is to rely on the end of the input buffer : and that forces
+ // the ServerState to be the last. This should be changed and we want to
+ // have more than one ServerState field.
+ serverState = new ServerState(in, pos, in.length - 1);
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Decodes the body of a just received ReplServerStartMsg. The body is in the
+ * passed array, and starts at the provided location. This is for a PDU
+ * encoded in V1 protocol version.
+ * @param in A byte array containing the body for the ReplServerStartMsg
+ * @param pos The position in the array where the decoding should start
+ * @throws DataFormatException If the in does not contain a properly
+ * encoded ReplServerStartMsg.
+ */
+ public void decodeBody_V1(byte[] in, int pos) throws DataFormatException
+ {
+ try
+ {
+ /* The ReplServerStartMsg payload is stored in the form :
+ * <baseDn><serverId><serverURL><windowSize><sslEncryption>
+ * <serverState>
+ */
+
+ /* read the dn
+ * first calculate the length then construct the string
+ */
+ int length = getNextLength(in, pos);
+ baseDn = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ /*
+ * read the ServerId
+ */
+ length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ serverId = Integer.valueOf(serverIdString);
+ pos += length +1;
+
+ /*
+ * read the ServerURL
+ */
+ length = getNextLength(in, pos);
+ serverURL = new String(in, pos, length, "UTF-8");
+ pos += length +1;
+
+ /*
+ * read the window size
+ */
+ length = getNextLength(in, pos);
+ windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ /*
+ * read the sslEncryption setting
+ */
+ length = getNextLength(in, pos);
+ sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
+ // Read the ServerState
+ // Caution: ServerState MUST be the last field. Because ServerState can
+ // contain null character (string termination of serverid string ..) it
// cannot be decoded using getNextLength() like the other fields. The
// only way is to rely on the end of the input buffer : and that forces
// the ServerState to be the last. This should be changed and we want to
@@ -235,8 +315,12 @@
public byte[] getBytes(short protocolVersion)
throws UnsupportedEncodingException
{
- if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
- return getBytes_V1();
+ // If an older version requested, encode in the requested way
+ switch(protocolVersion)
+ {
+ case ProtocolVersion.REPLICATION_PROTOCOL_V1:
+ return getBytes_V1();
+ }
/* The ReplServerStartMsg is stored in the form :
* <operation type><baseDn><serverId><serverURL><windowSize><sslEncryption>
@@ -254,12 +338,12 @@
String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
int length = byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteWindowSize.length + 1 +
- byteSSLEncryption.length + 1 +
- byteDegradedStatusThreshold.length + 1 +
- byteServerState.length + 1;
+ byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+ byteSSLEncryption.length + 1 +
+ byteDegradedStatusThreshold.length + 1 +
+ byteServerState.length + 1;
- /* encode the header in a byte[] large enough to also contain the mods */
+ /* encode the header in a byte[] large enough */
byte resultByteArray[] =
encodeHeader(MSG_TYPE_REPL_SERVER_START, length, protocolVersion);
@@ -377,7 +461,7 @@
byteSSLEncryption.length + 1 +
byteServerState.length + 1;
- /* encode the header in a byte[] large enough to also contain the mods */
+ /* encode the header in a byte[] large enough */
byte resultByteArray[] = encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1,
length);
int pos = headerLength;
@@ -407,5 +491,4 @@
return null;
}
}
-
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 466d884..e2ba2e3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -71,12 +71,16 @@
static final byte MSG_TYPE_CHANGE_STATUS = 28;
static final byte MSG_TYPE_GENERIC_UPDATE = 29;
- // Protocol version : 3
+ // Added for protocol version 3
static final byte MSG_TYPE_START_ECL = 30;
static final byte MSG_TYPE_START_ECL_SESSION = 31;
static final byte MSG_TYPE_ECL_UPDATE = 32;
static final byte MSG_TYPE_CT_HEARTBEAT = 33;
+ // Added for protocol version 4
+ static final byte MSG_TYPE_REPL_SERVER_START_DS = 34;
+ static final byte MSG_TYPE_STOP = 35;
+
// Adding a new type of message here probably requires to
// change accordingly generateMsg method below
@@ -238,6 +242,12 @@
case MSG_TYPE_CT_HEARTBEAT:
msg = new ChangeTimeHeartbeatMsg(buffer);
break;
+ case MSG_TYPE_REPL_SERVER_START_DS:
+ msg = new ReplServerStartDSMsg(buffer);
+ break;
+ case MSG_TYPE_STOP:
+ msg = new StopMsg(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 0631618..18cf6d3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -232,6 +232,14 @@
/**
* {@inheritDoc}
*/
+ public String getReadableRemoteAddress()
+ {
+ return socket.getRemoteSocketAddress().toString();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
public void setSoTimeout(int timeout) throws SocketException
{
socket.setSoTimeout(timeout);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
new file mode 100644
index 0000000..e6c260a
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -0,0 +1,70 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the replication protocol.
+ * This message is sent by a server to tell a peer the communication will be
+ * terminated.
+ */
+public class StopMsg extends ReplicationMsg
+{
+ /**
+ * Creates a message.
+ */
+ public StopMsg()
+ {
+ }
+
+ /**
+ * Creates a new message by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the message,
+ * @throws DataFormatException If the in does not contain a properly,
+ * encoded message.
+ */
+ public StopMsg(byte[] in) throws DataFormatException
+ {
+ // First byte is the type
+ if (in[0] != MSG_TYPE_STOP)
+ throw new DataFormatException("input is not a valid Stop message: " +
+ in[0]);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ return new byte[]
+ {
+ MSG_TYPE_STOP
+ };
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 4b2fc59..c20d895 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -243,6 +243,14 @@
/**
* {@inheritDoc}
*/
+ public String getReadableRemoteAddress()
+ {
+ return plainSocket.getRemoteSocketAddress().toString();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
public void setSoTimeout(int timeout) throws SocketException
{
plainSocket.setSoTimeout(timeout);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 3cd91c4..40fe57d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -145,7 +145,21 @@
try
{
if (session != null)
+ {
+ // V4 protocol introduces a StopMsg to properly close the
+ // connection between servers
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
session.close();
+ }
} catch (IOException e)
{
// ignore
@@ -461,7 +475,7 @@
{
TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForDS(
this.serverId);
- session.publish(outTopoMsg);
+ session.publish(outTopoMsg, protocolVersion);
return outTopoMsg;
}
/**
@@ -500,14 +514,13 @@
return;
}
- //
- ReplServerStartMsg outReplServerStartMsg = null;
+ StartMsg outStartMsg = null;
try
{
- outReplServerStartMsg = sendStartToRemote(protocolVersion);
+ outStartMsg = sendStartToRemote(protocolVersion);
// log
- logStartHandshakeRCVandSND(inServerStartMsg, outReplServerStartMsg);
+ logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
// The session initiator decides whether to use SSL.
// Until here session is encrypted then it depends on the negotiation
@@ -517,6 +530,13 @@
// wait and process StartSessionMsg from remote RS
StartSessionMsg inStartSessionMsg =
waitAndProcessStartSessionFromRemoteDS();
+ if (inStartSessionMsg == null)
+ {
+ // DS wants to properly close the connection (DS sent a StopMsg)
+ logStopReceived();
+ abortStart(null);
+ return;
+ }
// Send our own TopologyMsg to remote RS
TopologyMsg outTopoMsg = sendTopoToRemoteDS();
@@ -525,18 +545,12 @@
}
catch(IOException e)
{
- // We do not want polluting error log if error is due to normal session
- // aborted after handshake phase one from a DS that is searching for
- // best suitable RS.
-
- // don't log a polluting error when connection aborted
- // from a DS that wanted only to perform handshake phase 1 in order
- // to determine the best suitable RS:
- // 1) -> ServerStartMsg
- // 2) <- ReplServerStartMsg
- // 3) connection closure
-
- throw new DirectoryException(ResultCode.OTHER, null, null);
+ Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
+ Integer.toString(inServerStartMsg.getServerId()),
+ Integer.toString(replicationServerDomain.getReplicationServer().
+ getServerId()));
+ logError(errMessage);
+ throw new DirectoryException(ResultCode.OTHER, errMessage);
}
catch (NotSupportedOldVersionPDUException e)
{
@@ -578,6 +592,65 @@
replicationServerDomain.release();
}
}
+
+ /**
+ * Send the ReplServerStartDSMsg to the remote DS.
+ * @param requestedProtocolVersion The provided protocol version.
+ * @return The StartMsg sent.
+ * @throws IOException When an exception occurs.
+ */
+ private StartMsg sendStartToRemote(short requestedProtocolVersion)
+ throws IOException
+ {
+ // Before V4 protocol, we sent a ReplServerStartMsg
+ if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+
+ // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
+ ReplServerStartMsg outReplServerStartMsg
+ = new ReplServerStartMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold());
+
+ session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+ return outReplServerStartMsg;
+ }
+ else
+ {
+ // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
+ ReplServerStartDSMsg outReplServerStartDSMsg
+ = new ReplServerStartDSMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold(),
+ replicationServer.getWeight(),
+ replicationServerDomain.getConnectedLDAPservers().size());
+
+
+ session.publish(outReplServerStartDSMsg);
+
+ return outReplServerStartDSMsg;
+ }
+ }
+
/**
* Creates a DSInfo structure representing this remote DS.
* @return The DSInfo structure representing this remote DS
@@ -609,8 +682,10 @@
}
/**
- * Wait receiving the StartSessionMsg from the remote DS and process it.
- * @return the startSessionMsg received
+ * Wait receiving the StartSessionMsg from the remote DS and process it, or
+ * receiving a StopMsg to properly stop the handshake procedure.
+ * @return the startSessionMsg received or null DS sent a stop message to
+ * not finish the handshake.
* @throws DirectoryException
* @throws IOException
* @throws ClassNotFoundException
@@ -625,7 +700,12 @@
ReplicationMsg msg = null;
msg = session.receive();
- if (!(msg instanceof StartSessionMsg))
+ if (msg instanceof StopMsg)
+ {
+ // DS wants to stop handshake (was just for handshake phase one for RS
+ // choice). Return null to make the session be terminated.
+ return null;
+ } else if (!(msg instanceof StartSessionMsg))
{
Message message = Message.raw(
"Protocol error: StartSessionMsg required." + msg + " received.");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 82aacd7..923e1b6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -331,6 +331,64 @@
}
/**
+ * Send the ReplServerStartDSMsg to the remote ECL server.
+ * @param requestedProtocolVersion The provided protocol version.
+ * @return The StartMsg sent.
+ * @throws IOException When an exception occurs.
+ */
+ private StartMsg sendStartToRemote(short requestedProtocolVersion)
+ throws IOException
+ {
+ // Before V4 protocol, we sent a ReplServerStartMsg
+ if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+
+ // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
+ ReplServerStartMsg outReplServerStartMsg
+ = new ReplServerStartMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold());
+
+ session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+ return outReplServerStartMsg;
+ }
+ else
+ {
+ // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
+ ReplServerStartDSMsg outReplServerStartDSMsg
+ = new ReplServerStartDSMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold(),
+ replicationServer.getWeight(),
+ replicationServerDomain.getConnectedLDAPservers().size());
+
+
+ session.publish(outReplServerStartDSMsg);
+
+ return outReplServerStartDSMsg;
+ }
+ }
+
+ /**
* Creates a new handler object to a remote replication server.
* @param session The session with the remote RS.
* @param queueSize The queue size to manage updates to that RS.
@@ -406,12 +464,14 @@
// lock with timeout
lockDomain(true);
+ this.localGenerationId = replicationServerDomain.getGenerationId();
+
// send start to remote
- ReplServerStartMsg outReplServerStartMsg =
+ StartMsg outStartMsg =
sendStartToRemote(protocolVersion);
// log
- logStartHandshakeRCVandSND(inECLStartMsg, outReplServerStartMsg);
+ logStartHandshakeRCVandSND(inECLStartMsg, outStartMsg);
// until here session is encrypted then it depends on the negociation
// The session initiator decides whether to use SSL.
@@ -421,6 +481,14 @@
// wait and process StartSessionMsg from remote RS
StartECLSessionMsg inStartECLSessionMsg =
waitAndProcessStartSessionECLFromRemoteServer();
+ if (inStartECLSessionMsg == null)
+ {
+ // client wants to properly close the connection (client sent a
+ // StopMsg)
+ logStopReceived();
+ abortStart(null);
+ return;
+ }
logStartECLSessionHandshake(inStartECLSessionMsg);
@@ -462,7 +530,12 @@
ReplicationMsg msg = null;
msg = session.receive();
- if (!(msg instanceof StartECLSessionMsg))
+ if (msg instanceof StopMsg)
+ {
+ // client wants to stop handshake (was just for handshake phase one for RS
+ // choice). Return null to make the session be terminated.
+ return null;
+ } else if (!(msg instanceof StartECLSessionMsg))
{
Message message = Message.raw(
"Protocol error: StartECLSessionMsg required." + msg + " received.");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 53efc16..2567751 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -169,7 +169,7 @@
catch (SocketException e)
{
// Just ignore the exception and let the thread die as well
- errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+ errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
"for operation " + handler.getOperationId());
logError(errMessage);
}
@@ -198,7 +198,7 @@
}
/**
- * Loop geting changes from the domain and publishing them either to
+ * Loop getting changes from the domain and publishing them either to
* the provided session or to the ECL session interface.
* @throws IOException when raised (connection closure)
* @throws InterruptedException when raised
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 8cd9f1a..c093c86 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -189,6 +189,21 @@
final private Object domainMonitor = new Object();
/**
+ * The weight affected to the replication server.
+ * Each replication server of the topology has a weight. When combined
+ * together, the weights of the replication servers of a same group can be
+ * translated to a percentage that determines the quantity of directory
+ * servers of the topology that should be connected to a replication server.
+ * For instance imagine a topology with 3 replication servers (with the same
+ * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
+ * RS1 should have 25% of the directory servers connected in the topology,
+ * RS2 25%, and RS3 50%. This may be useful if the replication servers of the
+ * topology have a different power and one wants to spread the load between
+ * the replication servers according to their power.
+ */
+ private int weight = 1;
+
+ /**
* Creates a new Replication server using the provided configuration entry.
*
* @param configuration The configuration of this replication server.
@@ -979,6 +994,13 @@
}
}
+ // Set a potential new weight
+ if (weight != configuration.getWeight())
+ {
+ weight = configuration.getWeight();
+ // TODO: send new TopologyMsg
+ }
+
if ((configuration.getReplicationDBDirectory() != null) &&
(!dbDirname.equals(configuration.getReplicationDBDirectory())))
{
@@ -1786,4 +1808,13 @@
}
return result;
}
+
+ /**
+ * Gets the weight.
+ * @return the weight
+ */
+ public int getWeight()
+ {
+ return weight;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 8d3b4a7..a301fc3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1554,20 +1554,8 @@
// in the topology.
if (senderHandler.isDataServer())
{
- MonitorMsg returnMsg;
-
- if (senderHandler.getProtocolVersion() >
- ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- returnMsg =
+ MonitorMsg returnMsg =
new MonitorMsg(msg.getDestination(), msg.getsenderID());
- }
- else
- {
- returnMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID(),
- ProtocolVersion.REPLICATION_PROTOCOL_V1);
- }
try
{
@@ -1613,20 +1601,8 @@
return;
}
- MonitorMsg monitorMsg;
-
- if (senderHandler.getProtocolVersion() >
- ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- monitorMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID());
- }
- else
- {
- monitorMsg =
- new MonitorMsg(msg.getDestination(), msg.getsenderID(),
- ProtocolVersion.REPLICATION_PROTOCOL_V1);
- }
+ MonitorMsg monitorMsg =
+ new MonitorMsg(msg.getDestination(), msg.getsenderID());
// Populate for each connected LDAP Server
// from the states stored in the serverHandler.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 8693dd9..6ce4627 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -121,6 +121,34 @@
}
/**
+ * Send the ReplServerStartMsg to the remote RS.
+ * @param requestedProtocolVersion The provided protocol version.
+ * @return The ReplServerStartMsg sent.
+ * @throws IOException When an exception occurs.
+ */
+ private ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
+ throws IOException
+ {
+ ReplServerStartMsg outReplServerStartMsg
+ = new ReplServerStartMsg(
+ replicationServerId,
+ replicationServerURL,
+ getServiceId(),
+ maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ protocolVersion,
+ localGenerationId,
+ sslEncryption,
+ getLocalGroupId(),
+ replicationServerDomain.
+ getReplicationServer().getDegradedStatusThreshold());
+
+ session.publish(outReplServerStartMsg, requestedProtocolVersion);
+
+ return outReplServerStartMsg;
+ }
+
+ /**
* Creates a new handler object to a remote replication server.
* @param session The session with the remote RS.
* @param queueSize The queue size to manage updates to that RS.
@@ -262,6 +290,7 @@
// lock with timeout
lockDomain(true);
+ this.localGenerationId = replicationServerDomain.getGenerationId();
ReplServerStartMsg outReplServerStartMsg =
sendStartToRemote(protocolVersion);
@@ -389,6 +418,14 @@
super.finalizeStart();
}
+ catch(IOException ioe) {
+ Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
+ Integer.toString(inReplServerStartMsg.getServerId()),
+ Integer.toString(replicationServerDomain.getReplicationServer().
+ getServerId()));
+ logError(errMessage);
+ abortStart(errMessage);
+ }
catch(DirectoryException de)
{
abortStart(de.getMessageObject());
@@ -425,7 +462,7 @@
throws IOException
{
TopologyMsg outTopoMsg = replicationServerDomain.createTopologyMsgForRS();
- session.publish(outTopoMsg);
+ session.publish(outTopoMsg, protocolVersion);
return outTopoMsg;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index ce95024..489dbae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,13 +50,14 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
+import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -94,17 +95,21 @@
if (providedMsg != null)
{
if (debugEnabled())
- TRACER.debugInfo("In "
- + ((handler!=null)?handler.toString():"Replication Server")
- + " closing session with err=" +
- providedMsg.toString());
+ TRACER.debugInfo("In " +
+ ((handler != null) ? handler.toString() : "Replication Server") +
+ " closing session with err=" +
+ providedMsg.toString());
logError(providedMsg);
}
try
{
- if (providedSession!=null)
+ if (providedSession != null)
+ // This method is only called when aborting a failing handshake and
+ // not StopMsg should be sent in such situation. StopMsg are only
+ // expected when full handshake has been performed, or at end of
+ // handshake phase 1, when DS was just gathering available RS info
providedSession.close();
- } catch (IOException ee)
+ } catch (IOException e)
{
// ignore
}
@@ -174,7 +179,10 @@
private int rcvWindow;
private int rcvWindowSizeHalf;
- private int maxRcvWindow;
+ /**
+ * The size of the receiving window.
+ */
+ protected int maxRcvWindow;
/**
* Semaphore that the writer uses to control the flow to the remote server.
*/
@@ -197,7 +205,7 @@
*/
protected long localGenerationId = -1;
/**
- * The generation id before procesing a new start handshake.
+ * The generation id before processing a new start handshake.
*/
protected long oldGenerationId = -1;
/**
@@ -210,7 +218,7 @@
protected boolean initSslEncryption;
/**
- * The SSL encryption after the negociation with the peer.
+ * The SSL encryption after the negotiation with the peer.
*/
protected boolean sslEncryption;
/**
@@ -275,17 +283,6 @@
// be disturbed
if (session!=null)
{
- try
- {
- session.publish(
- new ErrorMsg(
- replicationServerDomain.getReplicationServer().getServerId(),
- serverId,
- reason));
- }
- catch(Exception e)
- {
- }
closeSession(session, reason, this);
}
@@ -991,7 +988,14 @@
replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + this +
" publishes message:\n" + msg);
- session.publish(msg);
+ // Currently only MonitorMsg has to support a backward compatibility
+ if (msg instanceof MonitorMsg)
+ {
+ session.publish(msg, protocolVersion);
+ } else
+ {
+ session.publish(msg);
+ }
}
/**
@@ -1017,35 +1021,6 @@
}
/**
- * Send the ReplServerStartMsg to the remote server (RS or DS).
- * @param requestedProtocolVersion The provided protocol version.
- * @return The ReplServerStartMsg sent.
- * @throws IOException When an exception occurs.
- */
- public ReplServerStartMsg sendStartToRemote(short requestedProtocolVersion)
- throws IOException
- {
- this.localGenerationId = replicationServerDomain.getGenerationId();
- ReplServerStartMsg outReplServerStartMsg
- = new ReplServerStartMsg(
- replicationServerId,
- replicationServerURL,
- getServiceId(),
- maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- protocolVersion,
- localGenerationId,
- sslEncryption,
- getLocalGroupId(),
- replicationServerDomain.
- getReplicationServer().getDegradedStatusThreshold());
-
- session.publish(outReplServerStartMsg, requestedProtocolVersion);
-
- return outReplServerStartMsg;
- }
-
- /**
* Sends the provided TopologyMsg to the peer server.
*
* @param topoMsg The TopologyMsg message to be sent.
@@ -1058,7 +1033,7 @@
// V1 Rs do not support the TopologyMsg
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- session.publish(topoMsg);
+ session.publish(topoMsg, protocolVersion);
}
}
@@ -1110,6 +1085,18 @@
if (session != null)
{
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end
+ // communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
// Close session to end ServerReader or ServerWriter
try
{
@@ -1328,12 +1315,27 @@
replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + ", " +
this.getClass().getSimpleName() + " " + this + " :" +
- "\nSH SESSION HANDSHAKE RECEIVED:\n" +
+ "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
"\nAND REPLIED:\n" + outTopoMsg.toString());
}
}
/**
+ * Log stop message has been received.
+ */
+ protected void logStopReceived()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + ", " +
+ this.getClass().getSimpleName() + " " + this + " :" +
+ "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+ }
+ }
+
+ /**
* Log the messages involved in the Topology/StartSession handshake.
* @param inStartECLSessionMsg The message received first.
*/
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index c238e19..4b5037d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -272,6 +272,18 @@
ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
cthbMsg);
+ } else if (msg instanceof StopMsg)
+ {
+ // Peer server is properly disconnecting: go out of here to
+ // properly close the server handler going to finally block.
+ if (debugEnabled())
+ {
+ TRACER.debugInfo(handler.toString() + " has properly " +
+ "disconnected from this replication server " +
+ Integer.toString(replicationServerDomain.getReplicationServer().
+ getServerId()));
+ }
+ return;
} else if (msg == null)
{
/*
@@ -308,7 +320,7 @@
" reader IO EXCEPTION for serverID=" + serverId + " " +
this + " " +
stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
- errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+ errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
Integer.toString(replicationServerDomain.
getReplicationServer().getServerId()));
logError(errMessage);
@@ -346,7 +358,7 @@
finally
{
/*
- * The thread only exit the loop above is some error condition
+ * The thread only exits the loop above if some error condition
* happen.
* Attempt to close the socket and stop the server handler.
*/
@@ -357,6 +369,19 @@
"In RS " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
this + " is closing the session");
+ if (handler.getProtocolVersion() >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end
+ // communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
session.close();
} catch (IOException e)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 37579ba..80ad900 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -41,6 +41,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -198,7 +200,7 @@
* The remote host has disconnected and this particular Tree is going to
* be removed, just ignore the exception and let the thread die as well
*/
- errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+ errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
Integer.toString(replicationServerDomain.
getReplicationServer().getServerId()));
logError(errMessage);
@@ -209,7 +211,7 @@
* The remote host has disconnected and this particular Tree is going to
* be removed, just ignore the exception and let the thread die as well
*/
- errMessage = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+ errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
Integer.toString(replicationServerDomain.
getReplicationServer().getServerId()));
logError(errMessage);
@@ -225,6 +227,18 @@
logError(errMessage);
}
finally {
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end
+ // communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
try
{
session.close();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 9b39dc7..e058301 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -68,6 +70,7 @@
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -112,9 +115,6 @@
// Our replication domain
private ReplicationDomain domain = null;
- // Trick for avoiding a inner class for many parameters return for
- // performPhaseOneHandshake method.
- private String tmpReadableServerName = null;
/**
* The expected duration in milliseconds between heartbeats received
* from the replication server. Zero means heartbeats are off.
@@ -183,7 +183,7 @@
* @param groupId The group id of our domain.
* @param changeTimeHeartbeatInterval The interval (in ms) between Change
* time heartbeats are sent to the RS,
- * or zero if no CN heartbeat shoud be sent.
+ * or zero if no CN heartbeat should be sent.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
ServerState state, String baseDn, int serverID2, int window,
@@ -290,23 +290,93 @@
/**
* Bag class for keeping info we get from a server in order to compute the
- * best one to connect to.
+ * best one to connect to. This is in fact a wrapper to a
+ * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4).
*/
public static class ServerInfo
{
-
- private ServerState serverState = null;
+ private short protocolVersion;
+ private long generationId;
private byte groupId = (byte) -1;
+ private int serverId;
+ private String serverURL;
+ private String baseDn = null;
+ private int windowSize;
+ private ServerState serverState;
+ private boolean sslEncryption;
+ private int degradedStatusThreshold = -1;
+ // Keeps the -1 value if created with a ReplServerStartMsg
+ private int weight = -1;
+ // Keeps the -1 value if created with a ReplServerStartMsg
+ private int connectedDSNumber = -1;
/**
- * Constructor.
- * @param serverState Server state of the RS
- * @param groupId Group id of the RS
+ * Create a new instance of ServerInfo wrapping the passed message.
+ * @param msg Message to wrap.
+ * @return The new instance wrapping the passed message.
+ * @throws IllegalArgumentException If the passed message has an unexpected
+ * type.
*/
- public ServerInfo(ServerState serverState, byte groupId)
+ public static ServerInfo newServerInfo(
+ ReplicationMsg msg) throws IllegalArgumentException
{
- this.serverState = serverState;
- this.groupId = groupId;
+ if (msg instanceof ReplServerStartMsg)
+ {
+ // This is a ReplServerStartMsg (RS uses protocol V3 or under)
+ ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg)msg;
+ return new ServerInfo(replServerStartMsg);
+ }
+ else if (msg instanceof ReplServerStartDSMsg)
+ {
+ // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
+ ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg)msg;
+ return new ServerInfo(replServerStartDSMsg);
+ }
+
+ // Unsupported message type: should not happen
+ throw new IllegalArgumentException("Unexpected PDU type: " +
+ msg.getClass().getName() + " :\n" + msg.toString());
+ }
+
+ /**
+ * Constructs a ServerInfo object wrapping a ReplServerStartMsg.
+ * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
+ */
+ private ServerInfo(ReplServerStartMsg replServerStartMsg)
+ {
+ this.protocolVersion = replServerStartMsg.getVersion();
+ this.generationId = replServerStartMsg.getGenerationId();
+ this.groupId = replServerStartMsg.getGroupId();
+ this.serverId = replServerStartMsg.getServerId();
+ this.serverURL = replServerStartMsg.getServerURL();
+ this.baseDn = replServerStartMsg.getBaseDn();
+ this.windowSize = replServerStartMsg.getWindowSize();
+ this.serverState = replServerStartMsg.getServerState();
+ this.sslEncryption = replServerStartMsg.getSSLEncryption();
+ this.degradedStatusThreshold =
+ replServerStartMsg.getDegradedStatusThreshold();
+ }
+
+ /**
+ * Constructs a ServerInfo object wrapping a ReplServerStartDSMsg.
+ * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
+ * wrap.
+ */
+ private ServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+ {
+ this.protocolVersion = replServerStartDSMsg.getVersion();
+ this.generationId = replServerStartDSMsg.getGenerationId();
+ this.groupId = replServerStartDSMsg.getGroupId();
+ this.serverId = replServerStartDSMsg.getServerId();
+ this.serverURL = replServerStartDSMsg.getServerURL();
+ this.baseDn = replServerStartDSMsg.getBaseDn();
+ this.windowSize = replServerStartDSMsg.getWindowSize();
+ this.serverState = replServerStartDSMsg.getServerState();
+ this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
+ this.degradedStatusThreshold =
+ replServerStartDSMsg.getDegradedStatusThreshold();
+ this.weight = replServerStartDSMsg.getWeight();
+ this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
}
/**
@@ -326,6 +396,98 @@
{
return groupId;
}
+
+ /**
+ * Get the server protocol version.
+ * @return the protocolVersion
+ */
+ public short getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ /**
+ * Get the generation id.
+ * @return the generationId
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * Get the server id.
+ * @return the serverId
+ */
+ public int getServerId()
+ {
+ return serverId;
+ }
+
+ /**
+ * Get the server URL.
+ * @return the serverURL
+ */
+ public String getServerURL()
+ {
+ return serverURL;
+ }
+
+ /**
+ * Get the base dn.
+ * @return the baseDn
+ */
+ public String getBaseDn()
+ {
+ return baseDn;
+ }
+
+ /**
+ * Get the window size.
+ * @return the windowSize
+ */
+ public int getWindowSize()
+ {
+ return windowSize;
+ }
+
+ /**
+ * Get the ssl encryption.
+ * @return the sslEncryption
+ */
+ public boolean isSslEncryption()
+ {
+ return sslEncryption;
+ }
+
+ /**
+ * Get the degraded status threshold.
+ * @return the degradedStatusThreshold
+ */
+ public int getDegradedStatusThreshold()
+ {
+ return degradedStatusThreshold;
+ }
+
+ /**
+ * Get the weight.
+ * @return the weight. Null if this object is a wrapper for
+ * a ReplServerStartMsg.
+ */
+ public int getWeight()
+ {
+ return weight;
+ }
+
+ /**
+ * Get the connected DS number.
+ * @return the connectedDSNumber. Null if this object is a wrapper for
+ * a ReplServerStartMsg.
+ */
+ public int getConnectedDSNumber()
+ {
+ return connectedDSNumber;
+ }
}
private void connect()
@@ -342,10 +504,34 @@
}
/**
+ * Contacts all replication servers to get information from them and being
+ * able to choose the more suitable.
+ * @return the collected information.
+ */
+ private Map<String, ServerInfo> collectReplicationServersInfo() {
+
+ Map<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
+
+ for (String server : servers)
+ {
+ // Connect to server and get info about it
+ ServerInfo serverInfo = performPhaseOneHandshake(server, false);
+
+ // Store server info in list
+ if (serverInfo != null)
+ {
+ rsInfos.put(server, serverInfo);
+ }
+ }
+
+ return rsInfos;
+ }
+
+ /**
* Special aspects of connecting as ECL compared to connecting as data server
* are :
* - 1 single RS configured
- * - so no choice of the prefered RS
+ * - so no choice of the preferred RS
* - No same groupID polling
* - ?? Heartbeat
* - Start handshake is :
@@ -358,10 +544,10 @@
// FIXME:ECL List of RS to connect is for now limited to one RS only
String bestServer = this.servers.iterator().next();
- ReplServerStartMsg inReplServerStartMsg
+ ReplServerStartDSMsg inReplServerStartDSMsg
= performECLPhaseOneHandshake(bestServer, true);
- if (inReplServerStartMsg!=null)
+ if (inReplServerStartDSMsg!=null)
performECLPhaseTwoHandshake(bestServer);
}
@@ -392,8 +578,6 @@
*/
private void connectAsDataServer()
{
- HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
-
// May have created a broker with null replication domain for
// unit test purpose.
if (domain != null)
@@ -418,24 +602,12 @@
*/
if (debugEnabled())
TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
- " order to elect the prefered one");
- for (String server : servers)
- {
- // Connect to server and get reply message
- ReplServerStartMsg replServerStartMsg =
- performPhaseOneHandshake(server, false);
+ " order to elect the preferred one");
- // Store reply message info in list
- if (replServerStartMsg != null)
- {
- ServerInfo serverInfo =
- new ServerInfo(replServerStartMsg.getServerState(),
- replServerStartMsg.getGroupId());
- rsInfos.put(server, serverInfo);
- }
- } // for servers
+ // Get info from every available replication servers
+ Map<String, ServerInfo> rsInfos = collectReplicationServersInfo();
- ReplServerStartMsg replServerStartMsg = null;
+ ServerInfo serverInfo = null;
if (rsInfos.size() > 0)
{
@@ -446,19 +618,17 @@
// Best found, now initialize connection to this one (handshake phase 1)
if (debugEnabled())
TRACER.debugInfo(
- "phase 2 : will perform PhaseOneH with the prefered RS.");
- replServerStartMsg = performPhaseOneHandshake(bestServer, true);
+ "phase 2 : will perform PhaseOneH with the preferred RS.");
+ serverInfo = performPhaseOneHandshake(bestServer, true);
- if (replServerStartMsg != null) // Handshake phase 1 exchange went well
+ if (serverInfo != null) // Handshake phase 1 exchange went well
{
- ServerInfo bestServerInfo = rsInfos.get(bestServer);
-
// Compute in which status we are starting the session to tell the RS
ServerStatus initStatus =
- computeInitialServerStatus(replServerStartMsg.getGenerationId(),
- bestServerInfo.getServerState(),
- replServerStartMsg.getDegradedStatusThreshold(),
+ computeInitialServerStatus(serverInfo.getGenerationId(),
+ serverInfo.getServerState(),
+ serverInfo.getDegradedStatusThreshold(),
this.getGenerationID());
// Perfom session start (handshake phase 2)
@@ -485,7 +655,7 @@
* reconnection at that time to retrieve a server with our group
* id.
*/
- byte tmpRsGroupId = bestServerInfo.getGroupId();
+ byte tmpRsGroupId = serverInfo.getGroupId();
boolean someServersWithSameGroupId =
hasSomeServerWithSameGroupId(topologyMsg.getRsList());
@@ -493,10 +663,10 @@
if ((tmpRsGroupId == groupId) ||
((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
{
- replicationServer = tmpReadableServerName;
- maxSendWindow = replServerStartMsg.getWindowSize();
- rsGroupId = replServerStartMsg.getGroupId();
- rsServerId = replServerStartMsg.getServerId();
+ replicationServer = session.getReadableRemoteAddress();
+ maxSendWindow = serverInfo.getWindowSize();
+ rsGroupId = serverInfo.getGroupId();
+ rsServerId = serverInfo.getServerId();
rsServerUrl = bestServer;
// May have created a broker with null replication domain for
@@ -504,8 +674,8 @@
if (domain != null)
{
domain.sessionInitiated(
- initStatus, replServerStartMsg.getServerState(),
- replServerStartMsg.getGenerationId(),
+ initStatus, serverInfo.getServerState(),
+ serverInfo.getGenerationId(),
session);
}
receiveTopo(topologyMsg);
@@ -524,7 +694,7 @@
startSameGroupIdPoller();
}
startRSHeartBeatMonitoring();
- if (replServerStartMsg.getVersion()
+ if (serverInfo.getProtocolVersion()
>= ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
startChangeTimeHeartBeatPublishing();
@@ -584,8 +754,8 @@
rcvWindow = maxRcvWindow;
connectPhaseLock.notify();
- if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
- (replServerStartMsg.getGenerationId() == -1))
+ if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
+ (serverInfo.getGenerationId() == -1))
{
Message message =
NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
@@ -602,7 +772,7 @@
baseDn.toString(),
replicationServer,
Long.toString(this.getGenerationID()),
- Long.toString(replServerStartMsg.getGenerationId()));
+ Long.toString(serverInfo.getGenerationId()));
logError(message);
}
} else
@@ -709,19 +879,19 @@
/**
* Connect to the provided server performing the first phase handshake
* (start messages exchange) and return the reply message from the replication
- * server.
+ * server, wrapped in a ServerInfo object.
*
* @param server Server to connect to.
* @param keepConnection Do we keep session opened or not after handshake.
* Use true if want to perform handshake phase 2 with the same session
* and keep the session to create as the current one.
- * @return The ReplServerStartMsg the server replied. Null if could not
+ * @return The answer from the server . Null if could not
* get an answer.
*/
- private ReplServerStartMsg performPhaseOneHandshake(String server,
+ private ServerInfo performPhaseOneHandshake(String server,
boolean keepConnection)
{
- ReplServerStartMsg replServerStartMsg = null;
+ ServerInfo serverInfo = null;
// Parse server string.
int separator = server.lastIndexOf(':');
@@ -738,8 +908,6 @@
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
InetAddress.getByName(hostname), intPort);
- if (keepConnection)
- tmpReadableServerName = serverAddr.toString();
Socket socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
@@ -759,19 +927,23 @@
localSession.publish(serverStartMsg);
/*
- * Read the ReplServerStartMsg that should come back.
+ * Read the ReplServerStartMsg or ReplServerStartDSMsg that should come
+ * back.
*/
- replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+ ReplicationMsg msg = localSession.receive();
if (debugEnabled())
- {
- TRACER.debugInfo("In RB for " + baseDn +
- "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
- "\nAND RECEIVED:\n" + replServerStartMsg.toString());
- }
+ {
+ TRACER.debugInfo("In RB for " + baseDn +
+ "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() +
+ "\nAND RECEIVED:\n" + msg.toString());
+ }
+
+ // Wrap received message in a server info object
+ serverInfo = ServerInfo.newServerInfo(msg);
// Sanity check
- String repDn = replServerStartMsg.getBaseDn();
+ String repDn = serverInfo.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -786,7 +958,7 @@
* if it is an old replication server).
*/
protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartMsg.getVersion());
+ serverInfo.getProtocolVersion());
localSession.setProtocolVersion(protocolVersion);
@@ -839,10 +1011,25 @@
{
if (localSession != null)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In RB, closing session after phase 1");
+
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ if (!error)
+ {
+ try
+ {
+ localSession.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
+ }
try
{
- if (debugEnabled())
- TRACER.debugInfo("In RB, closing session after phase 1");
localSession.close();
} catch (IOException e)
{
@@ -852,7 +1039,7 @@
}
if (error)
{
- replServerStartMsg = null;
+ serverInfo = null;
} // Be sure to return null.
}
@@ -864,7 +1051,7 @@
session = localSession;
}
- return replServerStartMsg;
+ return serverInfo;
}
/**
@@ -876,13 +1063,13 @@
* @param keepConnection Do we keep session opened or not after handshake.
* Use true if want to perform handshake phase 2 with the same session
* and keep the session to create as the current one.
- * @return The ReplServerStartMsg the server replied. Null if could not
+ * @return The ReplServerStartDSMsg the server replied. Null if could not
* get an answer.
*/
- private ReplServerStartMsg performECLPhaseOneHandshake(String server,
+ private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
boolean keepConnection)
{
- ReplServerStartMsg replServerStartMsg = null;
+ ReplServerStartDSMsg replServerStartDSMsg = null;
// Parse server string.
int separator = server.lastIndexOf(':');
@@ -899,8 +1086,6 @@
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
InetAddress.getByName(hostname), intPort);
- if (keepConnection)
- tmpReadableServerName = serverAddr.toString();
Socket socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
@@ -920,17 +1105,17 @@
localSession.publish(serverStartECLMsg);
// Read the ReplServerStartMsg that should come back.
- replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+ replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
if (debugEnabled())
{
TRACER.debugInfo("In RB for " + baseDn +
"\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
- "\nAND RECEIVED:\n" + replServerStartMsg.toString());
+ "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
}
// Sanity check
- String repDn = replServerStartMsg.getBaseDn();
+ String repDn = replServerStartDSMsg.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
@@ -946,7 +1131,7 @@
*/
if (keepConnection)
protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartMsg.getVersion());
+ replServerStartDSMsg.getVersion());
localSession.setProtocolVersion(protocolVersion);
if (!isSslEncryption)
@@ -998,10 +1183,22 @@
{
if (localSession != null)
{
+ if (debugEnabled())
+ TRACER.debugInfo("In RB, closing session after phase 1");
+
+ // V4 protocol introduces a StopMsg to properly end communications
+ if (!error)
+ {
+ try
+ {
+ localSession.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
try
{
- if (debugEnabled())
- TRACER.debugInfo("In RB, closing session after phase 1");
localSession.close();
} catch (IOException e)
{
@@ -1011,7 +1208,7 @@
}
if (error)
{
- replServerStartMsg = null;
+ replServerStartDSMsg = null;
} // Be sure to return null.
}
@@ -1023,7 +1220,7 @@
session = localSession;
}
- return replServerStartMsg;
+ return replServerStartDSMsg;
}
/**
@@ -1184,8 +1381,7 @@
* @return The computed best replication server.
*/
public static String computeBestReplicationServer(ServerState myState,
- HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn,
- byte groupId)
+ Map<String, ServerInfo> rsInfos, int serverId2, String baseDn, byte groupId)
{
/*
* Preference is given to servers with the requested group id:
@@ -1195,7 +1391,7 @@
*/
// Filter for servers with same group id
- HashMap<String, ServerInfo> sameGroupIdRsInfos =
+ Map<String, ServerInfo> sameGroupIdRsInfos =
new HashMap<String, ServerInfo>();
for (String repServer : rsInfos.keySet())
@@ -1231,7 +1427,7 @@
* @return The computed best replication server.
*/
private static String searchForBestReplicationServer(ServerState myState,
- HashMap<String, ServerInfo> rsInfos, int serverId2, String baseDn)
+ Map<String, ServerInfo> rsInfos, int serverId2, String baseDn)
{
/*
* Find replication servers who are up to date (or more up to date than us,
@@ -1266,7 +1462,7 @@
HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
/*
- * Start loop to differenciate up to date servers from late ones.
+ * Start loop to differentiate up to date servers from late ones.
*/
ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId2);
if (myChangeNumber == null)
@@ -1321,6 +1517,7 @@
if (ReplicationServer.isLocalReplicationServer(upServer))
{
localRS = true;
+ break;
}
}
if (localRS)
@@ -1459,7 +1656,8 @@
new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
getReplicationServer() + " " + rsServerId + " for " + baseDn +
" in DS " + serverId,
- session, heartbeatInterval);
+ session, heartbeatInterval, (protocolVersion >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V4));
heartbeatMonitor.start();
}
}
@@ -1513,16 +1711,28 @@
*/
public void reStart(ProtocolSession failingSession)
{
- try
+
+ if (failingSession != null)
{
- if (failingSession != null)
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ try
+ {
+ failingSession.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
+ try
{
failingSession.close();
- numLostConnections++;
+ } catch (IOException e1)
+ {
+ // ignore
}
- } catch (IOException e1)
- {
- // ignore
+ numLostConnections++;
}
if (failingSession == session)
@@ -1708,6 +1918,19 @@
TopologyMsg topoMsg = (TopologyMsg)msg;
receiveTopo(topoMsg);
}
+ else if (msg instanceof StopMsg)
+ {
+ /*
+ * RS performs a proper disconnection
+ */
+ Message message =
+ NOTE_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(replicationServer,
+ Integer.toString(rsServerId), baseDn.toString(),
+ Integer.toString(serverId));
+ logError(message);
+ // Try to find a suitable RS
+ this.reStart(failingSession);
+ }
else
{
return msg;
@@ -1723,10 +1946,10 @@
{
/*
- * If we did not initiate the close on our side, log a message.
+ * We did not initiate the close on our side, log an error message.
*/
Message message =
- NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer,
+ ERR_REPLICATION_SERVER_BADLY_DISCONNECTED.get(replicationServer,
Integer.toString(rsServerId), baseDn.toString(),
Integer.toString(serverId));
logError(message);
@@ -1783,14 +2006,26 @@
rsGroupId = (byte) -1;
rsServerId = -1;
rsServerUrl = null;
- try
+
+ if (session != null)
{
- if (session != null)
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
+ try
{
session.close();
+ } catch (IOException e)
+ {
}
- } catch (IOException e)
- {
}
}
@@ -1896,7 +2131,7 @@
Collection<String> replicationServers, int window, long heartbeatInterval,
byte groupId)
{
- // These parameters needs to be renegociated with the ReplicationServer
+ // These parameters needs to be renegotiated with the ReplicationServer
// so if they have changed, that requires restarting the session with
// the ReplicationServer.
Boolean needToRestartSession = false;
@@ -1945,7 +2180,7 @@
private boolean debugEnabled()
{
- return true;
+ return false;
}
private static final void debugInfo(String s)
@@ -2057,13 +2292,13 @@
continue;
// Connect to server and get reply message
- ReplServerStartMsg replServerStartMsg =
+ ServerInfo serverInfo =
performPhaseOneHandshake(server, false);
- // Store reply message info in list
- if (replServerStartMsg != null)
+ // Is it a server with our group id ?
+ if (serverInfo != null)
{
- if (groupId == replServerStartMsg.getGroupId())
+ if (groupId == serverInfo.getGroupId())
{
// Found one server with the same group id as us, disconnect
// session to force reconnection to a server with same group
@@ -2072,6 +2307,20 @@
Byte.toString(groupId), baseDn.toString(),
Integer.toString(serverId));
logError(message);
+
+ if (protocolVersion >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end
+ // communications
+ try
+ {
+ session.publish(new StopMsg());
+ } catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
try
{
session.close();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index db42214..9b8f67d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -1245,7 +1245,7 @@
* from which this server can be initialized.
*
* @param targetString The string representing the source
- * @return The source as a short value
+ * @return The source as a integer value
* @throws DirectoryException if the string is not valid
*/
public int decodeTarget(String targetString)
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index cf503e4..77f0fce 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -3445,7 +3445,7 @@
SortedSet<String> replServers = new TreeSet<String>();
replServers.add("localhost:"+replicationServerPort);
DomainFakeCfg domainConf =
- new DomainFakeCfg(baseDn2, (short) 1702, replServers);
+ new DomainFakeCfg(baseDn2, 1702, replServers);
SortedSet<String> includeAttributes = new TreeSet<String>();
includeAttributes.add("sn");
domainConf.setEclIncludes(includeAttributes);
@@ -3457,7 +3457,7 @@
TEST_ROOT_DN_STRING3, TEST_BACKEND_ID3);
DN baseDn3 = DN.decode(TEST_ROOT_DN_STRING3);
domainConf =
- new DomainFakeCfg(baseDn3, (short) 1703, replServers);
+ new DomainFakeCfg(baseDn3, 1703, replServers);
includeAttributes = new TreeSet<String>();
includeAttributes.add("objectclass");
domainConf.setEclIncludes(includeAttributes);
@@ -3466,7 +3466,7 @@
replicationPlugin3.completeSynchronizationProvider();
domainConf =
- new DomainFakeCfg(baseDn2, (short) 1704, replServers);
+ new DomainFakeCfg(baseDn2, 1704, replServers);
includeAttributes = new TreeSet<String>();
includeAttributes.add("cn");
domainConf.setEclIncludes(includeAttributes);
@@ -3475,7 +3475,7 @@
Set<String> attrList = new HashSet<String>();
attrList.add(new String("cn"));
ReplicationBroker server01 = openReplicationSession(
- DN.decode(TEST_ROOT_DN_STRING2), (short) 1206,
+ DN.decode(TEST_ROOT_DN_STRING2), 1206,
100, replicationServerPort,
1000, true, -1 , domain21);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 56abd87..7529f60 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -36,11 +36,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.StringTokenizer;
import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import org.opends.server.types.ResultCode;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -69,13 +66,14 @@
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import org.testng.annotations.BeforeClass;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
-import org.opends.server.types.LockManager;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.testng.annotations.DataProvider;
@@ -478,8 +476,14 @@
session.stopEncryption();
}
- // Read start session
- StartSessionMsg startSessionMsg = (StartSessionMsg) session.receive();
+ // Read start session or stop
+ ReplicationMsg msg = session.receive();
+ if (msg instanceof StopMsg){
+ // Disconnection of DS looking for best server
+ return false;
+ }
+
+ StartSessionMsg startSessionMsg = (StartSessionMsg)msg;
// Sanity checking for assured parameters
boolean receivedIsAssured = startSessionMsg.isAssured();
@@ -505,7 +509,8 @@
} catch (IOException e)
{
- // Probably un-connection of DS looking for best server
+ fail("Unexpected io exception in fake replication server handshake " +
+ "processing: " + e);
return false;
} catch (Exception e)
{
@@ -1364,9 +1369,9 @@
// assertFalse(ackMsg.hasTimeout());
// assertTrue(ackMsg.hasReplayError());
// assertFalse(ackMsg.hasWrongStatus());
-// List<Short> failedServers = ackMsg.getFailedServers();
+// List<Integer> failedServers = ackMsg.getFailedServers();
// assertEquals(failedServers.size(), 1);
-// assertEquals((short)failedServers.get(0), (short)1);
+// assertEquals((integer)failedServers.get(0), (integer)1);
} finally
{
endTest();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
index 825307a..349767e 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -40,6 +40,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.testng.annotations.Test;
/**
@@ -98,7 +99,10 @@
aState.update(cn);
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -143,7 +147,10 @@
aState.update(cn);
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -190,7 +197,10 @@
aState.update(cn);
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -237,7 +247,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -285,7 +298,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -295,7 +311,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -345,7 +364,10 @@
aState.update(cn);
// This server has less changes than the other one but it has the same
// group id as us so he should be the winner
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -355,7 +377,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)2));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)2, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -403,7 +428,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)2));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)2, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -413,7 +441,10 @@
aState.update(cn);
cn = new ChangeNumber(2L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)2));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)2, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -462,7 +493,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -472,7 +506,10 @@
aState.update(cn);
cn = new ChangeNumber(3L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 3
aState = new ServerState();
@@ -482,7 +519,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -531,7 +571,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -541,7 +584,10 @@
aState.update(cn);
cn = new ChangeNumber(3L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)2));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)2, 0);
+ rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 3
aState = new ServerState();
@@ -553,7 +599,10 @@
aState.update(cn);
// This server has less changes than looser2 but it has the same
// group id as us so he should be the winner
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -600,7 +649,10 @@
aState.update(cn);
cn = new ChangeNumber(1L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -648,7 +700,10 @@
aState.update(cn);
cn = new ChangeNumber(10L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -658,7 +713,10 @@
aState.update(cn);
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -707,7 +765,10 @@
aState.update(cn);
cn = new ChangeNumber(10L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -717,7 +778,10 @@
aState.update(cn);
cn = new ChangeNumber(0L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 3
aState = new ServerState();
@@ -727,7 +791,10 @@
aState.update(cn);
cn = new ChangeNumber(10L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
@@ -780,7 +847,10 @@
aState.update(cn);
cn = new ChangeNumber(10L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER1, new ServerInfo(aState, (byte)1));
+ ReplServerStartMsg replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER1, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 2
aState = new ServerState();
@@ -790,7 +860,10 @@
aState.update(cn);
cn = new ChangeNumber(5L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER2, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER2, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 3
aState = new ServerState();
@@ -800,7 +873,10 @@
aState.update(cn);
cn = new ChangeNumber(10L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER3, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER3, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 4
aState = new ServerState();
@@ -810,7 +886,10 @@
aState.update(cn);
cn = new ChangeNumber(8L, 0, myId3);
aState.update(cn);
- rsInfos.put(WINNER, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(WINNER, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 5 (null one for our serverid)
aState = new ServerState();
@@ -818,7 +897,10 @@
aState.update(cn);
cn = new ChangeNumber(5L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER4, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER4, ServerInfo.newServerInfo(replServerStartMsg));
// State for server 6
aState = new ServerState();
@@ -828,7 +910,10 @@
aState.update(cn);
cn = new ChangeNumber(6L, 0, myId3);
aState.update(cn);
- rsInfos.put(LOOSER5, new ServerInfo(aState, (byte)1));
+ replServerStartMsg =
+ new ReplServerStartMsg(0, null, null, 0, aState, (short)0, 0L,
+ false, (byte)1, 0);
+ rsInfos.put(LOOSER5, ServerInfo.newServerInfo(replServerStartMsg));
String bestServer =
computeBestReplicationServer(mySt, rsInfos, myId1, " ", (byte)1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index ce0b4ac..2744026 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@@ -849,7 +850,7 @@
AssuredType assuredType = null;
int assuredSdLevel = -100;
SortedSet<String> refUrls = null;
- SortedSet<String> attrs = new TreeSet<String>();
+ Set<String> eclIncludes = new HashSet<String>();
switch (dsId)
{
@@ -904,7 +905,7 @@
}
return new DSInfo(dsId, rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
- (byte)assuredSdLevel, groupId, urls, attrs);
+ (byte)assuredSdLevel, groupId, urls, eclIncludes);
}
/**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index af8f66a..a293fbe 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -889,7 +889,7 @@
{"1603303030303030303030303030303030313030303130303030303030300064633" +
"d746573740066616b65756e69717565696400000200301f0a0102301a040b646573" +
"6372697074696f6e310b04096e65772076616c756500",
- ModifyMsg.class, new ChangeNumber(1, (short) 0, (short) 1), "dc=test" },
+ ModifyMsg.class, new ChangeNumber(1, 0, 1), "dc=test" },
{"1803303030303031323366313238343132303030326430303030303037620064633" +
"d636f6d00756e69717565696400000201",
DeleteMsg.class, new ChangeNumber(0x123f1284120L,123,45), "dc=com"},
@@ -1092,11 +1092,11 @@
dsList4.add(dsInfo2);
dsList4.add(dsInfo1);
- RSInfo rsInfo1 = new RSInfo((short)4527, (long)45316, (byte)103);
+ RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
- RSInfo rsInfo2 = new RSInfo((short)4527, (long)0, (byte)0);
+ RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
- RSInfo rsInfo3 = new RSInfo((short)0, (long)-21113, (byte)98);
+ RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
List<RSInfo> rsList1 = new ArrayList<RSInfo>();
rsList1.add(rsInfo1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 98a1e72..308a7df 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -886,6 +886,65 @@
newMsg.getDegradedStatusThreshold());
}
+ @DataProvider(name="createReplServerStartDSData")
+ public Object [][] createReplServerStartDSData() throws Exception
+ {
+ String baseDN = TEST_ROOT_DN_STRING;
+ ServerState state = new ServerState();
+ state.update(new ChangeNumber((long)0, 0, 0));
+ Object[] set1 = new Object[] {1, baseDN, 0, "localhost:8989", state, 0L, (byte)0, 0, 0, 0};
+
+ state = new ServerState();
+ state.update(new ChangeNumber((long)75, 5, 263));
+ Object[] set2 = new Object[] {16, baseDN, 100, "anotherHost:1025", state, 1245L, (byte)25, 3456, 3, 31512};
+
+ state = new ServerState();
+ state.update(new ChangeNumber((long)123, 5, 98));
+ Object[] set3 = new Object[] {36, baseDN, 100, "anotherHostAgain:8017", state, 6841L, (byte)32, 2496, 630, 9524};
+
+ return new Object [][] { set1, set2, set3 };
+ }
+
+ /**
+ * Test that ReplServerStartDSMsg encoding and decoding works
+ * by checking that : msg == new ReplServerStartMsg(msg.getBytes()).
+ */
+ @Test(dataProvider="createReplServerStartDSData")
+ public void replServerStartDSMsgTest(int serverId, String baseDN, int window,
+ String url, ServerState state, long genId, byte groupId, int degTh,
+ int weight, int connectedDSNumber) throws Exception
+ {
+ ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId,
+ url, baseDN, window, state, ProtocolVersion.getCurrentVersion(), genId,
+ true, groupId, degTh, weight, connectedDSNumber);
+ ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes());
+ assertEquals(msg.getServerId(), newMsg.getServerId());
+ assertEquals(msg.getServerURL(), newMsg.getServerURL());
+ assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
+ assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
+ assertEquals(msg.getServerState().getMaxChangeNumber(1),
+ newMsg.getServerState().getMaxChangeNumber(1));
+ assertEquals(msg.getVersion(), newMsg.getVersion());
+ assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
+ assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
+ assertTrue(msg.getGroupId() == newMsg.getGroupId());
+ assertTrue(msg.getDegradedStatusThreshold() ==
+ newMsg.getDegradedStatusThreshold());
+ assertEquals(msg.getWeight(), newMsg.getWeight());
+ assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber());
+ }
+
+ /**
+ * Test that StopMsg encoding and decoding works
+ * by checking that : msg == new StopMsg(msg.getBytes()).
+ */
+ @Test
+ public void stopMsgTest() throws Exception
+ {
+ StopMsg msg = new StopMsg();
+ StopMsg newMsg = new StopMsg(msg.getBytes());
+ }
+
/**
* Test that WindowMsg encoding and decoding works
* by checking that : msg == new WindowMsg(msg.getBytes()).
@@ -1457,8 +1516,7 @@
new HashMap<AttributeType,List<Attribute>>();
opList.put(attr.getAttributeType(), operationalAttributes);
- ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
- (short) 123, (short) 45);
+ ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
DN dn = DN.decode(rawDN);
for (int i=1;i<perfRep;i++)
@@ -1538,8 +1596,7 @@
long buildnew = 0;
long t1,t2,t3,t31,t4,t5,t6 = 0;
- ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
- (short) 123, (short) 45);
+ ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
DN dn = DN.decode(rawdn);
for (int i=1;i<perfRep;i++)
@@ -1627,8 +1684,7 @@
DeleteOperationBasis opBasis =
new DeleteOperationBasis(connection, 1, 1,null, DN.decode(rawDN));
LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(opBasis);
- ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
- (short) 123, (short) 45);
+ ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), 123, 45);
op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
t2 = System.nanoTime();
createop += (t2 - t1);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
index a5f21ce..d277405 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2007-2008 Sun Microsystems, Inc.
+ * Copyright 2007-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -60,8 +60,11 @@
// Threshold for status analyzers
private int degradedStatusThreshold = 5000;
+ // The weight of the server
+ private int weight = 1;
+
/**
- * Constructor without assured info
+ * Constructor without goup id, assured info and weight
*/
public ReplServerFakeConfiguration(
int port, String dirName, int purgeDelay, int serverId,
@@ -103,7 +106,7 @@
}
/**
- * Constructor with assured info
+ * Constructor with group id and assured info
*/
public ReplServerFakeConfiguration(
int port, String dirName, int purgeDelay, int serverId,
@@ -117,6 +120,19 @@
}
/**
+ * Constructor with group id, assured info and weight
+ */
+ public ReplServerFakeConfiguration(
+ int port, String dirName, int purgeDelay, int serverId,
+ int queueSize, int windowSize, SortedSet<String> servers,
+ int groupId, long assuredTimeout, int degradedStatusThreshold, int weight)
+ {
+ this(port, dirName, purgeDelay, serverId, queueSize, windowSize, servers,
+ groupId, assuredTimeout, degradedStatusThreshold);
+ this.weight = weight;
+ }
+
+ /**
* {@inheritDoc}
*/
public void addChangeListener(
@@ -233,4 +249,9 @@
this.degradedStatusThreshold = degradedStatusThreshold;
}
+ public int getWeight()
+ {
+ return weight;
+ }
+
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 0ed272c..d4e3302 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -73,6 +73,7 @@
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -1006,10 +1007,10 @@
// Read the Replication Server state from the ReplServerStartMsg that
// comes back.
- ReplServerStartMsg replStartMsg =
- (ReplServerStartMsg) session.receive();
- int serverwindow = replStartMsg.getWindowSize();
- ServerState replServerState = replStartMsg.getServerState();
+ ReplServerStartDSMsg replStartDSMsg =
+ (ReplServerStartDSMsg) session.receive();
+ int serverwindow = replStartDSMsg.getWindowSize();
+ ServerState replServerState = replStartDSMsg.getServerState();
if (!sslEncryption)
{
@@ -1052,9 +1053,9 @@
sslEncryption, (byte)10);
session.publish(msg);
- // Read the ReplServerStartMsg that should come back.
+ // Read the ReplServerStartDSMsg that should come back.
repMsg = session.receive();
- assertTrue(repMsg instanceof ReplServerStartMsg);
+ assertTrue(repMsg instanceof ReplServerStartDSMsg);
if (!sslEncryption)
{
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 32dbfe0..6940355 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -464,15 +464,15 @@
}
String exportedData=exportedDataBuilder.toString();
domain1 = new FakeReplicationDomain(
- testService, (short) 1, servers1,
+ testService, 1, servers1,
100, 0, exportedData, null, ENTRYCOUNT);
StringBuilder importedData = new StringBuilder();
domain2 = new FakeReplicationDomain(
- testService, (short) 2, servers2, 100, 0,
+ testService, 2, servers2, 100, 0,
null, importedData, 0);
- domain2.initializeFromRemote((short)1);
+ domain2.initializeFromRemote(1);
int count = 0;
while ((importedData.length() < exportedData.length()) && (count < 500))
--
Gitblit v1.10.0