opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -450,6 +450,27 @@ CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63; /** * An error happened to send a ReplServerInfoMessage to another * replication server. */ public static final int MSGID_CHANGELOG_ERROR_SENDING_INFO = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 64; /** * An error happened to send an ErrorMessage to another * replication server. */ public static final int MSGID_CHANGELOG_ERROR_SENDING_ERROR = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 65; /** * An error happened to send a Message (probably a RoutableMessage) * to another replication server. */ public static final int MSGID_CHANGELOG_ERROR_SENDING_MSG = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 66; /** * Register the messages from this class in the core server. * */ @@ -530,8 +551,8 @@ "An unexpected error happened handling connection with %s." + "This connection is going to be closed. "); registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK, "An unexpected error happened sending an ack to %s." + "This connection is going to be closed. "); "An unexpected error occurred while sending an ack to %s." + "This connection is going to be closed and reopened. "); registerMessage( MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE, "An Exception was caught while receiving replication message : %s"); @@ -617,5 +638,15 @@ registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG, "The connection to Replication Server %s has been dropped by the " + "Replication Server"); registerMessage(MSGID_CHANGELOG_ERROR_SENDING_INFO, "An unexpected error occurred while sending a Server " + " Info message to %s. " + "This connection is going to be closed and reopened"); registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ERROR, "An unexpected error occurred while sending an Error Message to %s. "+ "This connection is going to be closed and reopened"); registerMessage(MSGID_CHANGELOG_ERROR_SENDING_MSG, "An unexpected error occurred while sending a Message to %s. "+ "This connection is going to be closed and reopened"); } } opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
New file @@ -0,0 +1,142 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2007 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import java.util.zip.DataFormatException; /** * * This class defines a message that is sent by a replication server * to the other replication servers in the topology containing the list * of LDAP servers directly connected to it. * A replication server sends a ReplServerInfoMessage when an LDAP * server connects or disconnects. * * Exchanging these messages allows to have each replication server * knowing the complete list of LDAP servers in the topology and * their associated replication server and thus take the appropriate * decision to route a message to an LDAP server. * */ public class ReplServerInfoMessage extends ReplicationMessage { private List<String> connectedServers = null; /** * Creates a new changelogInfo message from its encoded form. * * @param in The byte array containing the encoded form of the message. * @throws java.util.zip.DataFormatException If the byte array does not * contain a valid encoded form of the message. */ public ReplServerInfoMessage(byte[] in) throws DataFormatException { try { /* first byte is the type */ if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO) throw new DataFormatException( "Input is not a valid changelogInfo Message."); connectedServers = new ArrayList<String>(); int pos = 1; while (pos < in.length) { /* * Read the next server ID * first calculate the length then construct the string */ int length = getNextLength(in, pos); connectedServers.add(new String(in, pos, length, "UTF-8")); pos += length +1; } } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * Creates a new changelogInfo message from a list of the currently * connected servers. * * @param connectedServers The list of currently connected servers ID. */ public ReplServerInfoMessage(List<String> connectedServers) { this.connectedServers = connectedServers; } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { ByteArrayOutputStream oStream = new ByteArrayOutputStream(); /* Put the message type */ oStream.write(MSG_TYPE_REPL_SERVER_INFO); if (connectedServers.size() >= 1) { for (String server : connectedServers) { byte[] byteServerURL = server.getBytes("UTF-8"); oStream.write(byteServerURL); oStream.write(0); } } return oStream.toByteArray(); } catch (IOException e) { // never happens return null; } } /** * Get the list of servers currently connected to the Changelog server * that generated this message. * * @return A collection of the servers currently connected to the Changelog * server that generated this message. */ public List<String> getConnectedServers() { return connectedServers; } } opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -53,6 +53,7 @@ static final byte MSG_TYPE_DONE = 13; static final byte MSG_TYPE_ERROR = 14; static final byte MSG_TYPE_WINDOW_PROBE = 15; static final byte MSG_TYPE_REPL_SERVER_INFO = 16; // Adding a new type of message here probably requires to // change accordingly generateMsg method below @@ -73,6 +74,8 @@ * MSG_TYPE_ENTRY * MSG_TYPE_DONE * MSG_TYPE_ERROR * MSG_TYPE_WINDOW_PROBE * MSG_TYPE_REPL_SERVER_INFO * * @return the byte[] representation of this message. * @throws UnsupportedEncodingException When the encoding of the message @@ -140,6 +143,9 @@ case MSG_TYPE_WINDOW_PROBE: msg = new WindowProbe(buffer); break; case MSG_TYPE_REPL_SERVER_INFO: msg = new ReplServerInfoMessage(buffer); break; default: throw new DataFormatException("received message with unknown type"); } opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -43,9 +43,9 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.AckMessage; import org.opends.server.replication.protocol.ErrorMessage; import org.opends.server.replication.protocol.InitializeRequestMessage; import org.opends.server.replication.protocol.RoutableMessage; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.ReplServerInfoMessage; import org.opends.server.types.DN; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; @@ -95,6 +95,7 @@ * We add new TreeSet in the HashMap when a new replication server register * to this replication server. */ private Map<Short, ServerHandler> replicationServers = new ConcurrentHashMap<Short, ServerHandler>(); @@ -253,6 +254,11 @@ return false; } connectedServers.put(handler.getServerId(), handler); // Update the remote replication servers with our list // of connected LDAP servers sendReplServerInfo(); return true; } } @@ -269,7 +275,13 @@ if (handler.isReplicationServer()) replicationServers.remove(handler.getServerId()); else { connectedServers.remove(handler.getServerId()); // Update the remote replication servers with our list // of connected LDAP servers sendReplServerInfo(); } } /** @@ -312,6 +324,12 @@ return false; } replicationServers.put(handler.getServerId(), handler); // Update this server with the list of LDAP servers // already connected handler.sendInfo( new ReplServerInfoMessage(getConnectedLDAPservers())); return true; } } @@ -376,6 +394,22 @@ return sourceDbHandlers.keySet(); } /** * Returns as a set of String the list of LDAP servers connected to us. * Each string is the serverID of a connected LDAP server. * * @return The set of connected LDAP servers */ public List<String> getConnectedLDAPservers() { List<String> mySet = new ArrayList<String>(0); for (ServerHandler handler : connectedServers.values()) { mySet.add(String.valueOf(handler.getServerId())); } return mySet; } /** * Creates and returns an iterator. @@ -473,15 +507,9 @@ protected List<ServerHandler> getDestinationServers(RoutableMessage msg, ServerHandler senderHandler) { List<ServerHandler> servers = new ArrayList<ServerHandler>(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "getDestinationServers" + " msgDest:" + msg.getDestination() , 1); if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER) { // TODO Import from the "closest server" to be implemented @@ -497,7 +525,7 @@ } } // Send to all connected LDAP servers // Sends to all connected LDAP servers for (ServerHandler destinationHandler : connectedServers.values()) { // Don't loop on the sender @@ -518,14 +546,20 @@ else { // the targeted server is NOT connected // Let's search for THE changelog server that MAY // have the targeted server connected. if (senderHandler.isLDAPserver()) { // let's forward to the other changelogs servers.addAll(replicationServers.values()); for (ServerHandler h : replicationServers.values()) { if (h.isRemoteLDAPServer(msg.getDestination())) { servers.add(h); } } } } } return servers; } @@ -543,37 +577,53 @@ if (servers.isEmpty()) { if (!(msg instanceof InitializeRequestMessage)) { // TODO A more elaborated policy is probably needed } else { ErrorMessage errMsg = new ErrorMessage( msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, "serverID:" + msg.getDestination()); try { senderHandler.send(errMsg); } catch(IOException ioe) { // TODO Handle error properly (sender timeout in addition) } } return; } for (ServerHandler targetHandler : servers) { ErrorMessage errMsg = new ErrorMessage( msg.getsenderID(), MSGID_NO_REACHABLE_PEER_IN_THE_DOMAIN, "serverID:" + msg.getDestination()); try { targetHandler.send(msg); senderHandler.send(errMsg); } catch(IOException ioe) { // TODO Handle error properly (sender timeout in addition) /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_ERROR; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(ioe); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); senderHandler.shutdown(); } } else { for (ServerHandler targetHandler : servers) { try { targetHandler.send(msg); } catch(IOException ioe) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_MSG; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(ioe) + " " + msg.getClass().getCanonicalName(); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); senderHandler.shutdown(); // TODO Handle error properly (sender timeout in addition) } } } @@ -722,4 +772,48 @@ } return true; } /** * Send a ReplServerInfoMessage to all the connected replication servers * in order to let them know our connected LDAP servers. */ private void sendReplServerInfo() { ReplServerInfoMessage info = new ReplServerInfoMessage(getConnectedLDAPservers()); for (ServerHandler handler : replicationServers.values()) { try { handler.sendInfo(info); } catch (IOException e) { /* * An error happened trying the send back an ack to this server. * Log an error and close the connection to this server. */ int msgID = MSGID_CHANGELOG_ERROR_SENDING_INFO; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); handler.shutdown(); } } } /** * Sets the replication server informations for the provided * handler from the provided ReplServerInfoMessage. * * @param handler The server handler from which the info was received. * @param infoMsg The information message that was received. */ public void setReplServerInfo( ServerHandler handler, ReplServerInfoMessage infoMsg) { handler.setReplServerInfo(infoMsg); } } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -357,7 +357,7 @@ * @param baseDn The base Dn for which the ReplicationCache must be returned. * @return The ReplicationCache associated to the base DN given in parameter. */ ReplicationCache getReplicationCache(DN baseDn) public ReplicationCache getReplicationCache(DN baseDn) { ReplicationCache replicationCache; opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -35,6 +35,7 @@ import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.util.List; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -61,6 +62,7 @@ import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.replication.protocol.ReplServerInfoMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; @@ -129,6 +131,14 @@ private short protocolVersion; /** * When this Handler is connected to a changelog server this collection * will contain the list of LDAP servers connected to the remote changelog * server. */ private List<String> remoteLDAPservers = new ArrayList<String>(); /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. @@ -1342,18 +1352,70 @@ public void process(RoutableMessage msg) { if (debugEnabled()) TRACER.debugInfo("SH(" + replicationServerId + ") forwards " + msg + " to " + serverId); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "SH(" + replicationServerId + ") receives " + msg + " from " + serverId, 1); TRACER.debugInfo("SH(" + replicationServerId + ") receives " + msg + " from " + serverId); replicationCache.process(msg, this); } /** * Sends the provided ReplServerInfoMessage. * * @param info The ReplServerInfoMessage message to be sent. * @throws IOException When it occurs while sending the message, * */ public void sendInfo(ReplServerInfoMessage info) throws IOException { session.publish(info); } /** * * Sets the replication server from the message provided. * * @param infoMsg The information message. */ public void setReplServerInfo(ReplServerInfoMessage infoMsg) { remoteLDAPservers = infoMsg.getConnectedServers(); } /** * When this handler is connected to a replication server, specifies if * a wanted server is connected to this replication server. * * @param wantedServer The server we want to know if it is connected * to the replication server represented by this handler. * @return boolean True is the wanted server is connected to the server * represented by this handler. */ public boolean isRemoteLDAPServer(short wantedServer) { for (String server : remoteLDAPservers) { if (wantedServer == Short.valueOf(server)) { return true; } } return false; } /** * When the handler is connected to a replication server, specifies the * replication server has remote LDAP servers connected to it. * * @return boolean True is the replication server has remote LDAP servers * connected to it. */ public List<String> getRemoteLDAPServers() { return remoteLDAPservers; } /** * Send an InitializeRequestMessage to the server connected through this * handler. * @@ -1365,12 +1427,6 @@ if (debugEnabled()) TRACER.debugInfo("SH(" + replicationServerId + ") forwards " + msg + " to " + serverId); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, "SH(" + replicationServerId + ") forwards " + msg + " to " + serverId, 1); session.publish(msg); } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -46,6 +46,7 @@ import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.replication.protocol.ReplServerInfoMessage; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.loggers.debug.DebugTracer; @@ -165,6 +166,11 @@ WindowProbe windowProbeMsg = (WindowProbe) msg; handler.process(windowProbeMsg); } else if (msg instanceof ReplServerInfoMessage) { ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg; handler.setReplServerInfo(infoMsg); } else if (msg == null) { /* opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -45,7 +45,11 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.net.ServerSocket; import java.util.ArrayList; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import org.opends.server.TestCaseUtils; @@ -103,7 +107,7 @@ */ public class InitOnLineTest extends ReplicationTestCase { { /** * The tracer object for the debug logger */ @@ -124,17 +128,21 @@ boolean ssShutdownRequested = false; protected String[] updatedEntries; boolean externalDS = false; short server1ID = 1; short server2ID = 2; short server3ID = 3; short changelog1ID = 12; short changelog2ID = 13; int changelogPort = 8989; private static final short server1ID = 11; private static final short server2ID = 21; private static final short server3ID = 31; private static final short changelog1ID = 1; private static final short changelog2ID = 2; private static final short changelog3ID = 3; private static int[] replServerPort = new int[4]; private DN baseDn; ReplicationBroker server2 = null; ReplicationBroker server3 = null; ReplicationServer changelog1 = null; ReplicationServer changelog2 = null; ReplicationServer changelog3 = null; boolean emptyOldChanges = true; ReplicationDomain sd = null; @@ -221,16 +229,6 @@ "ds-task-initialize-domain-dn: dc=example,dc=com", "ds-task-initialize-replica-server-id: all"); // Change log String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; String changeLogLdif = "dn: " + changeLogStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-changelog-server-config\n" + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n" + "ds-cfg-changelog-server-id: 1\n" + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); replServerEntry = null; } @@ -604,17 +602,20 @@ "dn: dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: domain\n" + "dc: example\n" + "entryUUID: 21111111-1111-1111-1111-111111111111\n" + "\n", "dn: ou=People,dc=example,dc=com\n" + "objectClass: top\n" + "objectClass: organizationalUnit\n" + "ou: People\n" + "entryUUID: 21111111-1111-1111-1111-111111111112\n" + "\n", "dn: cn=Fiona Jensen,ou=people,dc=example,dc=com\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "objectclass: inetOrgPerson\n" + "cn: Fiona Jensen\n" + "sn: Jensen\n" + "uid: fiona\n" @@ -625,6 +626,7 @@ + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "objectclass: inetOrgPerson\n" + "cn: Robert Langman\n" + "sn: Langman\n" + "uid: robert\n" @@ -738,25 +740,37 @@ */ private ReplicationServer createChangelogServer(short changelogId) { SortedSet<String> servers = null; servers = new TreeSet<String>(); try { if ((changelogId==changelog1ID)&&(changelog1!=null)) return changelog1; if ((changelogId==changelog2ID)&&(changelog2!=null)) return changelog2; if (changelogId==changelog1ID) { int chPort = getChangelogPort(changelogId); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100, null); ReplicationServer replicationServer = new ReplicationServer(conf); Thread.sleep(1000); return replicationServer; if (changelog1!=null) return changelog1; } else if (changelogId==changelog2ID) { if (changelog2!=null) return changelog2; } else if (changelogId==changelog3ID) { if (changelog3!=null) return changelog3; } servers.add("localhost:" + getChangelogPort(changelog1ID)); servers.add("localhost:" + getChangelogPort(changelog2ID)); servers.add("localhost:" + getChangelogPort(changelog3ID)); int chPort = getChangelogPort(changelogId); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100, servers); ReplicationServer replicationServer = new ReplicationServer(conf); Thread.sleep(1000); return replicationServer; } catch (Exception e) { @@ -796,7 +810,6 @@ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized server"); entryList.add(synchroServerEntry.getDN()); sd = ReplicationDomain.retrievesReplicationDomain(baseDn); @@ -820,14 +833,29 @@ private int getChangelogPort(short changelogID) { return (changelogPort+changelogID); if (replServerPort[changelogID] == 0) { try { // Find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); replServerPort[changelogID] = socket.getLocalPort(); socket.close(); } catch(Exception e) { fail("Cannot retrieve a free port for replication server." + e.getMessage()); } } return replServerPort[changelogID]; } /** * Tests the import side of the Initialize task */ @Test(enabled=false) public void InitializeImport() throws Exception public void initializeImport() throws Exception { String testCase = "InitializeImport"; @@ -883,7 +911,7 @@ * Tests the export side of the Initialize task */ @Test(enabled=false) public void InitializeExport() throws Exception public void initializeExport() throws Exception { String testCase = "Replication/InitializeExport"; @@ -917,7 +945,7 @@ * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetExport() throws Exception public void initializeTargetExport() throws Exception { String testCase = "Replication/InitializeTargetExport"; @@ -957,7 +985,7 @@ * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetExportAll() throws Exception public void initializeTargetExportAll() throws Exception { String testCase = "Replication/InitializeTargetExportAll"; @@ -1001,7 +1029,7 @@ * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetImport() throws Exception public void initializeTargetImport() throws Exception { String testCase = "InitializeTargetImport"; @@ -1042,7 +1070,7 @@ * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeTargetConfigErrors() throws Exception public void initializeTargetConfigErrors() throws Exception { String testCase = "InitializeTargetConfigErrors"; @@ -1096,7 +1124,7 @@ * Tests the import side of the InitializeTarget task */ @Test(enabled=false) public void InitializeConfigErrors() throws Exception public void initializeConfigErrors() throws Exception { String testCase = "InitializeConfigErrors"; @@ -1116,10 +1144,10 @@ ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: foo", "ds-task-initialize-source: " + server2ID); "ds-task-initialize-replica-server-id: " + server2ID); addTask(taskInit, ResultCode.INVALID_DN_SYNTAX, TaskMessages.MSGID_TASK_INITIALIZE_INVALID_DN); @@ -1129,10 +1157,10 @@ ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: dc=foo", "ds-task-initialize-source: " + server2ID); "ds-task-initialize-replica-server-id: " + server2ID); addTask(taskInit, ResultCode.OTHER, MSGID_NO_MATCHING_DOMAIN); // Invalid Source @@ -1141,10 +1169,10 @@ ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize", "objectclass: ds-task-initialize-from-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTask", "ds-task-initialize-domain-dn: " + baseDn, "ds-task-initialize-source: -3"); "ds-task-initialize-replica-server-id: -3"); addTask(taskInit, ResultCode.OTHER, MSGID_INVALID_IMPORT_SOURCE); @@ -1162,21 +1190,101 @@ } @Test(enabled=false) public void InitializeTargetBroken() throws Exception public void initializeTargetBroken() throws Exception { String testCase = "InitializeTargetBroken"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeBroken() throws Exception public void initializeBroken() throws Exception { String testCase = "InitializeBroken"; fail(testCase + " NYI"); } /* * TestReplServerInfos tests that in a topology with more * than one replication server, in each replication server * is stored the list of LDAP servers connected to each * replication server of the topology, thanks to the * ReplServerInfoMessage(s) exchanged by the replication * servers. */ @Test(enabled=false) public void InitializeTargetExportMultiSS() throws Exception public void testReplServerInfos() throws Exception { String testCase = "Replication/TestReplServerInfos"; log("Starting " + testCase); // Create the Repl Servers changelog1 = createChangelogServer(changelog1ID); changelog2 = createChangelogServer(changelog2ID); changelog3 = createChangelogServer(changelog3ID); // Connects lDAP1 to replServer1 connectServer1ToChangelog(changelog1ID); // Connects lDAP2 to replServer2 ReplicationBroker broker2 = openReplicationSession(DN.decode("dc=example,dc=com"), server2ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); // Connects lDAP3 to replServer2 ReplicationBroker broker3 = openReplicationSession(DN.decode("dc=example,dc=com"), server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); // Check that the list of connected LDAP servers is correct // in each replication servers List<String> l1 = changelog1.getReplicationCache(baseDn). getConnectedLDAPservers(); assertEquals(l1.size(), 1); assertEquals(l1.get(0), String.valueOf(server1ID)); List<String> l2; l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers(); assertEquals(l2.size(), 2); assertEquals(l2.get(0), String.valueOf(server3ID)); assertEquals(l2.get(1), String.valueOf(server2ID)); List<String> l3; l3 = changelog3.getReplicationCache(baseDn).getConnectedLDAPservers(); assertEquals(l3.size(), 0); // Test updates broker3.stop(); Thread.sleep(1000); l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers(); assertEquals(l2.size(), 1); assertEquals(l2.get(0), String.valueOf(server2ID)); broker3 = openReplicationSession(DN.decode("dc=example,dc=com"), server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); broker2.stop(); Thread.sleep(1000); l2 = changelog2.getReplicationCache(baseDn).getConnectedLDAPservers(); assertEquals(l2.size(), 1); assertEquals(l2.get(0), String.valueOf(server3ID)); // TODO Test ReplicationCache.getDestinationServers method. broker2.stop(); broker3.stop(); cleanEntries(); changelog3.shutdown(); changelog3 = null; changelog2.shutdown(); changelog2 = null; changelog1.shutdown(); changelog1 = null; } @Test(enabled=false) public void initializeTargetExportMultiSS() throws Exception { String testCase = "Replication/InitializeTargetExportMultiSS"; @@ -1222,17 +1330,20 @@ } @Test(enabled=false) public void InitializeExportMultiSS() throws Exception public void initializeExportMultiSS() throws Exception { String testCase = "Replication/InitializeExportMultiSS"; log("Starting "+testCase); // Create 2 changelogs changelog1 = createChangelogServer(changelog1ID); Thread.sleep(3000); Thread.sleep(1000); changelog2 = createChangelogServer(changelog2ID); Thread.sleep(3000); Thread.sleep(1000); changelog3 = createChangelogServer(changelog3ID); Thread.sleep(1000); // Connect DS to the replicationServer 1 connectServer1ToChangelog(changelog1ID); @@ -1240,7 +1351,7 @@ // Put entries in DB addTestEntriesToDB(); // Connect a broker acting as server 2 to changelog2 // Connect a broker acting as server 2 to Repl Server 2 if (server2 == null) { server2 = openReplicationSession(DN.decode("dc=example,dc=com"), @@ -1248,6 +1359,14 @@ 1000, emptyOldChanges); } // Connect a broker acting as server 3 to Repl Server 3 if (server3 == null) { server3 = openReplicationSession(DN.decode("dc=example,dc=com"), server3ID, 100, getChangelogPort(changelog3ID), 1000, emptyOldChanges); } Thread.sleep(3000); // S2 sends init request @@ -1267,7 +1386,7 @@ } @Test(enabled=false) public void InitializeNoSource() throws Exception public void initializeNoSource() throws Exception { String testCase = "InitializeNoSource"; log("Starting "+testCase); @@ -1317,7 +1436,7 @@ } @Test(enabled=false) public void InitializeTargetNoTarget() throws Exception public void initializeTargetNoTarget() throws Exception { String testCase = "InitializeTargetNoTarget" + baseDn; log("Starting "+testCase); @@ -1336,10 +1455,10 @@ ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-initialize-target", "objectclass: ds-task-initialize-remote-replica", "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", "ds-task-initialize-target-domain-dn: "+baseDn, "ds-task-initialize-target-scope: " + 10); "ds-task-initialize-domain-dn: "+baseDn, "ds-task-initialize-replica-server-id: " + 0); addTask(taskInit, ResultCode.SUCCESS, 0); @@ -1355,32 +1474,32 @@ } @Test(enabled=false) public void InitializeStopped() throws Exception public void initializeStopped() throws Exception { String testCase = "InitializeStopped"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeTargetStopped() throws Exception public void initializeTargetStopped() throws Exception { String testCase = "InitializeTargetStopped"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeCompressed() throws Exception public void initializeCompressed() throws Exception { String testCase = "InitializeStopped"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeTargetEncrypted() throws Exception public void initializeTargetEncrypted() throws Exception { String testCase = "InitializeTargetCompressed"; fail(testCase + " NYI"); } @Test(enabled=false) public void InitializeSimultaneous() throws Exception public void initializeSimultaneous() throws Exception { String testCase = "InitializeSimultaneous"; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -173,7 +173,8 @@ { logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1); "ReplicationTestCase/openChangelogSession " + e.getMessage() + " when emptying old changes", 1); } } return broker; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -483,7 +483,7 @@ * by checking that : msg == new ServerStartMessage(msg.getBytes()). */ @Test(dataProvider="serverStart") public void ServerStartMessageTest(short serverId, DN baseDN, int window, public void serverStartMessageTest(short serverId, DN baseDN, int window, ServerState state) throws Exception { state.update(new ChangeNumber((long)1, 1,(short)1)); @@ -516,7 +516,7 @@ * by checking that : msg == new ReplServerStartMessage(msg.getBytes()). */ @Test(dataProvider="changelogStart") public void ChangelogStartMessageTest(short serverId, DN baseDN, int window, public void replserverStartMessageTest(short serverId, DN baseDN, int window, String url, ServerState state) throws Exception { state.update(new ChangeNumber((long)1, 1,(short)1)); @@ -537,7 +537,7 @@ * by checking that : msg == new WindowMessage(msg.getBytes()). */ @Test() public void WindowMessageTest() throws Exception public void windowMessageTest() throws Exception { WindowMessage msg = new WindowMessage(123); WindowMessage newMsg = new WindowMessage(msg.getBytes()); @@ -557,11 +557,25 @@ } /** * Test ReplServerInfoMessage encoding and decoding. */ @Test() public void replServerInfoMessageTest() throws Exception { List<String> connectedServers = new ArrayList<String>(0); connectedServers.add("s1"); connectedServers.add("s2"); ReplServerInfoMessage msg = new ReplServerInfoMessage(connectedServers); ReplServerInfoMessage newMsg = new ReplServerInfoMessage(msg.getBytes()); assertEquals(msg.getConnectedServers(), newMsg.getConnectedServers()); } /** * Test that EntryMessage encoding and decoding works * by checking that : msg == new EntryMessageTest(msg.getBytes()). */ @Test() public void EntryMessageTest() throws Exception public void entryMessageTest() throws Exception { String taskInitFromS2 = new String( "dn: ds-task-id=" + UUID.randomUUID() + @@ -586,7 +600,7 @@ * Test that InitializeRequestMessage encoding and decoding works */ @Test() public void InitializeRequestMessageTest() throws Exception public void initializeRequestMessageTest() throws Exception { short sender = 1; short target = 2; @@ -602,7 +616,7 @@ * Test that InitializeTargetMessage encoding and decoding works */ @Test() public void InitializeTargetMessageTest() throws Exception public void initializeTargetMessageTest() throws Exception { short senderID = 1; short targetID = 2; @@ -631,7 +645,7 @@ * Test that DoneMessage encoding and decoding works */ @Test() public void DoneMessage() throws Exception public void doneMessageTest() throws Exception { DoneMessage msg = new DoneMessage((short)1, (short)2); DoneMessage newMsg = new DoneMessage(msg.getBytes()); @@ -643,7 +657,7 @@ * Test that ErrorMessage encoding and decoding works */ @Test() public void ErrorMessage() throws Exception public void errorMessageTest() throws Exception { ErrorMessage msg = new ErrorMessage((short)1, (short)2, 12, "details"); ErrorMessage newMsg = new ErrorMessage(msg.getBytes());