opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@ NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \ listening on %s NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \ changes that this server has already processed on suffix %s NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \ up to date chnages for suffix %s NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \ server should be configured SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \ @@ -85,8 +85,8 @@ MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \ database for base DN %s NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \ replication server that has seen all the local changes on suffix %s. Going to replay \ changes replication server that has seen all the local changes on suffix %s. Found %d \ replications server(s) not up to date. Going to replay changes NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \ server on suffix %s, retrying... NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s : @@ -249,4 +249,12 @@ SEVERE_ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED_99=The replication \ server backend cannot export its entries in LDIF format because the \ export-ldif command must be run as a task DEBUG_GOING_TO_SEARCH_FOR_CHANGES_100=The replication server is late \ regarding our changes: going to send missing ones DEBUG_SENDING_CHANGE_101=Sending change number: %s DEBUG_CHANGES_SENT_102=All missing changes sent to replication server SEVERE_ERR_PUBLISHING_FAKE_OPS_103=Caught exception publishing fake operations \ for domain %s to replication server %s : %s SEVERE_ERR_COMPUTING_FAKE_OPS_104=Caught exception computing fake operations \ for domain %s for replication server %s : %s opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,6 +25,7 @@ * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import org.opends.messages.*; import static org.opends.server.loggers.ErrorLogger.logError; @@ -41,6 +42,8 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.TreeSet; import java.util.concurrent.Semaphore; @@ -61,17 +64,16 @@ import org.opends.server.types.SearchResultReference; import org.opends.server.types.SearchScope; /** * The broker for Multi-master Replication. */ public class ReplicationBroker implements InternalSearchListener { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private boolean shutdown = false; private Collection<String> servers; private boolean connected = false; @@ -95,23 +97,23 @@ private long generationId = -1; private ReplSessionSecurity replSessionSecurity; // Trick for avoiding a inner class for many parameters return for // performHandshake method. private String tmpReadableServerName = null; /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; /** * A thread to monitor heartbeats on the session. */ private HeartbeatMonitor heartbeatMonitor = null; /** * The number of times the connection was lost. */ private int numLostConnections = 0; /** * When the broker cannot connect to any replication server * it log an error and keeps continuing every second. @@ -121,7 +123,6 @@ * finally succeed to connect. */ private boolean connectionError = false; private final Object connectPhaseLock = new Object(); /** @@ -194,7 +195,6 @@ this.connect(); } /** * Connect to a ReplicationServer. * @@ -202,7 +202,7 @@ */ private void connect() { ReplServerStartMessage replServerStartMsg = null; HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // Stop any existing heartbeat monitor from a previous session. if (heartbeatMonitor != null) @@ -211,127 +211,74 @@ heartbeatMonitor = null; } // checkState is true for the first loop on all replication servers // looking for one already up-to-date. // If we found some responding replication servers but none up-to-date // then we set check-state to false and do a second loop where the first // found will be the one elected and then we will update this replication // server. boolean checkState = true; boolean receivedResponse = true; // TODO: We are doing here 2 loops opening , closing , reopening session to // the same servers .. risk to have 'same server id' erros. // Would be better to do only one loop, keeping the best candidate while // traversing the list of replication servers to connect to. if (servers.size()==1) { checkState = false; } synchronized (connectPhaseLock) { while ((!connected) && (!shutdown) && (receivedResponse)) { receivedResponse = false; /* * Connect to each replication server and get their ServerState then find * out which one is the best to connect to. */ for (String server : servers) { int separator = server.lastIndexOf(':'); String port = server.substring(separator + 1); String hostname = server.substring(0, separator); // Connect to server and get reply message ReplServerStartMessage replServerStartMsg = performHandshake(server, false); tmpReadableServerName = null; // Not needed now // Store reply message in list if (replServerStartMsg != null) { ServerState rsState = replServerStartMsg.getServerState(); rsStates.put(server, rsState); } } // for servers ReplServerStartMessage replServerStartMsg = null; if (rsStates.size() > 0) { // At least one server answered, find the best one. String bestServer = computeBestReplicationServer(state, rsStates, serverID, baseDn); // Best found, now connect to this one replServerStartMsg = performHandshake(bestServer, true); if (replServerStartMsg != null) { try { /* * Open a socket connection to the next candidate. */ InetSocketAddress ServerAddr = new InetSocketAddress( InetAddress.getByName(hostname), Integer.parseInt(port)); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(ServerAddr, 500); session = replSessionSecurity.createClientSession(server, socket); boolean isSslEncryption = replSessionSecurity.isSslEncryption(server); /* * Send our ServerStartMessage. */ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, halfRcvWindow*2, heartbeatInterval, state, protocolVersion, generationId, isSslEncryption); session.publish(msg); /* * Read the ReplServerStartMessage that should come back. */ session.setSoTimeout(1000); replServerStartMsg = (ReplServerStartMessage) session.receive(); receivedResponse = true; /* * We have sent our own protocol version to the replication server. * The replication server will use the same one (or an older one * if it is an old replication server). */ protocolVersion = ProtocolVersion.minWithCurrent( replServerStartMsg.getVersion()); session.setSoTimeout(timeout); if (!isSslEncryption) { session.stopEncryption(); } /* * We must not publish changes to a replicationServer that has not * seen all our previous changes because this could cause some * other ldap servers to miss those changes. * Check that the ReplicationServer has seen all our previous * changes. * If not, try another replicationServer. * If no other replicationServer has seen all our changes, recover * those changes and send them again to any replicationServer. */ ChangeNumber replServerMaxChangeNumber = replServerStartMsg.getServerState().getMaxChangeNumber(serverID); if (replServerMaxChangeNumber == null) { replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); } ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); if ((ourMaxChangeNumber == null) || (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) if ((ourMaxChangeNumber != null) && (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) { replicationServer = ServerAddr.toString(); maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; startHeartBeat(); break; } else { if (checkState == true) { /* This replicationServer is missing some * of our changes, we are going to try another server * but before log a notice message */ Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server, baseDn.toNormalizedString()); logError(message); } else { // Replication server is missing some of our changes: let's send // them to him. replayOperations.clear(); // TODO: i18n logError(Message.raw("going to search for changes")); Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); logError(message); /* * Get all the changes that have not been seen by this * replicationServer and populate the replayOperations * replication server and populate the replayOperations * list. */ InternalSearchOperation op = seachForChangedEntries( @@ -345,53 +292,40 @@ * Log an error for the repair tool * that will need to resynchronize the servers. */ Message message = ERR_CANNOT_RECOVER_CHANGES.get( message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); replicationServer = ServerAddr.toString(); maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; startHeartBeat(); } else } else { replicationServer = ServerAddr.toString(); maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; for (FakeOperation replayOp : replayOperations) { logError(Message.raw("sendingChange")); // TODO: i18n message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber(). toString()); logError(message); session.publish(replayOp.generateMessage()); } startHeartBeat(); logError(Message.raw("changes sent")); // TODO: i18n break; } } } } catch (ConnectException e) { /* * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ if (!connectionError ) { // the error message is only logged once to avoid overflowing // the error log Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); message = DEBUG_CHANGES_SENT.get(); logError(message); } } catch (Exception e) replicationServer = tmpReadableServerName; maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; startHeartBeat(); } catch (IOException e) { Message message = ERR_EXCEPTION_STARTING_SESSION.get( baseDn.toNormalizedString(), server, e.getLocalizedMessage() + Message message = ERR_PUBLISHING_FAKE_OPS.get( baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); } finally } catch (Exception e) { Message message = ERR_COMPUTING_FAKE_OPS.get( baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); } finally { if (connected == false) { @@ -408,37 +342,20 @@ } } } } // for servers // We have traversed all the replication servers if ((!connected) && (checkState == true) && receivedResponse) { /* * We could not find a replicationServer that has seen all the * changes that this server has already processed, start again * the loop looking for any replicationServer. */ Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( baseDn.toNormalizedString()); logError(message); checkState = false; } } // We have traversed all the replication servers as many times as needed // to find one if one is up and running. } // Could perform handshake with best } // Reached some servers if (connected) { // This server has connected correctly. // Log a message to let the administrator know that the failure was // resolved. // wakeup all the thread that were waiting on the window // Wakeup all the thread that were waiting on the window // on the previous connection. connectionError = false; if (sendWindow != null) { sendWindow.release(Integer.MAX_VALUE); } this.sendWindow = new Semaphore(maxSendWindow); connectPhaseLock.notify(); @@ -451,8 +368,7 @@ replicationServer, Long.toString(this.generationId)); logError(message); } else } else { Message message = NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( @@ -462,17 +378,14 @@ Long.toString(replServerStartMsg.getGenerationId())); logError(message); } } else } else { /* * This server could not find any replicationServer * It's going to start in degraded mode. * Log a message * This server could not find any replicationServer. It's going to start * in degraded mode. Log a message. */ if (!connectionError) { checkState = false; connectionError = true; connectPhaseLock.notify(); Message message = @@ -484,6 +397,315 @@ } /** * Connect to the provided server performing the handshake (start messages * exchange) and return the reply message from the replication server. * * @param server Server to connect to. * @param keepConnection Do we keep session opened or not after handshake. * @return The ReplServerStartMessage the server replied. Null if could not * get an answer. */ public ReplServerStartMessage performHandshake(String server, boolean keepConnection) { ReplServerStartMessage replServerStartMsg = null; // Parse server string. int separator = server.lastIndexOf(':'); String port = server.substring(separator + 1); String hostname = server.substring(0, separator); boolean error = false; try { /* * Open a socket connection to the next candidate. */ int intPort = Integer.parseInt(port); InetSocketAddress serverAddr = new InetSocketAddress( InetAddress.getByName(hostname), intPort); tmpReadableServerName = serverAddr.toString(); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(serverAddr, 500); session = replSessionSecurity.createClientSession(server, socket); boolean isSslEncryption = replSessionSecurity.isSslEncryption(server); /* * Send our ServerStartMessage. */ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, halfRcvWindow * 2, heartbeatInterval, state, protocolVersion, generationId, isSslEncryption); session.publish(msg); /* * Read the ReplServerStartMessage that should come back. */ session.setSoTimeout(1000); replServerStartMsg = (ReplServerStartMessage) session.receive(); /* * We have sent our own protocol version to the replication server. * The replication server will use the same one (or an older one * if it is an old replication server). */ protocolVersion = ProtocolVersion.minWithCurrent( replServerStartMsg.getVersion()); session.setSoTimeout(timeout); if (!isSslEncryption) { session.stopEncryption(); } } catch (ConnectException e) { /* * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ if (!connectionError) { // the error message is only logged once to avoid overflowing // the error log Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); logError(message); } error = true; } catch (Exception e) { Message message = ERR_EXCEPTION_STARTING_SESSION.get( baseDn.toNormalizedString(), server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); error = true; } // Close session if requested if (!keepConnection || error) { if (session != null) { try { session.close(); } catch (IOException e) { // The session was already closed, just ignore. } session = null; } if (error) { replServerStartMsg = null; } // Be sure to return null. } return replServerStartMsg; } /** * Returns the replication server that best fits our need so that we can * connect to it. * * Note: this method put as public static for unit testing purpose. * * @param myState The local server state. * @param rsStates The list of available replication servers and their * associated server state. * @param serverId The server id for the suffix we are working for. * @param baseDn The suffix for which we are working for. * @return The computed best replication server. */ public static String computeBestReplicationServer(ServerState myState, HashMap<String, ServerState> rsStates, short serverId, DN baseDn) { /* * Find replication servers who are up to date (or more up to date than us, * if for instance we failed and restarted, having sent some changes to the * RS but without having time to store our own state) regarding our own * server id. Then, among them, choose the server that is the most up to * date regarding the whole topology. * * If no server is up to date regarding our own server id, find the one who * is the most up to date regarding our server id. */ // Should never happen (sanity check) if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) || (baseDn == null)) { return null; } String bestServer = null; // Servers up to dates with regard to our changes HashMap<String, ServerState> upToDateServers = new HashMap<String, ServerState>(); // Servers late with regard to our changes HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>(); /* * Start loop to differenciate up to date servers from late ones. */ ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId); if (myChangeNumber == null) { myChangeNumber = new ChangeNumber(0, 0, serverId); } for (String repServer : rsStates.keySet()) { ServerState rsState = rsStates.get(repServer); ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId); if (rsChangeNumber == null) { rsChangeNumber = new ChangeNumber(0, 0, serverId); } // Store state in right list if (myChangeNumber.olderOrEqual(rsChangeNumber)) { upToDateServers.put(repServer, rsState); } else { lateOnes.put(repServer, rsState); } } if (upToDateServers.size() > 0) { /* * Some up to date servers, among them, choose the one that has the * maximum number of changes to send us. This is the most up to date one * regarding the whole topology. This server is the one which has the less * difference with the topology server state. For comparison, we need to * compute the difference for each server id with the topology server * state. */ Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get( upToDateServers.size(), baseDn.toNormalizedString()); logError(message); /* * First of all, compute the virtual server state for the whole topology, * which is composed of the most up to date change numbers for * each server id in the topology. */ ServerState topoState = new ServerState(); for (ServerState curState : upToDateServers.values()) { Iterator<Short> it = curState.iterator(); while (it.hasNext()) { Short sId = it.next(); ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); if (curSidCn == null) { curSidCn = new ChangeNumber(0, 0, sId); } // Update topology state topoState.update(curSidCn); } } // For up to date servers // Min of the max shifts long minShift = -1L; for (String upServer : upToDateServers.keySet()) { /* * Compute the maximum difference between the time of a server id's * change number and the time of the matching server id's change * number in the topology server state. * * Note: we could have used the sequence number here instead of the * timestamp, but this would have caused a problem when the sequence * number loops and comes back to 0 (computation would have becomen * meaningless). */ long shift = -1L; ServerState curState = upToDateServers.get(upServer); Iterator<Short> it = curState.iterator(); while (it.hasNext()) { Short sId = it.next(); ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); if (curSidCn == null) { curSidCn = new ChangeNumber(0, 0, sId); } // Cannot be null as checked at construction time ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId); // Cannot be negative as topoState computed as being the max CN // for each server id in the topology long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime(); if (tmpShift > shift) { shift = tmpShift; } } if ((minShift < 0) // First time in loop || (shift < minShift)) { // This sever is even closer to topo state bestServer = upServer; minShift = shift; } } // For up to date servers } else { /* * We could not find a replication server that has seen all the * changes that this server has already processed, */ // lateOnes cannot be empty Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( baseDn.toNormalizedString(), lateOnes.size()); logError(message); // Min of the shifts long minShift = -1L; for (String lateServer : lateOnes.keySet()) { /* * Choose the server who is the closest to us regarding our server id * (this is the most up to date regarding our server id). */ ServerState curState = lateOnes.get(lateServer); ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId); if (ourSidCn == null) { ourSidCn = new ChangeNumber(0, 0, serverId); } // Cannot be negative as our Cn for our server id is strictly // greater than those of the servers in late server list long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime(); if ((minShift < 0) // First time in loop || (tmpShift < minShift)) { // This sever is even closer to topo state bestServer = lateServer; minShift = tmpShift; } } // For late servers } return bestServer; } /** * Search for the changes that happened since fromChangeNumber * based on the historical attribute. * @param baseDn the base DN @@ -531,7 +753,6 @@ } } /** * restart the ReplicationBroker. */ @@ -589,7 +810,6 @@ } } /** * Publish a message to the other servers. * @param msg the message to publish @@ -598,7 +818,7 @@ { boolean done = false; while (!done) while (!done && !shutdown) { if (connectionError) { @@ -643,8 +863,7 @@ credit = currentWindowSemaphore.tryAcquire( (long) 500, TimeUnit.MILLISECONDS); } else } else { credit = true; } @@ -689,8 +908,7 @@ } } } } catch (InterruptedException e) } catch (InterruptedException e) { // just loop. if (debugEnabled()) @@ -702,7 +920,6 @@ } } /** * Receive a message. * This method is not multithread safe and should either always be @@ -730,8 +947,7 @@ { WindowMessage windowMsg = (WindowMessage) msg; sendWindow.release(windowMsg.getNumAck()); } else } else { if (msg instanceof UpdateMessage) { @@ -764,7 +980,6 @@ return null; } /** * stop the server. */ @@ -774,7 +989,9 @@ shutdown = true; connected = false; if (heartbeatMonitor!= null) { heartbeatMonitor.shutdown(); } try { if (debugEnabled()) @@ -784,9 +1001,12 @@ } if (session != null) { session.close(); } } catch (IOException e) {} { } } /** @@ -834,6 +1054,7 @@ { return replicationServer; } /** * {@inheritDoc} */ @@ -906,10 +1127,13 @@ public int getCurrentSendWindow() { if (connected) { return sendWindow.availablePermits(); else } else { return 0; } } /** * Get the number of times the connection was lost. @@ -920,7 +1144,6 @@ return numLostConnections; } /** * Change some config parameters. * @@ -968,7 +1191,11 @@ return !connectionError; } private boolean debugEnabled() { return true; } private boolean debugEnabled() { return true; } private static final void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -133,9 +133,6 @@ // At startup, the listen thread wait on this flag for the connet // thread to look for other servers in the topology. // TODO when a replication server is out of date (has old changes // to receive from other servers, the listen thread should not accept // connection from ldap servers. (issue 1302) private boolean connectedInTopology = false; private final Object connectedInTopologyLock = new Object(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
New file @@ -0,0 +1,684 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.util.HashMap; import static org.opends.server.replication.plugin.ReplicationBroker.*; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.*; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; import org.opends.server.types.DN; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** * Test the algorithm for find the best replication server among the configured * ones. */ public class ComputeBestServerTest extends ReplicationTestCase { // The tracer object for the debug logger private static final DebugTracer TRACER = getTracer(); private void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); if (debugEnabled()) { TRACER.debugInfo("** TEST **" + s); } } private void debugInfo(String message, Exception e) { debugInfo(message + stackTraceToSingleLineString(e)); } /** * Set up the environment. * * @throws Exception * If the environment could not be set up. */ @BeforeClass @Override public void setUp() throws Exception { // Don't need server context in these tests } /** * Clean up the environment. * * @throws Exception * If the environment could not be set up. */ @AfterClass @Override public void classCleanUp() throws Exception { // Don't need server context in these tests } /** * Test with one replication server, nobody has a change number (simulates) * very first connection. * * @throws Exception If a problem occured */ @Test public void testNullCNBoth() throws Exception { String testCase = "testNullCNBoth"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(0L, 0, myId2); aState.update(cn); cn = new ChangeNumber(0L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with one replication server, only replication server has a non null * changenumber for ds server id * @throws Exception If a problem occured */ @Test public void testNullCNDS() throws Exception { String testCase = "testNullCNDS"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(0L, 0, myId1); aState.update(cn); cn = new ChangeNumber(0L, 0, myId2); aState.update(cn); cn = new ChangeNumber(0L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with one replication server, only ds server has a non null * changenumber for ds server id but rs has a null one. * * @throws Exception If a problem occured */ @Test public void testNullCNRS() throws Exception { String testCase = "testNullCNRS"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(1L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(0L, 0, myId2); aState.update(cn); cn = new ChangeNumber(0L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with one replication server, up to date. * * @throws Exception If a problem occured */ @Test public void test1ServerUp() throws Exception { String testCase = "test1ServerUp"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(1L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(1L, 0, myId1); aState.update(cn); cn = new ChangeNumber(1L, 0, myId2); aState.update(cn); cn = new ChangeNumber(1L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with 2 replication servers, up to date. * * @throws Exception If a problem occured */ @Test public void test2ServersUp() throws Exception { String testCase = "test2ServersUp"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; final String LOOSER1 = "looser1"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(1L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(1L, 0, myId1); aState.update(cn); cn = new ChangeNumber(1L, 0, myId2); aState.update(cn); cn = new ChangeNumber(1L, 0, myId3); aState.update(cn); rsStates.put(LOOSER1, aState); // State for server 2 aState = new ServerState(); cn = new ChangeNumber(2L, 0, myId1); aState.update(cn); cn = new ChangeNumber(1L, 0, myId2); aState.update(cn); cn = new ChangeNumber(1L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with 3 replication servers, up to date. * * @throws Exception If a problem occured */ @Test public void test3ServersUp() throws Exception { String testCase = "test3ServersUp"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; final String LOOSER1 = "looser1"; final String LOOSER2 = "looser2"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(1L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(1L, 0, myId1); aState.update(cn); cn = new ChangeNumber(1L, 0, myId2); aState.update(cn); cn = new ChangeNumber(1L, 0, myId3); aState.update(cn); rsStates.put(LOOSER1, aState); // State for server 2 aState = new ServerState(); cn = new ChangeNumber(2L, 0, myId1); aState.update(cn); cn = new ChangeNumber(1L, 0, myId2); aState.update(cn); cn = new ChangeNumber(3L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); // State for server 3 aState = new ServerState(); cn = new ChangeNumber(3L, 0, myId1); aState.update(cn); cn = new ChangeNumber(2L, 0, myId2); aState.update(cn); cn = new ChangeNumber(1L, 0, myId3); aState.update(cn); rsStates.put(LOOSER2, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with one replication server, late. * * @throws Exception If a problem occured */ @Test public void test1ServerLate() throws Exception { String testCase = "test1ServerLate"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(1L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(0L, 0, myId1); aState.update(cn); cn = new ChangeNumber(1L, 0, myId2); aState.update(cn); cn = new ChangeNumber(1L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with 2 replication servers, late. * * @throws Exception If a problem occured */ @Test public void test2ServersLate() throws Exception { String testCase = "test2ServersLate"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; final String LOOSER1 = "looser1"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(2L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(0L, 0, myId1); aState.update(cn); cn = new ChangeNumber(10L, 0, myId2); aState.update(cn); cn = new ChangeNumber(10L, 0, myId3); aState.update(cn); rsStates.put(LOOSER1, aState); // State for server 2 aState = new ServerState(); cn = new ChangeNumber(1L, 0, myId1); aState.update(cn); cn = new ChangeNumber(0L, 0, myId2); aState.update(cn); cn = new ChangeNumber(0L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with 3 replication servers, late. * * @throws Exception If a problem occured */ @Test public void test3ServersLate() throws Exception { String testCase = "test3ServersLate"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; final String LOOSER1 = "looser1"; final String LOOSER2 = "looser2"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(4L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(1L, 0, myId1); aState.update(cn); cn = new ChangeNumber(10L, 0, myId2); aState.update(cn); cn = new ChangeNumber(10L, 0, myId3); aState.update(cn); rsStates.put(LOOSER1, aState); // State for server 2 aState = new ServerState(); cn = new ChangeNumber(3L, 0, myId1); aState.update(cn); cn = new ChangeNumber(0L, 0, myId2); aState.update(cn); cn = new ChangeNumber(0L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); // State for server 3 aState = new ServerState(); cn = new ChangeNumber(2L, 0, myId1); aState.update(cn); cn = new ChangeNumber(10L, 0, myId2); aState.update(cn); cn = new ChangeNumber(10L, 0, myId3); aState.update(cn); rsStates.put(LOOSER2, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } /** * Test with 6 replication servers, some up, some late, one null * * @throws Exception If a problem occured */ @Test public void test6ServersMixed() throws Exception { String testCase = "test6ServersMixed"; debugInfo("Starting " + testCase); // definitions for server ids short myId1 = 1; short myId2 = 2; short myId3 = 3; // definitions for server names final String WINNER = "winner"; final String LOOSER1 = "looser1"; final String LOOSER2 = "looser2"; final String LOOSER3 = "looser3"; final String LOOSER4 = "looser4"; final String LOOSER5 = "looser5"; // Create my state ServerState mySt = new ServerState(); ChangeNumber cn = new ChangeNumber(5L, 0, myId1); mySt.update(cn); cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo mySt.update(cn); cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo mySt.update(cn); // Create replication servers state list HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // State for server 1 ServerState aState = new ServerState(); cn = new ChangeNumber(4L, 0, myId1); aState.update(cn); cn = new ChangeNumber(10L, 0, myId2); aState.update(cn); cn = new ChangeNumber(10L, 0, myId3); aState.update(cn); rsStates.put(LOOSER1, aState); // State for server 2 aState = new ServerState(); cn = new ChangeNumber(7L, 0, myId1); aState.update(cn); cn = new ChangeNumber(6L, 0, myId2); aState.update(cn); cn = new ChangeNumber(5L, 0, myId3); aState.update(cn); rsStates.put(LOOSER2, aState); // State for server 3 aState = new ServerState(); cn = new ChangeNumber(3L, 0, myId1); aState.update(cn); cn = new ChangeNumber(10L, 0, myId2); aState.update(cn); cn = new ChangeNumber(10L, 0, myId3); aState.update(cn); rsStates.put(LOOSER3, aState); // State for server 4 aState = new ServerState(); cn = new ChangeNumber(6L, 0, myId1); aState.update(cn); cn = new ChangeNumber(6L, 0, myId2); aState.update(cn); cn = new ChangeNumber(8L, 0, myId3); aState.update(cn); rsStates.put(WINNER, aState); // State for server 5 (null one for our serverid) aState = new ServerState(); cn = new ChangeNumber(5L, 0, myId2); aState.update(cn); cn = new ChangeNumber(5L, 0, myId3); aState.update(cn); rsStates.put(LOOSER4, aState); // State for server 6 aState = new ServerState(); cn = new ChangeNumber(5L, 0, myId1); aState.update(cn); cn = new ChangeNumber(7L, 0, myId2); aState.update(cn); cn = new ChangeNumber(6L, 0, myId3); aState.update(cn); rsStates.put(LOOSER5, aState); String bestServer = computeBestReplicationServer(mySt, rsStates, myId1, new DN()); assertEquals(bestServer, WINNER, "Wrong best replication server."); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
New file @@ -0,0 +1,477 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.io.IOException; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.*; import java.net.ServerSocket; import java.util.SortedSet; import java.util.TreeSet; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.ResultCode; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** * Test if the replication domain is able to switch of replication rerver * if there is some replication server failure. */ @Test(sequential = true) public class ReplicationServerFailoverTest extends ReplicationTestCase { private static final String BASEDN_STRING = "dc=example,dc=com"; private static final short DS1_ID = 1; private static final short DS2_ID = 2; private static final short RS1_ID = 11; private static final short RS2_ID = 12; private int rs1Port = -1; private int rs2Port = -1; private ReplicationDomain rd1 = null; private ReplicationDomain rd2 = null; private ReplicationServer rs1 = null; private ReplicationServer rs2 = null; // The tracer object for the debug logger private static final DebugTracer TRACER = getTracer(); private void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); if (debugEnabled()) { TRACER.debugInfo("** TEST **" + s); } } private void debugInfo(String message, Exception e) { debugInfo(message + stackTraceToSingleLineString(e)); } private void initTest() { rs1Port = -1; rs2Port = -1; rd1 = null; rd2 = null; rs1 = null; rs2 = null; findFreePorts(); } private void endTest() { if (rd1 != null) { rd1.shutdown(); rd1 = null; } if (rd2 != null) { rd2.shutdown(); rd2 = null; } if (rs1 != null) { rs1.shutdown(); rs1 = null; } if (rs2 != null) { rs2.shutdown(); rs2 = null; } rs1Port = -1; rs2Port = -1; } /** * Test the failover feature when one RS fails: * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology. * DS1 connected to RS1 (DS1<->RS1) * Both RS are connected together (RS1<->RS2) * RS1 fails, DS1 should be connected to RS2 * * @throws Exception If a problem occured */ @Test public void testFailOverSingle() throws Exception { String testCase = "testFailOverSingle"; debugInfo("Starting " + testCase); initTest(); // Start RS1 rs1 = createReplicationServer(RS1_ID, testCase); // Start RS2 rs2 = createReplicationServer(RS2_ID, testCase); // Start DS1 DN baseDn = DN.decode(BASEDN_STRING); rd1 = createReplicationDomain(baseDn, DS1_ID, testCase); // DS1 connected to RS1 ? String msg = "Before " + RS1_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); // Simulate RS1 failure rs1.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); endTest(); } /** * Test the failover feature when one RS fails: * 2 DS (DS1 and DS2) and 2 RS (RS1 and RS2) in topology. * Each DS connected to its own RS (DS1<->RS1, DS2<->RS2) * Both RS are connected together (RS1<->RS2) * RS1 fails, DS1 and DS2 should be both connected to RS2 * RS1 comes back (no change) * RS2 fails, DS1 and DS2 should be both connected to RS1 * * @throws Exception If a problem occured */ @Test(enabled = false) // This test to be run in standalone, not in precommit // because the timing is important as we restart servers after they fail // and thus cannot warrenty that the recovering server is the right one if // the sleep time is not enough with regard to thread scheduling in heavy // precommit environment public void testFailOverMulti() throws Exception { String testCase = "testFailOverMulti"; debugInfo("Starting " + testCase); initTest(); // Start RS1 rs1 = createReplicationServer(RS1_ID, testCase); // Start RS2 rs2 = createReplicationServer(RS2_ID, testCase); // Start DS1 DN baseDn = DN.decode(BASEDN_STRING); rd1 = createReplicationDomain(baseDn, DS1_ID, testCase); // Start DS2 rd2 = createReplicationDomain(baseDn, DS2_ID, testCase); // DS1 connected to RS1 ? String msg = "Before " + RS1_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); // DS2 connected to RS2 ? checkConnection(DS2_ID, RS2_ID, msg); // Simulate RS1 failure rs1.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS2 ? msg = "After " + RS1_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); // DS2 connected to RS2 ? checkConnection(DS2_ID, RS2_ID, msg); // Restart RS1 rs1 = createReplicationServer(RS1_ID, testCase); // Let time for RS1 to restart sleep(5000); // DS1 connected to RS2 ? msg = "Before " + RS2_ID + " failure"; checkConnection(DS1_ID, RS2_ID, msg); // DS2 connected to RS2 ? checkConnection(DS2_ID, RS2_ID, msg); // Simulate RS2 failure rs2.shutdown(); // Let time for failover to happen sleep(5000); // DS1 connected to RS1 ? msg = "After " + RS2_ID + " failure"; checkConnection(DS1_ID, RS1_ID, msg); // DS2 connected to RS1 ? checkConnection(DS2_ID, RS1_ID, msg); // Restart RS2 rs2 = createReplicationServer(RS2_ID, testCase); // Let time for RS2 to restart sleep(5000); // DS1 connected to RS1 ? msg = "After " + RS2_ID + " restart"; checkConnection(DS1_ID, RS1_ID, msg); // DS2 connected to RS1 ? checkConnection(DS2_ID, RS1_ID, msg); endTest(); } private void sleep(long time) { try { Thread.sleep(time); } catch (InterruptedException ex) { fail("Error sleeping " + stackTraceToSingleLineString(ex)); } } /** * Check connection of the provided replication domain to the provided * replication server. */ private void checkConnection(short dsId, short rsId, String msg) { int rsPort = -1; ReplicationDomain rd = null; if (dsId == DS1_ID) { rd = rd1; } else if (dsId == DS2_ID) { rd = rd2; } else { fail("Unknown replication domain server id."); } if (rsId == RS1_ID) { rsPort = rs1Port; } else if (rsId == RS2_ID) { rsPort = rs2Port; } else { fail("Unknown replication server id."); } // Connected ? assertEquals(rd.isConnected(), true, "Replication domain " + dsId + " is not connected to a replication server (" + msg + ")"); // Right port ? String serverStr = rd.getReplicationServer(); int index = serverStr.lastIndexOf(':'); if ((index == -1) || (index >= serverStr.length())) fail("Enable to find port number in: " + serverStr); String rdPortStr = serverStr.substring(index + 1); int rdPort = -1; try { rdPort = (new Integer(rdPortStr)).intValue(); } catch (Exception e) { fail("Enable to get an int from: " + rdPortStr); } assertEquals(rdPort, rsPort, "Replication domain " + dsId + " is not connected to right replication server port (" + rdPort + ") was expecting " + rsPort + " (" + msg + ")"); } /** * Find needed free TCP ports. */ private void findFreePorts() { try { ServerSocket socket1 = TestCaseUtils.bindFreePort(); ServerSocket socket2 = TestCaseUtils.bindFreePort(); rs1Port = socket1.getLocalPort(); rs2Port = socket2.getLocalPort(); socket1.close(); socket2.close(); } catch (IOException e) { fail("Unable to determinate some free ports " + stackTraceToSingleLineString(e)); } } /** * Creates a new ReplicationServer. */ private ReplicationServer createReplicationServer(short serverId, String suffix) { SortedSet<String> replServers = new TreeSet<String>(); try { int port = -1; if (serverId == RS1_ID) { port = rs1Port; replServers.add("localhost:" + rs2Port); } else if (serverId == RS2_ID) { port = rs2Port; replServers.add("localhost:" + rs1Port); } else { fail("Unknown replication server id."); } String dir = "genid" + serverId + suffix + "Db"; ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100, replServers); ReplicationServer replicationServer = new ReplicationServer(conf); return replicationServer; } catch (Exception e) { fail("createReplicationServer " + stackTraceToSingleLineString(e)); } return null; } /** * Creates a new ReplicationDomain. */ private ReplicationDomain createReplicationDomain(DN baseDn, short serverId, String suffix) { SortedSet<String> replServers = new TreeSet<String>(); try { if (serverId == DS1_ID) { replServers.add("localhost:" + rs1Port); } else if (serverId == DS2_ID) { replServers.add("localhost:" + rs2Port); } else { fail("Unknown replication domain server id."); } DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, serverId, replServers); //domainConf.setHeartbeatInterval(500); ReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf); // Add other server (doing that after connection insure we connect to // the right server) // WARNING: only works because for the moment, applying changes to conf // does not force reconnection in replication domain // when it is coded, the reconnect may 1 of both servers and we can not // guaranty anymore that we reach the server we want at the beginning. if (serverId == DS1_ID) { replServers.add("localhost:" + rs2Port); } else if (serverId == DS2_ID) { replServers.add("localhost:" + rs1Port); } else { fail("Unknown replication domain server id."); } domainConf = new DomainFakeCfg(baseDn, serverId, replServers); ConfigChangeResult chgRes = replicationDomain.applyConfigurationChange(domainConf); if ((chgRes == null) || (!chgRes.getResultCode().equals(ResultCode.SUCCESS))) { fail("Could not change replication domain config" + " (add some replication servers)."); } return replicationDomain; } catch (Exception e) { fail("createReplicationDomain " + stackTraceToSingleLineString(e)); } return null; } /** * Set up the environment. * * @throws Exception * If the environment could not be set up. */ @BeforeClass @Override public void setUp() throws Exception { super.setUp(); // In case we need to extend } /** * Clean up the environment. * * @throws Exception * If the environment could not be set up. */ @AfterClass @Override public void classCleanUp() throws Exception { super.classCleanUp(); // In case we need it extend } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -785,7 +785,8 @@ } else { fail("ReplicationServer transmission failed: no expected message class."); fail("ReplicationServer transmission failed: no expected message" + " class: " + msg2); break; } }