opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@ NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \ listening on %s NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \ changes that this server has already processed on suffix %s NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \ up to date chnages for suffix %s NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \ server should be configured SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \ @@ -85,8 +85,8 @@ MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \ database for base DN %s NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \ replication server that has seen all the local changes on suffix %s. Going to replay \ changes replication server that has seen all the local changes on suffix %s. Found %d \ replications server(s) not up to date. Going to replay changes NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \ server on suffix %s, retrying... NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s : @@ -249,4 +249,12 @@ SEVERE_ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED_99=The replication \ server backend cannot export its entries in LDIF format because the \ export-ldif command must be run as a task DEBUG_GOING_TO_SEARCH_FOR_CHANGES_100=The replication server is late \ regarding our changes: going to send missing ones DEBUG_SENDING_CHANGE_101=Sending change number: %s DEBUG_CHANGES_SENT_102=All missing changes sent to replication server SEVERE_ERR_PUBLISHING_FAKE_OPS_103=Caught exception publishing fake operations \ for domain %s to replication server %s : %s SEVERE_ERR_COMPUTING_FAKE_OPS_104=Caught exception computing fake operations \ for domain %s for replication server %s : %s opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,6 +25,7 @@ * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import org.opends.messages.*; import static org.opends.server.loggers.ErrorLogger.logError; @@ -41,6 +42,8 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.TreeSet; import java.util.concurrent.Semaphore; @@ -61,17 +64,16 @@ import org.opends.server.types.SearchResultReference; import org.opends.server.types.SearchScope; /** * The broker for Multi-master Replication. */ public class ReplicationBroker implements InternalSearchListener { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private boolean shutdown = false; private Collection<String> servers; private boolean connected = false; @@ -95,23 +97,23 @@ private long generationId = -1; private ReplSessionSecurity replSessionSecurity; // Trick for avoiding a inner class for many parameters return for // performHandshake method. private String tmpReadableServerName = null; /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; /** * A thread to monitor heartbeats on the session. */ private HeartbeatMonitor heartbeatMonitor = null; /** * The number of times the connection was lost. */ private int numLostConnections = 0; /** * When the broker cannot connect to any replication server * it log an error and keeps continuing every second. @@ -121,7 +123,6 @@ * finally succeed to connect. */ private boolean connectionError = false; private final Object connectPhaseLock = new Object(); /** @@ -149,9 +150,9 @@ * @param replSessionSecurity The session security configuration. 
*/ public ReplicationBroker(ServerState state, DN baseDn, short serverID, int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, long heartbeatInterval, long generationId, ReplSessionSecurity replSessionSecurity) int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, long heartbeatInterval, long generationId, ReplSessionSecurity replSessionSecurity) { this.baseDn = baseDn; this.serverID = serverID; @@ -164,7 +165,7 @@ new TreeSet<FakeOperation>(new FakeOperationComparator()); this.rcvWindow = window; this.maxRcvWindow = window; this.halfRcvWindow = window/2; this.halfRcvWindow = window / 2; this.heartbeatInterval = heartbeatInterval; this.protocolVersion = ProtocolVersion.currentVersion(); this.generationId = generationId; @@ -194,7 +195,6 @@ this.connect(); } /** * Connect to a ReplicationServer. * @@ -202,7 +202,7 @@ */ private void connect() { ReplServerStartMessage replServerStartMsg = null; HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>(); // Stop any existing heartbeat monitor from a previous session. if (heartbeatMonitor != null) @@ -211,187 +211,121 @@ heartbeatMonitor = null; } // checkState is true for the first loop on all replication servers // looking for one already up-to-date. // If we found some responding replication servers but none up-to-date // then we set check-state to false and do a second loop where the first // found will be the one elected and then we will update this replication // server. boolean checkState = true; boolean receivedResponse = true; // TODO: We are doing here 2 loops opening , closing , reopening session to // the same servers .. risk to have 'same server id' erros. // Would be better to do only one loop, keeping the best candidate while // traversing the list of replication servers to connect to. 
if (servers.size()==1) { checkState = false; } synchronized (connectPhaseLock) { while ((!connected) && (!shutdown) && (receivedResponse)) /* * Connect to each replication server and get their ServerState then find * out which one is the best to connect to. */ for (String server : servers) { receivedResponse = false; for (String server : servers) { int separator = server.lastIndexOf(':'); String port = server.substring(separator + 1); String hostname = server.substring(0, separator); // Connect to server and get reply message ReplServerStartMessage replServerStartMsg = performHandshake(server, false); tmpReadableServerName = null; // Not needed now // Store reply message in list if (replServerStartMsg != null) { ServerState rsState = replServerStartMsg.getServerState(); rsStates.put(server, rsState); } } // for servers ReplServerStartMessage replServerStartMsg = null; if (rsStates.size() > 0) { // At least one server answered, find the best one. String bestServer = computeBestReplicationServer(state, rsStates, serverID, baseDn); // Best found, now connect to this one replServerStartMsg = performHandshake(bestServer, true); if (replServerStartMsg != null) { try { /* * Open a socket connection to the next candidate. */ InetSocketAddress ServerAddr = new InetSocketAddress( InetAddress.getByName(hostname), Integer.parseInt(port)); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(ServerAddr, 500); session = replSessionSecurity.createClientSession(server, socket); boolean isSslEncryption = replSessionSecurity.isSslEncryption(server); /* * Send our ServerStartMessage. */ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, halfRcvWindow*2, heartbeatInterval, state, protocolVersion, generationId, isSslEncryption); session.publish(msg); /* * Read the ReplServerStartMessage that should come back. 
*/ session.setSoTimeout(1000); replServerStartMsg = (ReplServerStartMessage) session.receive(); receivedResponse = true; /* * We have sent our own protocol version to the replication server. * The replication server will use the same one (or an older one * if it is an old replication server). */ protocolVersion = ProtocolVersion.minWithCurrent( replServerStartMsg.getVersion()); session.setSoTimeout(timeout); if (!isSslEncryption) { session.stopEncryption(); } /* * We must not publish changes to a replicationServer that has not * seen all our previous changes because this could cause some * other ldap servers to miss those changes. * Check that the ReplicationServer has seen all our previous * changes. * If not, try another replicationServer. * If no other replicationServer has seen all our changes, recover * those changes and send them again to any replicationServer. */ ChangeNumber replServerMaxChangeNumber = replServerStartMsg.getServerState().getMaxChangeNumber(serverID); if (replServerMaxChangeNumber == null) { replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID); } ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID); if ((ourMaxChangeNumber == null) || (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) if ((ourMaxChangeNumber != null) && (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) { replicationServer = ServerAddr.toString(); maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; startHeartBeat(); break; } else { if (checkState == true) // Replication server is missing some of our changes: let's send // them to him. replayOperations.clear(); Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); logError(message); /* * Get all the changes that have not been seen by this * replication server and populate the replayOperations * list. 
*/ InternalSearchOperation op = seachForChangedEntries( baseDn, replServerMaxChangeNumber, this); if (op.getResultCode() != ResultCode.SUCCESS) { /* This replicationServer is missing some * of our changes, we are going to try another server * but before log a notice message /* * An error happened trying to search for the updates * This server will start acepting again new updates but * some inconsistencies will stay between servers. * Log an error for the repair tool * that will need to resynchronize the servers. */ Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server, baseDn.toNormalizedString()); message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); } else { for (FakeOperation replayOp : replayOperations) { message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber(). toString()); logError(message); session.publish(replayOp.generateMessage()); } message = DEBUG_CHANGES_SENT.get(); logError(message); } else { replayOperations.clear(); // TODO: i18n logError(Message.raw("going to search for changes")); /* * Get all the changes that have not been seen by this * replicationServer and populate the replayOperations * list. */ InternalSearchOperation op = seachForChangedEntries( baseDn, replServerMaxChangeNumber, this); if (op.getResultCode() != ResultCode.SUCCESS) { /* * An error happened trying to search for the updates * This server will start acepting again new updates but * some inconsistencies will stay between servers. * Log an error for the repair tool * that will need to resynchronize the servers. 
*/ Message message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); replicationServer = ServerAddr.toString(); maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; startHeartBeat(); } else { replicationServer = ServerAddr.toString(); maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; for (FakeOperation replayOp : replayOperations) { logError(Message.raw("sendingChange")); // TODO: i18n session.publish(replayOp.generateMessage()); } startHeartBeat(); logError(Message.raw("changes sent")); // TODO: i18n break; } } } } catch (ConnectException e) replicationServer = tmpReadableServerName; maxSendWindow = replServerStartMsg.getWindowSize(); connected = true; startHeartBeat(); } catch (IOException e) { /* * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ if (!connectionError ) { // the error message is only logged once to avoid overflowing // the error log Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); logError(message); } } catch (Exception e) { Message message = ERR_EXCEPTION_STARTING_SESSION.get( baseDn.toNormalizedString(), server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); Message message = ERR_PUBLISHING_FAKE_OPS.get( baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); } finally } catch (Exception e) { Message message = ERR_COMPUTING_FAKE_OPS.get( baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); } finally { if (connected == false) { @@ -402,81 +336,60 @@ session.close(); } catch (IOException e) { // The session was already closed, just ignore. // The session was already closed, just ignore. 
} session = null; } } } } // for servers // We have traversed all the replication servers if ((!connected) && (checkState == true) && receivedResponse) { /* * We could not find a replicationServer that has seen all the * changes that this server has already processed, start again * the loop looking for any replicationServer. */ Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( baseDn.toNormalizedString()); logError(message); checkState = false; } } // We have traversed all the replication servers as many times as needed // to find one if one is up and running. } // Could perform handshake with best } // Reached some servers if (connected) { // This server has connected correctly. // Log a message to let the administrator know that the failure was // resolved. // wakeup all the thread that were waiting on the window // Wakeup all the thread that were waiting on the window // on the previous connection. connectionError = false; if (sendWindow != null) { sendWindow.release(Integer.MAX_VALUE); } this.sendWindow = new Semaphore(maxSendWindow); connectPhaseLock.notify(); if ((replServerStartMsg.getGenerationId() == this.generationId) || (replServerStartMsg.getGenerationId() == -1)) (replServerStartMsg.getGenerationId() == -1)) { Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get( baseDn.toString(), replicationServer, Long.toString(this.generationId)); baseDn.toString(), replicationServer, Long.toString(this.generationId)); logError(message); } else } else { Message message = NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get( baseDn.toString(), replicationServer, Long.toString(this.generationId), Long.toString(replServerStartMsg.getGenerationId())); baseDn.toString(), replicationServer, Long.toString(this.generationId), Long.toString(replServerStartMsg.getGenerationId())); logError(message); } } else } else { /* * This server could not find any replicationServer * It's going to start in degraded mode. 
* Log a message * This server could not find any replicationServer. It's going to start * in degraded mode. Log a message. */ if (!connectionError) { checkState = false; connectionError = true; connectPhaseLock.notify(); Message message = NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString()); NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString()); logError(message); } } @@ -484,6 +397,315 @@ } /** * Connect to the provided server performing the handshake (start messages * exchange) and return the reply message from the replication server. * * @param server Server to connect to. * @param keepConnection Do we keep session opened or not after handshake. * @return The ReplServerStartMessage the server replied. Null if could not * get an answer. */ public ReplServerStartMessage performHandshake(String server, boolean keepConnection) { ReplServerStartMessage replServerStartMsg = null; // Parse server string. int separator = server.lastIndexOf(':'); String port = server.substring(separator + 1); String hostname = server.substring(0, separator); boolean error = false; try { /* * Open a socket connection to the next candidate. */ int intPort = Integer.parseInt(port); InetSocketAddress serverAddr = new InetSocketAddress( InetAddress.getByName(hostname), intPort); tmpReadableServerName = serverAddr.toString(); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); socket.connect(serverAddr, 500); session = replSessionSecurity.createClientSession(server, socket); boolean isSslEncryption = replSessionSecurity.isSslEncryption(server); /* * Send our ServerStartMessage. */ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, halfRcvWindow * 2, heartbeatInterval, state, protocolVersion, generationId, isSslEncryption); session.publish(msg); /* * Read the ReplServerStartMessage that should come back. 
*/ session.setSoTimeout(1000); replServerStartMsg = (ReplServerStartMessage) session.receive(); /* * We have sent our own protocol version to the replication server. * The replication server will use the same one (or an older one * if it is an old replication server). */ protocolVersion = ProtocolVersion.minWithCurrent( replServerStartMsg.getVersion()); session.setSoTimeout(timeout); if (!isSslEncryption) { session.stopEncryption(); } } catch (ConnectException e) { /* * There was no server waiting on this host:port * Log a notice and try the next replicationServer in the list */ if (!connectionError) { // the error message is only logged once to avoid overflowing // the error log Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server); logError(message); } error = true; } catch (Exception e) { Message message = ERR_EXCEPTION_STARTING_SESSION.get( baseDn.toNormalizedString(), server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); error = true; } // Close session if requested if (!keepConnection || error) { if (session != null) { try { session.close(); } catch (IOException e) { // The session was already closed, just ignore. } session = null; } if (error) { replServerStartMsg = null; } // Be sure to return null. } return replServerStartMsg; } /** * Returns the replication server that best fits our need so that we can * connect to it. * * Note: this method put as public static for unit testing purpose. * * @param myState The local server state. * @param rsStates The list of available replication servers and their * associated server state. * @param serverId The server id for the suffix we are working for. * @param baseDn The suffix for which we are working for. * @return The computed best replication server. 
*/ public static String computeBestReplicationServer(ServerState myState, HashMap<String, ServerState> rsStates, short serverId, DN baseDn) { /* * Find replication servers who are up to date (or more up to date than us, * if for instance we failed and restarted, having sent some changes to the * RS but without having time to store our own state) regarding our own * server id. Then, among them, choose the server that is the most up to * date regarding the whole topology. * * If no server is up to date regarding our own server id, find the one who * is the most up to date regarding our server id. */ // Should never happen (sanity check) if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) || (baseDn == null)) { return null; } String bestServer = null; // Servers up to date with regard to our changes HashMap<String, ServerState> upToDateServers = new HashMap<String, ServerState>(); // Servers late with regard to our changes HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>(); /* * Start loop to differentiate up to date servers from late ones. */ ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId); if (myChangeNumber == null) { myChangeNumber = new ChangeNumber(0, 0, serverId); } for (String repServer : rsStates.keySet()) { ServerState rsState = rsStates.get(repServer); ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId); if (rsChangeNumber == null) { rsChangeNumber = new ChangeNumber(0, 0, serverId); } // Store state in right list if (myChangeNumber.olderOrEqual(rsChangeNumber)) { upToDateServers.put(repServer, rsState); } else { lateOnes.put(repServer, rsState); } } if (upToDateServers.size() > 0) { /* * Some up to date servers, among them, choose the one that has the * maximum number of changes to send us. This is the most up to date one * regarding the whole topology. This server is the one which has the least * difference with the topology server state.
For comparison, we need to * compute the difference for each server id with the topology server * state. */ Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get( upToDateServers.size(), baseDn.toNormalizedString()); logError(message); /* * First of all, compute the virtual server state for the whole topology, * which is composed of the most up to date change numbers for * each server id in the topology. */ ServerState topoState = new ServerState(); for (ServerState curState : upToDateServers.values()) { Iterator<Short> it = curState.iterator(); while (it.hasNext()) { Short sId = it.next(); ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); if (curSidCn == null) { curSidCn = new ChangeNumber(0, 0, sId); } // Update topology state topoState.update(curSidCn); } } // For up to date servers // Min of the max shifts long minShift = -1L; for (String upServer : upToDateServers.keySet()) { /* * Compute the maximum difference between the time of a server id's * change number and the time of the matching server id's change * number in the topology server state. * * Note: we could have used the sequence number here instead of the * timestamp, but this would have caused a problem when the sequence * number loops and comes back to 0 (computation would have become * meaningless).
*/ long shift = -1L; ServerState curState = upToDateServers.get(upServer); Iterator<Short> it = curState.iterator(); while (it.hasNext()) { Short sId = it.next(); ChangeNumber curSidCn = curState.getMaxChangeNumber(sId); if (curSidCn == null) { curSidCn = new ChangeNumber(0, 0, sId); } // Expected to be non-null: topoState was updated above with a change // number for every server id of every up to date server ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId); // Cannot be negative as topoState computed as being the max CN // for each server id in the topology long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime(); if (tmpShift > shift) { shift = tmpShift; } } // NOTE(review): if curState has no server ids, shift stays at -1 and so // does minShift, letting every later server replace bestServer -- confirm // this is intended if ((minShift < 0) // First time in loop || (shift < minShift)) { // This server is even closer to topo state bestServer = upServer; minShift = shift; } } // For up to date servers } else { /* * We could not find a replication server that has seen all the * changes that this server has already processed. */ // lateOnes cannot be empty Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get( baseDn.toNormalizedString(), lateOnes.size()); logError(message); // Min of the shifts long minShift = -1L; for (String lateServer : lateOnes.keySet()) { /* * Choose the server who is the closest to us regarding our server id * (this is the most up to date regarding our server id). */ ServerState curState = lateOnes.get(lateServer); ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId); if (ourSidCn == null) { ourSidCn = new ChangeNumber(0, 0, serverId); } // Cannot be negative as our Cn for our server id is strictly // greater than those of the servers in late server list long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime(); if ((minShift < 0) // First time in loop || (tmpShift < minShift)) { // This server is even closer to our own state bestServer = lateServer; minShift = tmpShift; } } // For late servers } return bestServer; } /** * Search for the changes that happened since fromChangeNumber * based on the historical attribute.
* @param baseDn the base DN @@ -493,26 +715,26 @@ * @throws Exception when raised. */ public static InternalSearchOperation seachForChangedEntries( DN baseDn, ChangeNumber fromChangeNumber, InternalSearchListener resultListener) throws Exception DN baseDn, ChangeNumber fromChangeNumber, InternalSearchListener resultListener) throws Exception { InternalClientConnection conn = InternalClientConnection.getRootConnection(); LDAPFilter filter = LDAPFilter.decode( "("+ Historical.HISTORICALATTRIBUTENAME + ">=dummy:" + fromChangeNumber + ")"); "(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:" + fromChangeNumber + ")"); LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); attrs.add(Historical.HISTORICALATTRIBUTENAME); attrs.add(Historical.ENTRYUIDNAME); return conn.processSearch( new ASN1OctetString(baseDn.toString()), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, filter, attrs, resultListener); new ASN1OctetString(baseDn.toString()), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, filter, attrs, resultListener); } /** @@ -524,14 +746,13 @@ if (heartbeatInterval > 0) { heartbeatMonitor = new HeartbeatMonitor("Replication Heartbeat Monitor on " + baseDn + " with " + getReplicationServer(), session, heartbeatInterval); new HeartbeatMonitor("Replication Heartbeat Monitor on " + baseDn + " with " + getReplicationServer(), session, heartbeatInterval); heartbeatMonitor.start(); } } /** * restart the ReplicationBroker. 
*/ @@ -556,7 +777,7 @@ } } catch (IOException e1) { // ignore // ignore } if (failingSession == session) @@ -572,7 +793,7 @@ { MessageBuilder mb = new MessageBuilder(); mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( baseDn.toNormalizedString(), e.getLocalizedMessage())); baseDn.toNormalizedString(), e.getLocalizedMessage())); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); } @@ -583,13 +804,12 @@ Thread.sleep(500); } catch (InterruptedException e) { // ignore // ignore } } } } /** * Publish a message to the other servers. * @param msg the message to publish @@ -598,7 +818,7 @@ { boolean done = false; while (!done) while (!done && !shutdown) { if (connectionError) { @@ -611,7 +831,7 @@ if (debugEnabled()) { debugInfo("ReplicationBroker.publish() Publishing a " + " message is not possible due to existing connection error."); " message is not possible due to existing connection error."); } return; @@ -642,9 +862,8 @@ // want to hold off reconnection in case the connection dropped. credit = currentWindowSemaphore.tryAcquire( (long) 500, TimeUnit.MILLISECONDS); } else (long) 500, TimeUnit.MILLISECONDS); } else { credit = true; } @@ -685,24 +904,22 @@ if (debugEnabled()) { debugInfo("ReplicationBroker.publish() " + "IO exception raised : " + e.getLocalizedMessage()); "IO exception raised : " + e.getLocalizedMessage()); } } } } catch (InterruptedException e) } catch (InterruptedException e) { // just loop. if (debugEnabled()) { debugInfo("ReplicationBroker.publish() " + "Interrupted exception raised." + e.getLocalizedMessage()); "Interrupted exception raised." + e.getLocalizedMessage()); } } } } /** * Receive a message. 
* This method is not multithread safe and should either always be @@ -730,8 +947,7 @@ { WindowMessage windowMsg = (WindowMessage) msg; sendWindow.release(windowMsg.getNumAck()); } else } else { if (msg instanceof UpdateMessage) { @@ -752,11 +968,11 @@ if (shutdown == false) { Message message = NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer); logError(message); debugInfo("ReplicationBroker.receive() " + baseDn + " Exception raised." + e + e.getLocalizedMessage()); " Exception raised." + e + e.getLocalizedMessage()); this.reStart(failingSession); } } @@ -764,7 +980,6 @@ return null; } /** * stop the server. */ @@ -773,8 +988,10 @@ replicationServer = "stopped"; shutdown = true; connected = false; if (heartbeatMonitor!= null) if (heartbeatMonitor != null) { heartbeatMonitor.shutdown(); } try { if (debugEnabled()) @@ -784,9 +1001,12 @@ } if (session != null) { session.close(); } } catch (IOException e) {} { } } /** @@ -834,12 +1054,13 @@ { return replicationServer; } /** * {@inheritDoc} */ public void handleInternalSearchEntry( InternalSearchOperation searchOperation, SearchResultEntry searchEntry) InternalSearchOperation searchOperation, SearchResultEntry searchEntry) { /* * Only deal with modify operation so far @@ -862,10 +1083,10 @@ * {@inheritDoc} */ public void handleInternalSearchReference( InternalSearchOperation searchOperation, SearchResultReference searchReference) InternalSearchOperation searchOperation, SearchResultReference searchReference) { // TODO to be implemented // TODO to be implemented } /** @@ -906,9 +1127,12 @@ public int getCurrentSendWindow() { if (connected) { return sendWindow.availablePermits(); else } else { return 0; } } /** @@ -920,7 +1144,6 @@ return numLostConnections; } /** * Change some config parameters. * @@ -933,8 +1156,8 @@ * @param heartbeatInterval The heartbeat interval. 
*/ public void changeConfig(Collection<String> replicationServers, int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, long heartbeatInterval) int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window, long heartbeatInterval) { this.servers = replicationServers; this.maxRcvWindow = window; @@ -943,9 +1166,9 @@ this.maxReceiveQueue = maxReceiveQueue; this.maxSendDelay = maxSendDelay; this.maxSendQueue = maxSendQueue; // TODO : Changing those parameters requires either restarting a new // session with the replicationServer or renegotiating the parameters that // were sent in the ServerStart message // TODO : Changing those parameters requires either restarting a new // session with the replicationServer or renegotiating the parameters that // were sent in the ServerStart message } /** @@ -968,7 +1191,11 @@ return !connectionError; } private boolean debugEnabled() { return true; } private boolean debugEnabled() { return true; } private static final void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -133,9 +133,6 @@ // At startup, the listen thread wait on this flag for the connet // thread to look for other servers in the topology. // TODO when a replication server is out of date (has old changes // to receive from other servers, the listen thread should not accept // connection from ldap servers. (issue 1302) private boolean connectedInTopology = false; private final Object connectedInTopologyLock = new Object(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
New file @@ -0,0 +1,684 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;

import java.util.HashMap;
import static org.opends.server.replication.plugin.ReplicationBroker.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Test the algorithm for finding the best replication server among the
 * configured ones.
 * <p>
 * Every test builds the state of the local directory server, builds the
 * state of one or more candidate replication servers, and checks that
 * {@code computeBestReplicationServer} elects the candidate registered under
 * the name {@link #WINNER}.
 */
public class ComputeBestServerTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();

  // Server id of the directory server looking for a replication server.
  private static final short MY_ID1 = 1;

  // Other server ids present in the states. The test scenarios are written
  // on the assumption that the algorithm ignores the change numbers of these.
  private static final short MY_ID2 = 2;
  private static final short MY_ID3 = 3;

  // Names under which the candidate replication servers are registered.
  private static final String WINNER = "winner";
  private static final String LOOSER1 = "looser1";
  private static final String LOOSER2 = "looser2";
  private static final String LOOSER3 = "looser3";
  private static final String LOOSER4 = "looser4";
  private static final String LOOSER5 = "looser5";

  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }

  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }

  /**
   * Builds a server state holding one change number per server id.
   * Change numbers are recorded in the order server id 1, 2, 3, each with a
   * sequence number of 0.
   *
   * @param time1 Time of the change number for server id 1, or
   *              {@code null} to record no change number for that id.
   * @param time2 Time of the change number for server id 2, or
   *              {@code null} to record no change number for that id.
   * @param time3 Time of the change number for server id 3, or
   *              {@code null} to record no change number for that id.
   * @return The newly built server state.
   */
  private static ServerState newState(Long time1, Long time2, Long time3)
  {
    ServerState state = new ServerState();
    addChange(state, time1, MY_ID1);
    addChange(state, time2, MY_ID2);
    addChange(state, time3, MY_ID3);
    return state;
  }

  /**
   * Records a change number in the provided state, unless the time is
   * {@code null}.
   *
   * @param state    The state to update.
   * @param time     The time of the change number, or {@code null} to leave
   *                 the state untouched.
   * @param serverId The server id of the change number.
   */
  private static void addChange(ServerState state, Long time, short serverId)
  {
    if (time != null)
    {
      state.update(new ChangeNumber(time.longValue(), 0, serverId));
    }
  }

  /**
   * Runs the election for server id 1 and checks that the candidate
   * registered as {@link #WINNER} is the one returned.
   *
   * @param myState  The state of the local directory server.
   * @param rsStates The states of the candidate replication servers, by
   *                 server name.
   */
  private static void assertWinnerElected(ServerState myState,
      HashMap<String, ServerState> rsStates)
  {
    String bestServer =
      computeBestReplicationServer(myState, rsStates, MY_ID1, new DN());

    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }

  /**
   * Set up the environment.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  {
    // Don't need server context in these tests
  }

  /**
   * Clean up the environment.
   *
   * @throws Exception
   *           If the environment could not be cleaned up.
   */
  @AfterClass
  @Override
  public void classCleanUp() throws Exception
  {
    // Don't need server context in these tests
  }

  /**
   * Test with one replication server, nobody has a change number (simulates
   * very first connection).
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testNullCNBoth() throws Exception
  {
    String testCase = "testNullCNBoth";

    debugInfo("Starting " + testCase);

    // My state: no change number for my own server id
    ServerState mySt = newState(null, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1: no change number for my server id either
    rsStates.put(WINNER, newState(null, 0L, 0L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with one replication server, only replication server has a non null
   * changenumber for ds server id.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testNullCNDS() throws Exception
  {
    String testCase = "testNullCNDS";

    debugInfo("Starting " + testCase);

    // My state: no change number for my own server id
    ServerState mySt = newState(null, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(WINNER, newState(0L, 0L, 0L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with one replication server, only ds server has a non null
   * changenumber for ds server id but rs has a null one.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testNullCNRS() throws Exception
  {
    String testCase = "testNullCNRS";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(1L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1: no change number for my server id
    rsStates.put(WINNER, newState(null, 0L, 0L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with one replication server, up to date.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test1ServerUp() throws Exception
  {
    String testCase = "test1ServerUp";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(1L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(WINNER, newState(1L, 1L, 1L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with 2 replication servers, up to date.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test2ServersUp() throws Exception
  {
    String testCase = "test2ServersUp";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(1L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(LOOSER1, newState(1L, 1L, 1L));

    // State for server 2
    rsStates.put(WINNER, newState(2L, 1L, 1L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with 3 replication servers, up to date.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test3ServersUp() throws Exception
  {
    String testCase = "test3ServersUp";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(1L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(LOOSER1, newState(1L, 1L, 1L));

    // State for server 2
    rsStates.put(WINNER, newState(2L, 1L, 3L));

    // State for server 3
    rsStates.put(LOOSER2, newState(3L, 2L, 1L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with one replication server, late.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test1ServerLate() throws Exception
  {
    String testCase = "test1ServerLate";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(1L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(WINNER, newState(0L, 1L, 1L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with 2 replication servers, late.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test2ServersLate() throws Exception
  {
    String testCase = "test2ServersLate";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(2L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(LOOSER1, newState(0L, 10L, 10L));

    // State for server 2
    rsStates.put(WINNER, newState(1L, 0L, 0L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with 3 replication servers, late.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test3ServersLate() throws Exception
  {
    String testCase = "test3ServersLate";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(4L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(LOOSER1, newState(1L, 10L, 10L));

    // State for server 2
    rsStates.put(WINNER, newState(3L, 0L, 0L));

    // State for server 3
    rsStates.put(LOOSER2, newState(2L, 10L, 10L));

    assertWinnerElected(mySt, rsStates);
  }

  /**
   * Test with 6 replication servers, some up, some late, one null.
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void test6ServersMixed() throws Exception
  {
    String testCase = "test6ServersMixed";

    debugInfo("Starting " + testCase);

    // My state
    ServerState mySt = newState(5L, 2L, 3L);

    // Replication servers state list
    HashMap<String, ServerState> rsStates =
      new HashMap<String, ServerState>();

    // State for server 1
    rsStates.put(LOOSER1, newState(4L, 10L, 10L));

    // State for server 2
    rsStates.put(LOOSER2, newState(7L, 6L, 5L));

    // State for server 3
    rsStates.put(LOOSER3, newState(3L, 10L, 10L));

    // State for server 4
    rsStates.put(WINNER, newState(6L, 6L, 8L));

    // State for server 5 (null one for our serverid)
    rsStates.put(LOOSER4, newState(null, 5L, 5L));

    // State for server 6
    rsStates.put(LOOSER5, newState(5L, 7L, 6L));

    assertWinnerElected(mySt, rsStates);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
New file @@ -0,0 +1,477 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;

import java.io.IOException;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;

import java.net.ServerSocket;
import java.util.SortedSet;
import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ResultCode;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Test if the replication domain is able to switch to another replication
 * server when its current replication server fails.
 */
@Test(sequential = true)
public class ReplicationServerFailoverTest extends ReplicationTestCase
{
  private static final String BASEDN_STRING = "dc=example,dc=com";
  private static final short DS1_ID = 1;
  private static final short DS2_ID = 2;
  private static final short RS1_ID = 11;
  private static final short RS2_ID = 12;
  private int rs1Port = -1;
  private int rs2Port = -1;
  private ReplicationDomain rd1 = null;
  private ReplicationDomain rd2 = null;
  private ReplicationServer rs1 = null;
  private ReplicationServer rs2 = null;

  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();

  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }

  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }

  /**
   * Resets every per-test field and picks two free ports for the
   * replication servers.
   */
  private void initTest()
  {
    rs1Port = -1;
    rs2Port = -1;
    rd1 = null;
    rd2 = null;
    rs1 = null;
    rs2 = null;
    findFreePorts();
  }

  /**
   * Shuts down every replication domain and replication server created by
   * the current test and resets the per-test fields.
   */
  private void endTest()
  {
    if (rd1 != null)
    {
      rd1.shutdown();
      rd1 = null;
    }

    if (rd2 != null)
    {
      rd2.shutdown();
      rd2 = null;
    }

    if (rs1 != null)
    {
      rs1.shutdown();
      rs1 = null;
    }

    if (rs2 != null)
    {
      rs2.shutdown();
      rs2 = null;
    }
    rs1Port = -1;
    rs2Port = -1;
  }

  /**
   * Test the failover feature when one RS fails:
   * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology.
   * DS1 connected to RS1 (DS1<->RS1)
   * Both RS are connected together (RS1<->RS2)
   * RS1 fails, DS1 should be connected to RS2
   *
   * @throws Exception If a problem occurred
   */
  @Test
  public void testFailOverSingle() throws Exception
  {
    String testCase = "testFailOverSingle";

    debugInfo("Starting " + testCase);

    initTest();

    // The finally clause releases servers and domains even when an
    // assertion fails, so that a failure here cannot disturb later tests.
    try
    {
      // Start RS1
      rs1 = createReplicationServer(RS1_ID, testCase);
      // Start RS2
      rs2 = createReplicationServer(RS2_ID, testCase);

      // Start DS1
      DN baseDn = DN.decode(BASEDN_STRING);
      rd1 = createReplicationDomain(baseDn, DS1_ID, testCase);

      // DS1 connected to RS1 ?
      String msg = "Before " + RS1_ID + " failure";
      checkConnection(DS1_ID, RS1_ID, msg);

      // Simulate RS1 failure
      rs1.shutdown();
      // Let time for failover to happen
      sleep(5000);

      // DS1 connected to RS2 ?
      msg = "After " + RS1_ID + " failure";
      checkConnection(DS1_ID, RS2_ID, msg);
    }
    finally
    {
      endTest();
    }
  }

  /**
   * Test the failover feature when one RS fails:
   * 2 DS (DS1 and DS2) and 2 RS (RS1 and RS2) in topology.
   * Each DS connected to its own RS (DS1<->RS1, DS2<->RS2)
   * Both RS are connected together (RS1<->RS2)
   * RS1 fails, DS1 and DS2 should be both connected to RS2
   * RS1 comes back (no change)
   * RS2 fails, DS1 and DS2 should be both connected to RS1
   *
   * @throws Exception If a problem occurred
   */
  @Test(enabled = false)
  // This test is to be run standalone, not in precommit, because the timing
  // is important: servers are restarted after they fail, so we cannot
  // guarantee that the recovering server is the right one if the sleep time
  // is not long enough with regard to thread scheduling in a heavily loaded
  // precommit environment.
  public void testFailOverMulti() throws Exception
  {
    String testCase = "testFailOverMulti";

    debugInfo("Starting " + testCase);

    initTest();

    // The finally clause releases servers and domains even when an
    // assertion fails, so that a failure here cannot disturb later tests.
    try
    {
      // Start RS1
      rs1 = createReplicationServer(RS1_ID, testCase);
      // Start RS2
      rs2 = createReplicationServer(RS2_ID, testCase);

      // Start DS1
      DN baseDn = DN.decode(BASEDN_STRING);
      rd1 = createReplicationDomain(baseDn, DS1_ID, testCase);
      // Start DS2
      rd2 = createReplicationDomain(baseDn, DS2_ID, testCase);

      // DS1 connected to RS1 ?
      String msg = "Before " + RS1_ID + " failure";
      checkConnection(DS1_ID, RS1_ID, msg);
      // DS2 connected to RS2 ?
      checkConnection(DS2_ID, RS2_ID, msg);

      // Simulate RS1 failure
      rs1.shutdown();
      // Let time for failover to happen
      sleep(5000);

      // DS1 connected to RS2 ?
      msg = "After " + RS1_ID + " failure";
      checkConnection(DS1_ID, RS2_ID, msg);
      // DS2 connected to RS2 ?
      checkConnection(DS2_ID, RS2_ID, msg);

      // Restart RS1
      rs1 = createReplicationServer(RS1_ID, testCase);
      // Let time for RS1 to restart
      sleep(5000);

      // DS1 connected to RS2 ?
      msg = "Before " + RS2_ID + " failure";
      checkConnection(DS1_ID, RS2_ID, msg);
      // DS2 connected to RS2 ?
      checkConnection(DS2_ID, RS2_ID, msg);

      // Simulate RS2 failure
      rs2.shutdown();
      // Let time for failover to happen
      sleep(5000);

      // DS1 connected to RS1 ?
      msg = "After " + RS2_ID + " failure";
      checkConnection(DS1_ID, RS1_ID, msg);
      // DS2 connected to RS1 ?
      checkConnection(DS2_ID, RS1_ID, msg);

      // Restart RS2
      rs2 = createReplicationServer(RS2_ID, testCase);
      // Let time for RS2 to restart
      sleep(5000);

      // DS1 connected to RS1 ?
      msg = "After " + RS2_ID + " restart";
      checkConnection(DS1_ID, RS1_ID, msg);
      // DS2 connected to RS1 ?
      checkConnection(DS2_ID, RS1_ID, msg);
    }
    finally
    {
      endTest();
    }
  }

  /**
   * Pauses the current thread, failing the test if the pause is interrupted.
   *
   * @param time The time to sleep, in milliseconds.
   */
  private void sleep(long time)
  {
    try
    {
      Thread.sleep(time);
    }
    catch (InterruptedException ex)
    {
      // Restore the interrupt status so that the test runner can see it
      Thread.currentThread().interrupt();
      fail("Error sleeping " + stackTraceToSingleLineString(ex));
    }
  }

  /**
   * Check connection of the provided replication domain to the provided
   * replication server.
   *
   * @param dsId The server id of the replication domain to check.
   * @param rsId The server id of the replication server the domain is
   *             expected to be connected to.
   * @param msg  Context appended to the failure messages.
   */
  private void checkConnection(short dsId, short rsId, String msg)
  {
    ReplicationDomain rd = null;
    if (dsId == DS1_ID)
    {
      rd = rd1;
    }
    else if (dsId == DS2_ID)
    {
      rd = rd2;
    }
    else
    {
      fail("Unknown replication domain server id.");
    }

    int rsPort = -1;
    if (rsId == RS1_ID)
    {
      rsPort = rs1Port;
    }
    else if (rsId == RS2_ID)
    {
      rsPort = rs2Port;
    }
    else
    {
      fail("Unknown replication server id.");
    }

    // Domain created ? (report it instead of a bare NullPointerException)
    assertNotNull(rd, "Replication domain " + dsId + " does not exist ("
      + msg + ")");

    // Connected ?
    assertTrue(rd.isConnected(), "Replication domain " + dsId +
      " is not connected to a replication server (" + msg + ")");

    // Right port ?
    String serverStr = rd.getReplicationServer();
    int index = serverStr.lastIndexOf(':');
    // The separator must exist and must be followed by at least one character
    if ((index == -1) || (index >= serverStr.length() - 1))
    {
      fail("Unable to find port number in: " + serverStr);
    }
    String rdPortStr = serverStr.substring(index + 1);
    int rdPort = -1;
    try
    {
      rdPort = Integer.parseInt(rdPortStr);
    }
    catch (NumberFormatException e)
    {
      fail("Unable to get an int from: " + rdPortStr);
    }
    assertEquals(rdPort, rsPort, "Replication domain " + dsId +
      " is not connected to right replication server port (" + rdPort +
      ") was expecting " + rsPort + " (" + msg + ")");
  }

  /**
   * Find needed free TCP ports.
   * Both sockets are bound before either is closed so that the two ports
   * are distinct; they are always closed, even when a bind fails.
   */
  private void findFreePorts()
  {
    ServerSocket socket1 = null;
    ServerSocket socket2 = null;
    try
    {
      socket1 = TestCaseUtils.bindFreePort();
      socket2 = TestCaseUtils.bindFreePort();
      rs1Port = socket1.getLocalPort();
      rs2Port = socket2.getLocalPort();
    }
    catch (IOException e)
    {
      fail("Unable to determinate some free ports " +
        stackTraceToSingleLineString(e));
    }
    finally
    {
      closeSocket(socket1);
      closeSocket(socket2);
    }
  }

  /**
   * Closes the provided socket, failing the test if it cannot be closed.
   *
   * @param socket The socket to close, may be {@code null}.
   */
  private void closeSocket(ServerSocket socket)
  {
    if (socket == null)
    {
      return;
    }
    try
    {
      socket.close();
    }
    catch (IOException e)
    {
      fail("Unable to determinate some free ports " +
        stackTraceToSingleLineString(e));
    }
  }

  /**
   * Creates a new ReplicationServer.
   *
   * @param serverId The id of the replication server to create (RS1 or RS2);
   *                 it is configured with the other one as its peer.
   * @param suffix   Text included in the name of the database directory.
   * @return The created replication server.
   */
  private ReplicationServer createReplicationServer(short serverId,
    String suffix)
  {
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      int port = -1;
      if (serverId == RS1_ID)
      {
        port = rs1Port;
        replServers.add("localhost:" + rs2Port);
      }
      else if (serverId == RS2_ID)
      {
        port = rs2Port;
        replServers.add("localhost:" + rs1Port);
      }
      else
      {
        fail("Unknown replication server id.");
      }

      String dir = "genid" + serverId + suffix + "Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
        replServers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      return replicationServer;
    }
    catch (Exception e)
    {
      fail("createReplicationServer " + stackTraceToSingleLineString(e));
    }
    return null;
  }

  /**
   * Creates a new ReplicationDomain.
   *
   * @param baseDn   The base DN of the domain.
   * @param serverId The id of the directory server (DS1 or DS2).
   * @param suffix   Unused by this method.
   * @return The created replication domain.
   */
  private ReplicationDomain createReplicationDomain(DN baseDn, short serverId,
    String suffix)
  {
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      if (serverId == DS1_ID)
      {
        replServers.add("localhost:" + rs1Port);
      }
      else if (serverId == DS2_ID)
      {
        replServers.add("localhost:" + rs2Port);
      }
      else
      {
        fail("Unknown replication domain server id.");
      }

      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      //domainConf.setHeartbeatInterval(500);
      ReplicationDomain replicationDomain =
        MultimasterReplication.createNewDomain(domainConf);

      // Add the other server (doing that after the connection ensures we
      // connect to the right server).
      // WARNING: this only works because, for the moment, applying changes
      // to the configuration does not force a reconnection in the
      // replication domain. Once that is implemented, the reconnection may
      // reach either server and we can no longer guarantee that we reach the
      // server we want at the beginning.
      if (serverId == DS1_ID)
      {
        replServers.add("localhost:" + rs2Port);
      }
      else if (serverId == DS2_ID)
      {
        replServers.add("localhost:" + rs1Port);
      }
      else
      {
        fail("Unknown replication domain server id.");
      }
      domainConf = new DomainFakeCfg(baseDn, serverId, replServers);
      ConfigChangeResult chgRes =
        replicationDomain.applyConfigurationChange(domainConf);
      if ((chgRes == null) ||
        (!chgRes.getResultCode().equals(ResultCode.SUCCESS)))
      {
        fail("Could not change replication domain config" +
          " (add some replication servers).");
      }

      return replicationDomain;
    }
    catch (Exception e)
    {
      fail("createReplicationDomain " + stackTraceToSingleLineString(e));
    }
    return null;
  }

  /**
   * Set up the environment.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  {
    super.setUp();
    // In case we need to extend
  }

  /**
   * Clean up the environment.
   *
   * @throws Exception
   *           If the environment could not be cleaned up.
   */
  @AfterClass
  @Override
  public void classCleanUp() throws Exception
  {
    super.classCleanUp();
    // In case we need to extend
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -785,7 +785,8 @@ } else { fail("ReplicationServer transmission failed: no expected message class."); fail("ReplicationServer transmission failed: no expected message" + " class: " + msg2); break; } }