From 1c8b422d63f419d8c85a28b1f2276ac0f3e3632c Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 12 Jul 2007 15:41:32 +0000
Subject: [PATCH] Fix for 1895 Summary: Total update does not work with 3 servers that are also replication servers
---
opends/src/server/org/opends/server/messages/ReplicationMessages.java | 35 ++
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java | 6
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java | 142 +++++++++++
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 166 ++++++++++---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 3
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 30 +
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 82 +++++-
opends/src/server/org/opends/server/replication/server/ServerReader.java | 6
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 247 +++++++++++++++-----
10 files changed, 594 insertions(+), 125 deletions(-)
diff --git a/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 72f4ad0..388a21b 100644
--- a/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -450,6 +450,27 @@
CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
/**
+ * An error happened to send a ReplServerInfoMessage to another
+ * replication server.
+ */
+ public static final int MSGID_CHANGELOG_ERROR_SENDING_INFO =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 64;
+
+ /**
+ * An error happened to send an ErrorMessage to another
+ * replication server.
+ */
+ public static final int MSGID_CHANGELOG_ERROR_SENDING_ERROR =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 65;
+
+ /**
+ * An error happened to send a Message (probably a RoutableMessage)
+ * to another replication server.
+ */
+ public static final int MSGID_CHANGELOG_ERROR_SENDING_MSG =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 66;
+
+ /**
* Register the messages from this class in the core server.
*
*/
@@ -530,8 +551,8 @@
"An unexpected error happened handling connection with %s." +
"This connection is going to be closed. ");
registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK,
- "An unexpected error happened sending an ack to %s." +
- "This connection is going to be closed. ");
+ "An unexpected error occurred while sending an ack to %s." +
+ "This connection is going to be closed and reopened. ");
registerMessage(
MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE,
"An Exception was caught while receiving replication message : %s");
@@ -617,5 +638,15 @@
registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
"The connection to Replication Server %s has been dropped by the "
+ "Replication Server");
+ registerMessage(MSGID_CHANGELOG_ERROR_SENDING_INFO,
+ "An unexpected error occurred while sending a Server " +
+ " Info message to %s. " +
+ "This connection is going to be closed and reopened");
+ registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ERROR,
+ "An unexpected error occurred while sending an Error Message to %s. "+
+ "This connection is going to be closed and reopened");
+ registerMessage(MSGID_CHANGELOG_ERROR_SENDING_MSG,
+ "An unexpected error occurred while sending a Message to %s. "+
+ "This connection is going to be closed and reopened");
}
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
new file mode 100644
index 0000000..b048b8a
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -0,0 +1,142 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.replication.protocol;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.DataFormatException;
+
+/**
+ *
+ * This class defines a message that is sent by a replication server
+ * to the other replication servers in the topology containing the list
+ * of LDAP servers directly connected to it.
+ * A replication server sends a ReplServerInfoMessage when an LDAP
+ * server connects or disconnects.
+ *
+ * Exchanging these messages allows to have each replication server
+ * knowing the complete list of LDAP servers in the topology and
+ * their associated replication server and thus take the appropriate
+ * decision to route a message to an LDAP server.
+ *
+ */
+public class ReplServerInfoMessage extends ReplicationMessage
+{
+ private List<String> connectedServers = null;
+
+ /**
+ * Creates a new changelogInfo message from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the message.
+ * @throws java.util.zip.DataFormatException If the byte array does not
+ * contain a valid encoded form of the message.
+ */
+ public ReplServerInfoMessage(byte[] in) throws DataFormatException
+ {
+ try
+ {
+ /* first byte is the type */
+ if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
+ throw new DataFormatException(
+ "Input is not a valid changelogInfo Message.");
+
+ connectedServers = new ArrayList<String>();
+ int pos = 1;
+ while (pos < in.length)
+ {
+ /*
+ * Read the next server ID
+ * first calculate the length then construct the string
+ */
+ int length = getNextLength(in, pos);
+ connectedServers.add(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+ }
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+
+ /**
+ * Creates a new changelogInfo message from a list of the currently
+ * connected servers.
+ *
+ * @param connectedServers The list of currently connected servers ID.
+ */
+ public ReplServerInfoMessage(List<String> connectedServers)
+ {
+ this.connectedServers = connectedServers;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try
+ {
+ ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+
+ /* Put the message type */
+ oStream.write(MSG_TYPE_REPL_SERVER_INFO);
+ if (connectedServers.size() >= 1)
+ {
+ for (String server : connectedServers)
+ {
+ byte[] byteServerURL = server.getBytes("UTF-8");
+ oStream.write(byteServerURL);
+ oStream.write(0);
+ }
+ }
+ return oStream.toByteArray();
+ }
+ catch (IOException e)
+ {
+ // never happens
+ return null;
+ }
+ }
+
+ /**
+ * Get the list of servers currently connected to the Changelog server
+ * that generated this message.
+ *
+ * @return A collection of the servers currently connected to the Changelog
+ * server that generated this message.
+ */
+ public List<String> getConnectedServers()
+ {
+ return connectedServers;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index 15847ed..4bed982 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -53,6 +53,7 @@
static final byte MSG_TYPE_DONE = 13;
static final byte MSG_TYPE_ERROR = 14;
static final byte MSG_TYPE_WINDOW_PROBE = 15;
+ static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
// Adding a new type of message here probably requires to
// change accordingly generateMsg method below
@@ -73,6 +74,8 @@
* MSG_TYPE_ENTRY
* MSG_TYPE_DONE
* MSG_TYPE_ERROR
+ * MSG_TYPE_WINDOW_PROBE
+ * MSG_TYPE_REPL_SERVER_INFO
*
* @return the byte[] representation of this message.
* @throws UnsupportedEncodingException When the encoding of the message
@@ -140,6 +143,9 @@
case MSG_TYPE_WINDOW_PROBE:
msg = new WindowProbe(buffer);
break;
+ case MSG_TYPE_REPL_SERVER_INFO:
+ msg = new ReplServerInfoMessage(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 2fbc4da..24a167f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.DN;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -95,6 +95,7 @@
* We add new TreeSet in the HashMap when a new replication server register
* to this replication server.
*/
+
private Map<Short, ServerHandler> replicationServers =
new ConcurrentHashMap<Short, ServerHandler>();
@@ -253,6 +254,11 @@
return false;
}
connectedServers.put(handler.getServerId(), handler);
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
+
return true;
}
}
@@ -269,7 +275,13 @@
if (handler.isReplicationServer())
replicationServers.remove(handler.getServerId());
else
+ {
connectedServers.remove(handler.getServerId());
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
+ }
}
/**
@@ -312,6 +324,12 @@
return false;
}
replicationServers.put(handler.getServerId(), handler);
+
+ // Update this server with the list of LDAP servers
+ // already connected
+ handler.sendInfo(
+ new ReplServerInfoMessage(getConnectedLDAPservers()));
+
return true;
}
}
@@ -376,6 +394,22 @@
return sourceDbHandlers.keySet();
}
+ /**
+ * Returns as a set of String the list of LDAP servers connected to us.
+ * Each string is the serverID of a connected LDAP server.
+ *
+ * @return The set of connected LDAP servers
+ */
+ public List<String> getConnectedLDAPservers()
+ {
+ List<String> mySet = new ArrayList<String>(0);
+
+ for (ServerHandler handler : connectedServers.values())
+ {
+ mySet.add(String.valueOf(handler.getServerId()));
+ }
+ return mySet;
+ }
/**
* Creates and returns an iterator.
@@ -473,15 +507,9 @@
protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
ServerHandler senderHandler)
{
-
List<ServerHandler> servers =
new ArrayList<ServerHandler>();
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "getDestinationServers"
- + " msgDest:" + msg.getDestination() , 1);
-
if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
{
// TODO Import from the "closest server" to be implemented
@@ -497,7 +525,7 @@
}
}
- // Send to all connected LDAP servers
+ // Sends to all connected LDAP servers
for (ServerHandler destinationHandler : connectedServers.values())
{
// Don't loop on the sender
@@ -518,14 +546,20 @@
else
{
// the targeted server is NOT connected
+ // Let's search for THE changelog server that MAY
+ // have the targeted server connected.
if (senderHandler.isLDAPserver())
{
- // let's forward to the other changelogs
- servers.addAll(replicationServers.values());
+ for (ServerHandler h : replicationServers.values())
+ {
+ if (h.isRemoteLDAPServer(msg.getDestination()))
+ {
+ servers.add(h);
+ }
+ }
}
}
}
-
return servers;
}
@@ -543,37 +577,53 @@
if (servers.isEmpty())
{
- if (!(msg instanceof InitializeRequestMessage))
- {
- // TODO A more elaborated policy is probably needed
- }
- else
- {
- ErrorMessage errMsg = new ErrorMessage(
- msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
- "serverID:" + msg.getDestination());
-
- try
- {
- senderHandler.send(errMsg);
- }
- catch(IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- }
- }
- return;
- }
-
- for (ServerHandler targetHandler : servers)
- {
+ ErrorMessage errMsg = new ErrorMessage(
+ msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN,
+ "serverID:" + msg.getDestination());
try
{
- targetHandler.send(msg);
+ senderHandler.send(errMsg);
}
catch(IOException ioe)
{
// TODO Handle error properly (sender timeout in addition)
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(ioe);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ senderHandler.shutdown();
+ }
+ }
+ else
+ {
+ for (ServerHandler targetHandler : servers)
+ {
+ try
+ {
+ targetHandler.send(msg);
+ }
+ catch(IOException ioe)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(ioe) + " "
+ + msg.getClass().getCanonicalName();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ senderHandler.shutdown();
+ // TODO Handle error properly (sender timeout in addition)
+ }
}
}
@@ -722,4 +772,48 @@
}
return true;
}
+
+ /**
+ * Send a ReplServerInfoMessage to all the connected replication servers
+ * in order to let them know our connected LDAP servers.
+ */
+ private void sendReplServerInfo()
+ {
+ ReplServerInfoMessage info =
+ new ReplServerInfoMessage(getConnectedLDAPservers());
+ for (ServerHandler handler : replicationServers.values())
+ {
+ try
+ {
+ handler.sendInfo(info);
+ }
+ catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to this server.
+ * Log an error and close the connection to this server.
+ */
+ int msgID = MSGID_CHANGELOG_ERROR_SENDING_INFO;
+ String message = getMessage(msgID, this.toString())
+ + stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ handler.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Sets the replication server informations for the provided
+ * handler from the provided ReplServerInfoMessage.
+ *
+ * @param handler The server handler from which the info was received.
+ * @param infoMsg The information message that was received.
+ */
+ public void setReplServerInfo(
+ ServerHandler handler, ReplServerInfoMessage infoMsg)
+ {
+ handler.setReplServerInfo(infoMsg);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 8eb31b9..7027dda 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -357,7 +357,7 @@
* @param baseDn The base Dn for which the ReplicationCache must be returned.
* @return The ReplicationCache associated to the base DN given in parameter.
*/
- ReplicationCache getReplicationCache(DN baseDn)
+ public ReplicationCache getReplicationCache(DN baseDn)
{
ReplicationCache replicationCache;
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 7a0f72c..9dc3848 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -35,6 +35,7 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
+import java.util.List;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -61,6 +62,7 @@
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -129,6 +131,14 @@
private short protocolVersion;
+
+ /**
+ * When this Handler is connected to a changelog server this collection
+ * will contain the list of LDAP servers connected to the remote changelog
+ * server.
+ */
+ private List<String> remoteLDAPservers = new ArrayList<String>();
+
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
@@ -1342,18 +1352,70 @@
public void process(RoutableMessage msg)
{
if (debugEnabled())
- TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
- msg + " to " + serverId);
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "SH(" + replicationServerId + ") receives " + msg +
- " from " + serverId, 1);
+ TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
+ msg + " from " + serverId);
replicationCache.process(msg, this);
}
/**
+ * Sends the provided ReplServerInfoMessage.
+ *
+ * @param info The ReplServerInfoMessage message to be sent.
+ * @throws IOException When it occurs while sending the message,
+ *
+ */
+ public void sendInfo(ReplServerInfoMessage info)
+ throws IOException
+ {
+ session.publish(info);
+ }
+
+ /**
+ *
+ * Sets the replication server from the message provided.
+ *
+ * @param infoMsg The information message.
+ */
+ public void setReplServerInfo(ReplServerInfoMessage infoMsg)
+ {
+ remoteLDAPservers = infoMsg.getConnectedServers();
+ }
+
+ /**
+ * When this handler is connected to a replication server, specifies if
+ * a wanted server is connected to this replication server.
+ *
+ * @param wantedServer The server we want to know if it is connected
+ * to the replication server represented by this handler.
+ * @return boolean True is the wanted server is connected to the server
+ * represented by this handler.
+ */
+ public boolean isRemoteLDAPServer(short wantedServer)
+ {
+ for (String server : remoteLDAPservers)
+ {
+ if (wantedServer == Short.valueOf(server))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * When the handler is connected to a replication server, specifies the
+ * replication server has remote LDAP servers connected to it.
+ *
+ * @return boolean True is the replication server has remote LDAP servers
+ * connected to it.
+ */
+ public List<String> getRemoteLDAPServers()
+ {
+ return remoteLDAPservers;
+ }
+
+ /**
* Send an InitializeRequestMessage to the server connected through this
* handler.
*
@@ -1365,12 +1427,6 @@
if (debugEnabled())
TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
msg + " to " + serverId);
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- "SH(" + replicationServerId + ") forwards " +
- msg + " to " + serverId, 1);
-
session.publish(msg);
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 16373fc..0c0875d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
+import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
@@ -165,6 +166,11 @@
WindowProbe windowProbeMsg = (WindowProbe) msg;
handler.process(windowProbeMsg);
}
+ else if (msg instanceof ReplServerInfoMessage)
+ {
+ ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
+ handler.setReplServerInfo(infoMsg);
+ }
else if (msg == null)
{
/*
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index 2f6f0c8..dd84def 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -45,7 +45,11 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
@@ -103,7 +107,7 @@
*/
public class InitOnLineTest extends ReplicationTestCase
-{
+ {
/**
* The tracer object for the debug logger
*/
@@ -124,17 +128,21 @@
boolean ssShutdownRequested = false;
protected String[] updatedEntries;
boolean externalDS = false;
- short server1ID = 1;
- short server2ID = 2;
- short server3ID = 3;
- short changelog1ID = 12;
- short changelog2ID = 13;
- int changelogPort = 8989;
+ private static final short server1ID = 11;
+ private static final short server2ID = 21;
+ private static final short server3ID = 31;
+ private static final short changelog1ID = 1;
+ private static final short changelog2ID = 2;
+ private static final short changelog3ID = 3;
+ private static int[] replServerPort = new int[4];
+
private DN baseDn;
ReplicationBroker server2 = null;
+ ReplicationBroker server3 = null;
ReplicationServer changelog1 = null;
ReplicationServer changelog2 = null;
+ ReplicationServer changelog3 = null;
boolean emptyOldChanges = true;
ReplicationDomain sd = null;
@@ -221,16 +229,6 @@
"ds-task-initialize-domain-dn: dc=example,dc=com",
"ds-task-initialize-replica-server-id: all");
- // Change log
- String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
- String changeLogLdif = "dn: " + changeLogStringDN + "\n"
- + "objectClass: top\n"
- + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
- + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
- + "ds-cfg-changelog-server-id: 1\n"
- + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
- + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
- replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
replServerEntry = null;
}
@@ -604,17 +602,20 @@
"dn: dc=example,dc=com\n"
+ "objectClass: top\n"
+ "objectClass: domain\n"
+ + "dc: example\n"
+ "entryUUID: 21111111-1111-1111-1111-111111111111\n"
+ "\n",
"dn: ou=People,dc=example,dc=com\n"
+ "objectClass: top\n"
+ "objectClass: organizationalUnit\n"
+ + "ou: People\n"
+ "entryUUID: 21111111-1111-1111-1111-111111111112\n"
+ "\n",
"dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n"
+ "objectclass: top\n"
+ "objectclass: person\n"
+ "objectclass: organizationalPerson\n"
+ + "objectclass: inetOrgPerson\n"
+ "cn: Fiona Jensen\n"
+ "sn: Jensen\n"
+ "uid: fiona\n"
@@ -625,6 +626,7 @@
+ "objectclass: top\n"
+ "objectclass: person\n"
+ "objectclass: organizationalPerson\n"
+ + "objectclass: inetOrgPerson\n"
+ "cn: Robert Langman\n"
+ "sn: Langman\n"
+ "uid: robert\n"
@@ -738,25 +740,37 @@
*/
private ReplicationServer createChangelogServer(short changelogId)
{
+ SortedSet<String> servers = null;
+ servers = new TreeSet<String>();
try
{
- if ((changelogId==changelog1ID)&&(changelog1!=null))
- return changelog1;
-
- if ((changelogId==changelog2ID)&&(changelog2!=null))
- return changelog2;
-
+ if (changelogId==changelog1ID)
{
- int chPort = getChangelogPort(changelogId);
-
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
- null);
- ReplicationServer replicationServer = new ReplicationServer(conf);
- Thread.sleep(1000);
-
- return replicationServer;
+ if (changelog1!=null)
+ return changelog1;
}
+ else if (changelogId==changelog2ID)
+ {
+ if (changelog2!=null)
+ return changelog2;
+ }
+ else if (changelogId==changelog3ID)
+ {
+ if (changelog3!=null)
+ return changelog3;
+ }
+ servers.add("localhost:" + getChangelogPort(changelog1ID));
+ servers.add("localhost:" + getChangelogPort(changelog2ID));
+ servers.add("localhost:" + getChangelogPort(changelog3ID));
+
+ int chPort = getChangelogPort(changelogId);
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
+ servers);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ Thread.sleep(1000);
+
+ return replicationServer;
}
catch (Exception e)
{
@@ -796,7 +810,6 @@
DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
"Unable to add the synchronized server");
- entryList.add(synchroServerEntry.getDN());
sd = ReplicationDomain.retrievesReplicationDomain(baseDn);
@@ -820,14 +833,29 @@
private int getChangelogPort(short changelogID)
{
- return (changelogPort+changelogID);
+ if (replServerPort[changelogID] == 0)
+ {
+ try
+ {
+ // Find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ replServerPort[changelogID] = socket.getLocalPort();
+ socket.close();
+ }
+ catch(Exception e)
+ {
+ fail("Cannot retrieve a free port for replication server."
+ + e.getMessage());
+ }
+ }
+ return replServerPort[changelogID];
}
/**
* Tests the import side of the Initialize task
*/
@Test(enabled=false)
- public void InitializeImport() throws Exception
+ public void initializeImport() throws Exception
{
String testCase = "InitializeImport";
@@ -883,7 +911,7 @@
* Tests the export side of the Initialize task
*/
@Test(enabled=false)
- public void InitializeExport() throws Exception
+ public void initializeExport() throws Exception
{
String testCase = "Replication/InitializeExport";
@@ -917,7 +945,7 @@
* Tests the import side of the InitializeTarget task
*/
@Test(enabled=false)
- public void InitializeTargetExport() throws Exception
+ public void initializeTargetExport() throws Exception
{
String testCase = "Replication/InitializeTargetExport";
@@ -957,7 +985,7 @@
* Tests the import side of the InitializeTarget task
*/
@Test(enabled=false)
- public void InitializeTargetExportAll() throws Exception
+ public void initializeTargetExportAll() throws Exception
{
String testCase = "Replication/InitializeTargetExportAll";
@@ -1001,7 +1029,7 @@
* Tests the import side of the InitializeTarget task
*/
@Test(enabled=false)
- public void InitializeTargetImport() throws Exception
+ public void initializeTargetImport() throws Exception
{
String testCase = "InitializeTargetImport";
@@ -1042,7 +1070,7 @@
* Tests the import side of the InitializeTarget task
*/
@Test(enabled=false)
- public void InitializeTargetConfigErrors() throws Exception
+ public void initializeTargetConfigErrors() throws Exception
{
String testCase = "InitializeTargetConfigErrors";
@@ -1096,7 +1124,7 @@
* Tests the import side of the InitializeTarget task
*/
@Test(enabled=false)
- public void InitializeConfigErrors() throws Exception
+ public void initializeConfigErrors() throws Exception
{
String testCase = "InitializeConfigErrors";
@@ -1116,10 +1144,10 @@
",cn=Scheduled Tasks,cn=Tasks",
"objectclass: top",
"objectclass: ds-task",
- "objectclass: ds-task-initialize",
+ "objectclass: ds-task-initialize-from-remote-replica",
"ds-task-class-name: org.opends.server.tasks.InitializeTask",
"ds-task-initialize-domain-dn: foo",
- "ds-task-initialize-source: " + server2ID);
+ "ds-task-initialize-replica-server-id: " + server2ID);
addTask(taskInit, ResultCode.INVALID_DN_SYNTAX,
TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN);
@@ -1129,10 +1157,10 @@
",cn=Scheduled Tasks,cn=Tasks",
"objectclass: top",
"objectclass: ds-task",
- "objectclass: ds-task-initialize",
+ "objectclass: ds-task-initialize-from-remote-replica",
"ds-task-class-name: org.opends.server.tasks.InitializeTask",
"ds-task-initialize-domain-dn: dc=foo",
- "ds-task-initialize-source: " + server2ID);
+ "ds-task-initialize-replica-server-id: " + server2ID);
addTask(taskInit, ResultCode.OTHER, MSGID_NO_MATCHING_DOMAIN);
// Invalid Source
@@ -1141,10 +1169,10 @@
",cn=Scheduled Tasks,cn=Tasks",
"objectclass: top",
"objectclass: ds-task",
- "objectclass: ds-task-initialize",
+ "objectclass: ds-task-initialize-from-remote-replica",
"ds-task-class-name: org.opends.server.tasks.InitializeTask",
"ds-task-initialize-domain-dn: " + baseDn,
- "ds-task-initialize-source: -3");
+ "ds-task-initialize-replica-server-id: -3");
addTask(taskInit, ResultCode.OTHER,
MSGID_INVALID_IMPORT_SOURCE);
@@ -1162,21 +1190,101 @@
}
@Test(enabled=false)
- public void InitializeTargetBroken() throws Exception
+ public void initializeTargetBroken() throws Exception
{
String testCase = "InitializeTargetBroken";
fail(testCase + " NYI");
}
@Test(enabled=false)
- public void InitializeBroken() throws Exception
+ public void initializeBroken() throws Exception
{
String testCase = "InitializeBroken";
fail(testCase + " NYI");
}
+ /*
+ * TestReplServerInfos tests that in a topology with more
+ * than one replication server, in each replication server
+ * is stored the list of LDAP servers connected to each
+ * replication server of the topology, thanks to the
+ * ReplServerInfoMessage(s) exchanged by the replication
+ * servers.
+ */
@Test(enabled=false)
- public void InitializeTargetExportMultiSS() throws Exception
+ public void testReplServerInfos() throws Exception
+ {
+ String testCase = "Replication/TestReplServerInfos";
+
+ log("Starting " + testCase);
+
+ // Create the Repl Servers
+ changelog1 = createChangelogServer(changelog1ID);
+ changelog2 = createChangelogServer(changelog2ID);
+ changelog3 = createChangelogServer(changelog3ID);
+
+ // Connects lDAP1 to replServer1
+ connectServer1ToChangelog(changelog1ID);
+
+ // Connects lDAP2 to replServer2
+ ReplicationBroker broker2 =
+ openReplicationSession(DN.decode("dc=example,dc=com"),
+ server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+
+ // Connects lDAP3 to replServer2
+ ReplicationBroker broker3 =
+ openReplicationSession(DN.decode("dc=example,dc=com"),
+ server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+
+ // Check that the list of connected LDAP servers is correct
+ // in each replication servers
+ List<String> l1 = changelog1.getReplicationCache(baseDn).
+ getConnectedLDAPservers();
+ assertEquals(l1.size(), 1);
+ assertEquals(l1.get(0), String.valueOf(server1ID));
+
+ List<String> l2;
+ l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+ assertEquals(l2.size(), 2);
+ assertEquals(l2.get(0), String.valueOf(server3ID));
+ assertEquals(l2.get(1), String.valueOf(server2ID));
+
+ List<String> l3;
+ l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers();
+ assertEquals(l3.size(), 0);
+
+ // Test updates
+ broker3.stop();
+ Thread.sleep(1000);
+ l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+ assertEquals(l2.size(), 1);
+ assertEquals(l2.get(0), String.valueOf(server2ID));
+
+ broker3 = openReplicationSession(DN.decode("dc=example,dc=com"),
+ server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
+ broker2.stop();
+ Thread.sleep(1000);
+ l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers();
+ assertEquals(l2.size(), 1);
+ assertEquals(l2.get(0), String.valueOf(server3ID));
+
+ // TODO Test ReplicationCache.getDestinationServers method.
+
+ broker2.stop();
+ broker3.stop();
+
+ cleanEntries();
+
+ changelog3.shutdown();
+ changelog3 = null;
+ changelog2.shutdown();
+ changelog2 = null;
+ changelog1.shutdown();
+ changelog1 = null;
+ }
+
+ @Test(enabled=false)
+ public void initializeTargetExportMultiSS() throws Exception
{
String testCase = "Replication/InitializeTargetExportMultiSS";
@@ -1222,17 +1330,20 @@
}
@Test(enabled=false)
- public void InitializeExportMultiSS() throws Exception
+ public void initializeExportMultiSS() throws Exception
{
String testCase = "Replication/InitializeExportMultiSS";
log("Starting "+testCase);
// Create 2 changelogs
changelog1 = createChangelogServer(changelog1ID);
- Thread.sleep(3000);
+ Thread.sleep(1000);
changelog2 = createChangelogServer(changelog2ID);
- Thread.sleep(3000);
+ Thread.sleep(1000);
+
+ changelog3 = createChangelogServer(changelog3ID);
+ Thread.sleep(1000);
// Connect DS to the replicationServer 1
connectServer1ToChangelog(changelog1ID);
@@ -1240,7 +1351,7 @@
// Put entries in DB
addTestEntriesToDB();
- // Connect a broker acting as server 2 to changelog2
+ // Connect a broker acting as server 2 to Repl Server 2
if (server2 == null)
{
server2 = openReplicationSession(DN.decode("dc=example,dc=com"),
@@ -1248,6 +1359,14 @@
1000, emptyOldChanges);
}
+ // Connect a broker acting as server 3 to Repl Server 3
+ if (server3 == null)
+ {
+ server3 = openReplicationSession(DN.decode("dc=example,dc=com"),
+ server3ID, 100, getChangelogPort(changelog3ID),
+ 1000, emptyOldChanges);
+ }
+
Thread.sleep(3000);
// S2 sends init request
@@ -1267,7 +1386,7 @@
}
@Test(enabled=false)
- public void InitializeNoSource() throws Exception
+ public void initializeNoSource() throws Exception
{
String testCase = "InitializeNoSource";
log("Starting "+testCase);
@@ -1317,7 +1436,7 @@
}
@Test(enabled=false)
- public void InitializeTargetNoTarget() throws Exception
+ public void initializeTargetNoTarget() throws Exception
{
String testCase = "InitializeTargetNoTarget" + baseDn;
log("Starting "+testCase);
@@ -1336,10 +1455,10 @@
",cn=Scheduled Tasks,cn=Tasks",
"objectclass: top",
"objectclass: ds-task",
- "objectclass: ds-task-initialize-target",
+ "objectclass: ds-task-initialize-remote-replica",
"ds-task-class-name: org.opends.server.tasks.InitializeTargetTask",
- "ds-task-initialize-target-domain-dn: "+baseDn,
- "ds-task-initialize-target-scope: " + 10);
+ "ds-task-initialize-domain-dn: "+baseDn,
+ "ds-task-initialize-replica-server-id: " + 0);
addTask(taskInit, ResultCode.SUCCESS, 0);
@@ -1355,32 +1474,32 @@
}
@Test(enabled=false)
- public void InitializeStopped() throws Exception
+ public void initializeStopped() throws Exception
{
String testCase = "InitializeStopped";
fail(testCase + " NYI");
}
@Test(enabled=false)
- public void InitializeTargetStopped() throws Exception
+ public void initializeTargetStopped() throws Exception
{
String testCase = "InitializeTargetStopped";
fail(testCase + " NYI");
}
@Test(enabled=false)
- public void InitializeCompressed() throws Exception
+ public void initializeCompressed() throws Exception
{
String testCase = "InitializeStopped";
fail(testCase + " NYI");
}
@Test(enabled=false)
- public void InitializeTargetEncrypted() throws Exception
+ public void initializeTargetEncrypted() throws Exception
{
String testCase = "InitializeTargetCompressed";
fail(testCase + " NYI");
}
@Test(enabled=false)
- public void InitializeSimultaneous() throws Exception
+ public void initializeSimultaneous() throws Exception
{
String testCase = "InitializeSimultaneous";
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 7ab93ba..86a13f3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -173,7 +173,8 @@
{
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
- "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
+ "ReplicationTestCase/openChangelogSession " + e.getMessage()
+ + " when emptying old changes", 1);
}
}
return broker;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index f112a94..68de891 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -483,7 +483,7 @@
* by checking that : msg == new ServerStartMessage(msg.getBytes()).
*/
@Test(dataProvider="serverStart")
- public void ServerStartMessageTest(short serverId, DN baseDN, int window,
+ public void serverStartMessageTest(short serverId, DN baseDN, int window,
ServerState state) throws Exception
{
state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -516,7 +516,7 @@
* by checking that : msg == new ReplServerStartMessage(msg.getBytes()).
*/
@Test(dataProvider="changelogStart")
- public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
+ public void replserverStartMessageTest(short serverId, DN baseDN, int window,
String url, ServerState state) throws Exception
{
state.update(new ChangeNumber((long)1, 1,(short)1));
@@ -537,7 +537,7 @@
* by checking that : msg == new WindowMessage(msg.getBytes()).
*/
@Test()
- public void WindowMessageTest() throws Exception
+ public void windowMessageTest() throws Exception
{
WindowMessage msg = new WindowMessage(123);
WindowMessage newMsg = new WindowMessage(msg.getBytes());
@@ -557,11 +557,25 @@
}
/**
+ * Test ReplServerInfoMessage encoding and decoding.
+ */
+ @Test()
+ public void replServerInfoMessageTest() throws Exception
+ {
+ List<String> connectedServers = new ArrayList<String>(0);
+ connectedServers.add("s1");
+ connectedServers.add("s2");
+ ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers);
+ ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes());
+ assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers());
+ }
+
+ /**
* Test that EntryMessage encoding and decoding works
* by checking that : msg == new EntryMessageTest(msg.getBytes()).
*/
@Test()
- public void EntryMessageTest() throws Exception
+ public void entryMessageTest() throws Exception
{
String taskInitFromS2 = new String(
"dn: ds-task-id=" + UUID.randomUUID() +
@@ -586,7 +600,7 @@
* Test that InitializeRequestMessage encoding and decoding works
*/
@Test()
- public void InitializeRequestMessageTest() throws Exception
+ public void initializeRequestMessageTest() throws Exception
{
short sender = 1;
short target = 2;
@@ -602,7 +616,7 @@
* Test that InitializeTargetMessage encoding and decoding works
*/
@Test()
- public void InitializeTargetMessageTest() throws Exception
+ public void initializeTargetMessageTest() throws Exception
{
short senderID = 1;
short targetID = 2;
@@ -631,7 +645,7 @@
* Test that DoneMessage encoding and decoding works
*/
@Test()
- public void DoneMessage() throws Exception
+ public void doneMessageTest() throws Exception
{
DoneMessage msg = new DoneMessage((short)1, (short)2);
DoneMessage newMsg = new DoneMessage(msg.getBytes());
@@ -643,7 +657,7 @@
* Test that ErrorMessage encoding and decoding works
*/
@Test()
- public void ErrorMessage() throws Exception
+ public void errorMessageTest() throws Exception
{
ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details");
ErrorMessage newMsg = new ErrorMessage(msg.getBytes());
--
Gitblit v1.10.0