From b45a7bf251b59ef156cfd7f3235384ac8835fcd4 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 24 May 2007 13:14:06 +0000
Subject: [PATCH] [Issue 1085] Synchronization protocol must be extensible
---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 36 +++
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 76 +++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java | 42 +++++
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java | 30 +-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 25 +-
opends/src/server/org/opends/server/tasks/InitializeTask.java | 5
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 6
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 27 +++
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java | 3
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java | 30 ++-
opends/src/server/org/opends/server/replication/protocol/StartMessage.java | 150 ++++++++++++++++++
11 files changed, 376 insertions(+), 54 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 913e039..a27dc96 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -33,10 +33,6 @@
import static org.opends.server.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.TreeSet;
-import java.util.concurrent.Semaphore;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
@@ -44,6 +40,10 @@
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -52,11 +52,12 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMessage;
+import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
-import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.types.DN;
@@ -99,6 +100,7 @@
private int halfRcvWindow;
private int maxRcvWindow;
private int timeout = 0;
+ private short protocolVersion;
/**
* The time in milliseconds between heartbeats from the replication
@@ -155,6 +157,7 @@
this.maxRcvWindow = window;
this.halfRcvWindow = window/2;
this.heartbeatInterval = heartbeatInterval;
+ this.protocolVersion = ProtocolVersion.currentVersion();
}
/**
@@ -230,7 +233,8 @@
*/
ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow*2, heartbeatInterval, state);
+ halfRcvWindow*2, heartbeatInterval, state,
+ protocolVersion);
session.publish(msg);
@@ -239,6 +243,14 @@
*/
session.setSoTimeout(1000);
startMsg = (ReplServerStartMessage) session.receive();
+
+ /*
+ * We have sent our own protocol version to the replication server.
+ * The replication server will use the same one (or an older one
+ * if it is an old replication server).
+ */
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ startMsg.getVersion());
session.setSoTimeout(timeout);
/*
@@ -727,4 +739,14 @@
// session with the replicationServer or renegociate the parameters that
// were sent in the ServerStart message
}
+
+ /**
+ * Get the version of the replication protocol.
+ * @return The version of the replication protocol.
+ */
+ public short getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
new file mode 100644
index 0000000..7d791c0
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -0,0 +1,76 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+
+/**
+ * The version utility class for the replication protocol.
+ */
+public class ProtocolVersion
+{
+ /**
+ * Get the version included in the Start Message mean the replication
+ * protocol version used by the server that created the message.
+ *
+ * @return The version used by the server that created the message.
+ */
+ static short CURRENT_VERSION = 1;
+
+ /**
+ * Specifies the current version of the replication protocol.
+ *
+ * @return The current version of the protocol.
+ */
+ public static short currentVersion()
+ {
+ return CURRENT_VERSION;
+ }
+
+ /**
+ * For test purpose.
+ * @param currentVersion The provided current version.
+ */
+ public static void setCurrentVersion(short currentVersion)
+ {
+ CURRENT_VERSION = currentVersion;
+ }
+
+ /**
+ * Specifies the oldest version of the protocol from the provided one
+ * and the current one.
+ *
+ * @param version The version to be compared to the current one.
+ * @return The minimal protocol version.
+ */
+ public static short minWithCurrent(short version)
+ {
+ Short sVersion = Short.valueOf(version);
+ Short newVersion = (sVersion<CURRENT_VERSION?sVersion:CURRENT_VERSION);
+ return newVersion;
+ }
+}
+
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
index 4b0e8ff..013d4cd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
@@ -38,7 +38,7 @@
* Message sent by a replication server to another replication server
* at Startup.
*/
-public class ReplServerStartMessage extends ReplicationMessage implements
+public class ReplServerStartMessage extends StartMessage implements
Serializable
{
private static final long serialVersionUID = -5871385537169856856L;
@@ -58,11 +58,14 @@
* @param baseDn base DN for which the ReplServerStartMessage is created.
* @param windowSize The window size.
* @param serverState our ServerState for this baseDn.
+ * @param protocolVersion The replication protocol version of the creator.
*/
public ReplServerStartMessage(short serverId, String serverURL, DN baseDn,
int windowSize,
- ServerState serverState)
+ ServerState serverState,
+ short protocolVersion)
{
+ super(protocolVersion);
this.serverId = serverId;
this.serverURL = serverURL;
if (baseDn != null)
@@ -85,13 +88,12 @@
/* The ReplServerStartMessage is encoded in the form :
* <baseDn><ServerId><ServerUrl><windowsize><ServerState>
*/
+ super(MSG_TYPE_REPL_SERVER_START, in);
+
try
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_REPL_SERVER_START)
- throw new DataFormatException(
- "input is not a valid ReplServerStartMsg");
- int pos = 1;
+ /* first bytes are the header */
+ int pos = headerLength;
/* read the dn
* first calculate the length then construct the string
@@ -193,15 +195,13 @@
byte[] byteServerState = serverState.getBytes();
byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
- int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteWindowSize.length + 1 +
- byteServerState.length + 1;
+ int length = byteDn.length + 1 + byteServerId.length + 1 +
+ byteServerUrl.length + 1 + byteWindowSize.length + 1 +
+ byteServerState.length + 1;
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_REPL_SERVER_START;
- int pos = 1;
+ /* encode the header in a byte[] large enough to also contain the mods */
+ byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length);
+ int pos = headerLength;
/* put the baseDN and a terminating 0 */
pos = addByteArray(byteDn, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
index cdd9df2..a046356 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -33,15 +33,15 @@
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
/**
* This message is used by LDAP server when they first connect.
* to a replication server to let them know who they are and what is their state
* (their RUV)
*/
-public class ServerStartMessage extends ReplicationMessage implements
+public class ServerStartMessage extends StartMessage implements
Serializable
{
private static final long serialVersionUID = 8649393307038290287L;
@@ -75,13 +75,17 @@
* @param windowSize The window size used by this server.
* @param heartbeatInterval The requested heartbeat interval.
* @param serverState The state of this server.
+ * @param protocolVersion The replication protocol version of the creator.
*/
public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
int maxReceiveQueue, int maxSendDelay,
int maxSendQueue, int windowSize,
long heartbeatInterval,
- ServerState serverState)
+ ServerState serverState,
+ short protocolVersion)
{
+ super(protocolVersion);
+
this.serverId = serverId;
this.baseDn = baseDn.toString();
this.maxReceiveDelay = maxReceiveDelay;
@@ -113,16 +117,16 @@
*/
public ServerStartMessage(byte[] in) throws DataFormatException
{
+ super(MSG_TYPE_SERVER_START, in);
+
/* The ServerStartMessage is encoded in the form :
- * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
+ * <header><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue>
* <maxSendDelay><maxSendQueue><window><heartbeatInterval><ServerState>
*/
try
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_SERVER_START)
- throw new DataFormatException("input is not a valid ServerStart msg");
- int pos = 1;
+ /* first bytes are the header */
+ int pos = headerLength;
/*
* read the dn
@@ -306,7 +310,7 @@
String.valueOf(heartbeatInterval).getBytes("UTF-8");
byte[] byteServerState = serverState.getBytes();
- int length = 1 + byteDn.length + 1 + byteServerId.length + 1 +
+ int length = byteDn.length + 1 + byteServerId.length + 1 +
byteServerUrl.length + 1 +
byteMaxRecvDelay.length + 1 +
byteMaxRecvQueue.length + 1 +
@@ -316,11 +320,9 @@
byteHeartbeatInterval.length + 1 +
byteServerState.length + 1;
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_SERVER_START;
- int pos = 1;
+ /* encode the header in a byte[] large enough to also contain the mods */
+ byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length);
+ int pos = headerLength;
pos = addByteArray(byteDn, resultByteArray, pos);
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartMessage.java b/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
new file mode 100644
index 0000000..150d3d9
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/StartMessage.java
@@ -0,0 +1,150 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This abstract message class is the superclass for start messages used
+ * by LDAP servers and Replication servers to initiate their communications.
+ * This class specifies a message header that contains the Replication
+ * Protocol version.
+ */
+public abstract class StartMessage extends ReplicationMessage
+{
+ private short protocolVersion;
+
+ /**
+ * The length of the header of this message.
+ */
+ protected int headerLength;
+
+ /**
+ * Create a new StartMessage.
+ *
+ * @param protocolVersion The Replication Protocol version of the server
+ * for which the StartMessage is created.
+ */
+ public StartMessage(short protocolVersion)
+ {
+ this.protocolVersion = protocolVersion;
+ }
+
+ /**
+ * Creates a new ServerStartMessage from its encoded form.
+ *
+ * @param type The type of the message to create.
+ * @param encodedMsg The byte array containing the encoded form of the
+ * StartMessage.
+ * @throws DataFormatException If the byte array does not contain a valid
+ * encoded form of the ServerStartMessage.
+ */
+ public StartMessage(byte type, byte [] encodedMsg) throws DataFormatException
+ {
+ headerLength = decodeHeader(type, encodedMsg);
+ }
+
+ /**
+ * Encode the header for the start message.
+ *
+ * @param type The type of the message to create.
+ * @param additionalLength additional length needed to encode the remaining
+ * part of the UpdateMessage.
+ * @return a byte array containing the common header and enough space to
+ * encode the reamining bytes of the UpdateMessage as was specified
+ * by the additionalLength.
+ * (byte array length = common header length + additionalLength)
+ * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ */
+ public byte[] encodeHeader(byte type, int additionalLength)
+ throws UnsupportedEncodingException
+ {
+ byte[] versionByte = Short.toString(protocolVersion).getBytes("UTF-8");
+
+ /* The message header is stored in the form :
+ * <message type><protocol version>
+ */
+ int length = 1 + versionByte.length + 1 +
+ additionalLength;
+
+ byte[] encodedMsg = new byte[length];
+
+ /* put the type of the operation */
+ encodedMsg[0] = type;
+ int pos = 1;
+
+ /* put the protocol version */
+ headerLength = addByteArray(versionByte, encodedMsg, pos);
+
+ return encodedMsg;
+ }
+
+ /**
+ * Decode the Header part of this message, and check its type.
+ *
+ * @param type The type of this message.
+ * @param encodedMsg the encoded form of the message.
+ * @return the position at which the remaining part of the message starts.
+ * @throws DataFormatException if the encodedMsg does not contain a valid
+ * common header.
+ */
+ public int decodeHeader(byte type, byte [] encodedMsg)
+ throws DataFormatException
+ {
+ /* first byte is the type */
+ if (encodedMsg[0] != type)
+ throw new DataFormatException("byte[] is not a valid msg");
+
+ try
+ {
+ /* then read the version */
+ int pos = 1;
+ int length = getNextLength(encodedMsg, pos);
+ protocolVersion = Short.valueOf(
+ new String(encodedMsg, pos, length, "UTF-8"));
+ pos += length + 1;
+ return pos;
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+
+ }
+
+ /**
+ * Get the version included in the Start Message mean the replication
+ * protocol version used by the server that created the message.
+ *
+ * @return The version used by the server that created the message.
+ */
+ public short getVersion()
+ {
+ return protocolVersion;
+ }
+}
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 9a34d92..a098c4b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -49,6 +49,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.HeartbeatThread;
@@ -118,6 +119,8 @@
private int saturationCount = 0;
private short replicationServerId;
+ private short protocolVersion;
+
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
@@ -146,6 +149,7 @@
super("Server Handler");
this.session = session;
this.maxQueueSize = queueSize;
+ this.protocolVersion = ProtocolVersion.currentVersion();
}
/**
@@ -175,20 +179,27 @@
{
if (baseDn != null)
{
+ // This is an outgoing connection. Publish our start message.
this.baseDn = baseDn;
replicationCache = replicationServer.getReplicationCache(baseDn);
ServerState localServerState = replicationCache.getDbServerState();
ReplServerStartMessage msg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
- baseDn, windowSize, localServerState);
+ baseDn, windowSize, localServerState,
+ protocolVersion);
session.publish(msg);
}
+ // Wait and process ServerStart or ReplServerStart
ReplicationMessage msg = session.receive();
if (msg instanceof ServerStartMessage)
{
+ // The remote server is an LDAP Server
ServerStartMessage receivedMsg = (ServerStartMessage) msg;
+
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ receivedMsg.getVersion());
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
this.baseDn = receivedMsg.getBaseDn();
@@ -233,17 +244,22 @@
serverIsLDAPserver = true;
+ // This an incoming connection. Publish our start message
replicationCache = replicationServer.getReplicationCache(this.baseDn);
ServerState localServerState = replicationCache.getDbServerState();
ReplServerStartMessage myStartMsg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
- this.baseDn, windowSize, localServerState);
+ this.baseDn, windowSize, localServerState,
+ protocolVersion);
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
}
else if (msg instanceof ReplServerStartMessage)
{
+ // The remote server is a replication server
ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ receivedMsg.getVersion());
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
int separator = serverURL.lastIndexOf(':');
@@ -255,14 +271,19 @@
{
replicationCache = replicationServer.getReplicationCache(this.baseDn);
ServerState serverState = replicationCache.getDbServerState();
+
+ // Publish our start message
ReplServerStartMessage outMsg =
new ReplServerStartMessage(replicationServerId,
replicationServerURL,
- this.baseDn, windowSize, serverState);
+ this.baseDn, windowSize, serverState,
+ protocolVersion);
session.publish(outMsg);
}
else
+ {
this.baseDn = baseDn;
+ }
this.serverState = receivedMsg.getServerState();
sendWindowSize = receivedMsg.getWindowSize();
}
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
index e586f78..c4cf747 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -133,9 +133,10 @@
}
catch(DirectoryException de)
{
+ // This log will go to the task log message
logError(ErrorLogCategory.TASK,
ErrorLogSeverity.SEVERE_ERROR,
- "Initialize Task stopped by error", 1);
+ "Initialize Task stopped by error" + de.getErrorMessage(), 1);
return TaskState.STOPPED_BY_ERROR;
}
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTask.java b/opends/src/server/org/opends/server/tasks/InitializeTask.java
index 2733b89..3f828c1 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTask.java
@@ -87,6 +87,10 @@
*/
@Override public void initializeTask() throws DirectoryException
{
+ if (TaskState.isDone(getTaskState()))
+ {
+ return;
+ }
// FIXME -- Do we need any special authorization here?
Entry taskEntry = getTaskEntry();
@@ -117,7 +121,6 @@
domain=ReplicationDomain.retrievesReplicationDomain(domainDN);
-
attrList = taskEntry.getAttribute(typeSourceScope);
String sourceString = TaskUtils.getSingleValueString(attrList);
source = domain.decodeSource(sourceString);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index a6733f7..f6921f5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -28,10 +28,13 @@
package org.opends.server.replication;
import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
@@ -42,8 +45,10 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
@@ -329,4 +334,39 @@
assertEquals(modOp.getResultCode(), ResultCode.SUCCESS);
}
}
+
+ @Test(enabled=true)
+ public void protocolVersion() throws Exception
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "Starting Replication ProtocolWindowTest : protocolVersion" , 1);
+
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+
+ // Test : Make a broker degrade its version when connecting to an old
+ // replication server.
+ ProtocolVersion.setCurrentVersion((short)2);
+
+ ReplicationBroker broker = new ReplicationBroker(
+ new ServerState(),
+ baseDn,
+ (short) 13, 0, 0, 0, 0, 1000, 0);
+
+
+ // Check broker hard-coded version
+ short pversion = broker.getProtocolVersion();
+ assertEquals(pversion, 2);
+
+ // Connect the broker to the replication server
+ ProtocolVersion.setCurrentVersion((short)0);
+ ArrayList<String> servers = new ArrayList<String>(1);
+ servers.add("localhost:" + replServerPort);
+ broker.start(servers);
+ TestCaseUtils.sleep(100); // wait for connection established
+
+ // Check broker negociated version
+ pversion = broker.getProtocolVersion();
+ assertEquals(pversion, 0);
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index dd19136..d37f9cc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -340,18 +340,23 @@
DN.decode(synchroPluginStringDN)),
"Unable to add the Multimaster replication plugin");
-
- // Add the replication server
- DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
+ if (replServerEntry != null)
+ {
+ // Add the replication server
+ DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(replServerEntry.getDN()),
"Unable to add the replication server");
- configEntryList.add(replServerEntry.getDN());
+ configEntryList.add(replServerEntry.getDN());
+ }
- // We also have a replicated suffix (replication domain)
- DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
- "Unable to add the synchronized server");
- configEntryList.add(synchroServerEntry.getDN());
+ if (synchroServerEntry != null)
+ {
+ // We also have a replicated suffix (replication domain)
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the synchronized server");
+ configEntryList.add(synchroServerEntry.getDN());
+ }
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 669271b..d3a5557 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -486,7 +486,7 @@
{
state.update(new ChangeNumber((long)1, 1,(short)1));
ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
- window, window, window, window, window, window, state);
+ window, window, window, window, window, window, state, (short)1);
ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -498,6 +498,7 @@
assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
newMsg.getServerState().getMaxChangeNumber((short)1));
+ assertEquals(msg.getVersion(), newMsg.getVersion());
}
@DataProvider(name="changelogStart")
@@ -518,7 +519,7 @@
{
state.update(new ChangeNumber((long)1, 1,(short)1));
ReplServerStartMessage msg = new ReplServerStartMessage(serverId,
- url, baseDN, window, state);
+ url, baseDN, window, state, (short)1);
ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -526,6 +527,7 @@
assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
assertEquals(msg.getServerState().getMaxChangeNumber((short)1),
newMsg.getServerState().getMaxChangeNumber((short)1));
+ assertEquals(msg.getVersion(), newMsg.getVersion());
}
/**
--
Gitblit v1.10.0