opends/src/server/org/opends/server/replication/common/MutableBoolean.java
/**
 * A holder for a single {@code boolean} whose value can be changed after
 * construction.
 * <p>
 * {@link Boolean} instances are immutable, so a {@code Boolean} cannot be both
 * the object threads synchronize on and the flag they test.  A
 * {@code MutableBoolean} can: one thread waits on the instance until the flag
 * has the expected value while another thread sets the flag and notifies the
 * same instance.
 * <p>
 * Single reads and writes of the flag are safe without holding a lock because
 * the field is volatile.  Callers that combine a test of the flag with
 * {@code wait()}/{@code notify()} must still hold the monitor of this object,
 * and should re-test the flag in a loop after {@code wait()} returns.
 */
public class MutableBoolean
{
  /*
   * Private so the flag can only be reached through get()/set(); volatile so
   * that a set() performed outside a synchronized block is still visible to
   * every thread that subsequently calls get().
   */
  private volatile boolean value;

  /**
   * Creates a MutableBoolean with the given initial value.
   *
   * @param value The initial value of the mutable Boolean.
   */
  public MutableBoolean(boolean value)
  {
    this.value = value;
  }

  /**
   * Retrieves the current value of this MutableBoolean.
   *
   * @return The current value of this MutableBoolean.
   */
  public boolean get()
  {
    return value;
  }

  /**
   * Sets the current value of this MutableBoolean.
   * <p>
   * This method only stores the value; it does not notify threads waiting on
   * this object.
   *
   * @param value The new value of this MutableBoolean.
   */
  public void set(boolean value)
  {
    this.value = value;
  }
}
@@ -321,9 +321,9 @@ } shutdown = true; synchronized (this) synchronized (msgQueue) { this.notifyAll(); msgQueue.notifyAll(); } synchronized (this) opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -988,7 +988,7 @@ sendWindow = new Semaphore(sendWindowSize); // create reader reader = new ServerReader(session, serverId, this); reader = new ServerReader(session, this); reader.start(); if (writer == null) opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -194,7 +194,7 @@ } } if (replicationServerDomain!=null) replicationServerDomain.stopServer(handler); replicationServerDomain.stopServer(handler, false); } } opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -93,6 +93,6 @@ public void close() { if (handler.getDomain() != null) handler.getDomain().stopServer(handler); handler.getDomain().stopServer(handler, false); } } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -569,10 +569,6 @@ serverId , this); connectThread.start(); // FIXME : Is it better to have the time to receive the ReplServerInfo // from all the other replication servers since this info is necessary // to route an early received total update request. try { Thread.sleep(300);} catch(Exception e) {} if (debugEnabled()) TRACER.debugInfo("RS " +getMonitorInstanceName()+ " creates listen thread"); @@ -1048,7 +1044,7 @@ // Have a new group id: Disconnect every servers. for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) { replicationServerDomain.stopAllServers(); replicationServerDomain.stopAllServers(true); } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -808,7 +808,7 @@ cn.toString(), baseDn)); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); stopServer(origServer); stopServer(origServer, false); } // Mark the ack info object as completed to prevent potential timeout // code parallel run @@ -887,7 +887,7 @@ cn.toString(), baseDn)); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); stopServer(origServer); stopServer(origServer, false); } // Increment assured counters boolean safeRead = @@ -979,25 +979,28 @@ for (ReplicationServerHandler handler : replicationServers.values()) { if (replServers.contains(handler.getServerAddressURL())) stopServer(handler); stopServer(handler, false); } } /** * Stop operations with all servers this domain is connected with (RS and DS). * * @param shutdown A boolean indicating if the stop is due to a * shutdown condition. */ public void stopAllServers() public void stopAllServers(boolean shutdown) { // Close session with other replication servers for (ReplicationServerHandler serverHandler : replicationServers.values()) { stopServer(serverHandler); stopServer(serverHandler, shutdown); } // Close session with other LDAP servers for (DataServerHandler serverHandler : directoryServers.values()) { stopServer(serverHandler); stopServer(serverHandler, shutdown); } } @@ -1026,9 +1029,11 @@ /** * Stop operations with a given server. * * @param handler the server for which we want to stop operations * @param handler the server for which we want to stop operations. * @param shutdown A boolean indicating if the stop is due to a * shutdown condition. */ public void stopServer(ServerHandler handler) public void stopServer(ServerHandler handler, boolean shutdown) { if (debugEnabled()) TRACER.debugInfo( @@ -1049,9 +1054,13 @@ { try { // Acquire lock on domain (see more details in comment of start() // method of ServerHandler) lock(); if (!shutdown) { lock(); } } catch (InterruptedException ex) { // Try doing job anyway... 
@@ -1066,9 +1075,12 @@ // Check if generation id has to be reset mayResetGenerationId(); // Warn our DSs that a RS or DS has quit (does not use this // handler as already removed from list) buildAndSendTopoInfoToDSs(null); if (!shutdown) { // Warn our DSs that a RS or DS has quit (does not use this // handler as already removed from list) buildAndSendTopoInfoToDSs(null); } } } else { @@ -1093,10 +1105,13 @@ mayResetGenerationId(); // Update the remote replication servers with our list // of connected LDAP servers buildAndSendTopoInfoToRSs(); // Warn our DSs that a RS or DS has quit (does not use this // handler as already removed from list) buildAndSendTopoInfoToDSs(null); if (!shutdown) { buildAndSendTopoInfoToRSs(); // Warn our DSs that a RS or DS has quit (does not use this // handler as already removed from list) buildAndSendTopoInfoToDSs(null); } } else if (otherHandlers.contains(handler)) { @@ -1113,7 +1128,10 @@ } finally { release(); if (!shutdown) { release(); } } } } @@ -1710,7 +1728,7 @@ mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); mb2.append(stackTraceToSingleLineString(ioe)); logError(mb2.toMessage()); stopServer(senderHandler); stopServer(senderHandler, false); } } else { @@ -1746,8 +1764,8 @@ // an error happened on the sender session trying to recover // from an error on the receiver session. // We don't have much solution left beside closing the sessions. 
stopServer(senderHandler); stopServer(targetHandler); stopServer(senderHandler, false); stopServer(targetHandler, false); } // TODO Handle error properly (sender timeout in addition) } @@ -1766,7 +1784,7 @@ // Terminate the assured timer assuredTimeoutTimer.cancel(); stopAllServers(); stopAllServers(true); stopDbHandlers(); } @@ -3163,7 +3181,7 @@ { TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName())); stopServer(rsHandler); stopServer(rsHandler, false); } } } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -375,7 +375,7 @@ writer = new ServerWriter(session, serverId, this, replicationServerDomain); reader = new ServerReader(session, serverId, this); reader = new ServerReader(session, this); reader.start(); writer.start(); @@ -1421,6 +1421,6 @@ public void doStop() { if (replicationServerDomain!=null) replicationServerDomain.stopServer(this); replicationServerDomain.stopServer(this, false); } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -58,7 +58,6 @@ * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private int serverId; private ProtocolSession session; private ServerHandler handler; @@ -66,16 +65,14 @@ * Constructor for the LDAP server reader part of the replicationServer. * * @param session The ProtocolSession from which to read the data. * @param serverId The server ID of the server from which we read messages. * @param handler The server handler for this server reader. */ public ServerReader(ProtocolSession session, int serverId, public ServerReader(ProtocolSession session, ServerHandler handler) { super("Replication Reader Thread for RS handler " + handler.getMonitorInstanceName()); this.session = session; this.serverId = serverId; this.handler = handler; } @@ -302,9 +299,12 @@ if (debugEnabled()) TRACER.debugInfo( "In " + this.getName() + " " + stackTraceToSingleLineString(e)); errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(), Integer.toString(handler.getReplicationServerId())); logError(errMessage); if (!handler.shuttingDown()) { errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(), Integer.toString(handler.getReplicationServerId())); logError(errMessage); } } catch (ClassNotFoundException e) { opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -246,7 +246,7 @@ { // Can't do much more : ignore } replicationServerDomain.stopServer(handler); replicationServerDomain.stopServer(handler, false); if (debugEnabled()) { TRACER.debugInfo(this.getName() + " stopped " + errMessage); opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -79,6 +79,7 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.MutableBoolean; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachine; @@ -309,7 +310,7 @@ * This object is used as a conditional event to be notified about * the reception of monitor information from the Replication Server. */ private Object monitorResponse = new Object(); private final MutableBoolean monitorResponse = new MutableBoolean(false); /** @@ -585,6 +586,8 @@ */ public Map<Integer, ServerState> getReplicaStates() { monitorResponse.set(false); // publish Monitor Request Message to the Replication Server broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId())); @@ -593,7 +596,10 @@ { synchronized (monitorResponse) { monitorResponse.wait(10000); if (monitorResponse.get() == false) { monitorResponse.wait(10000); } } } catch (InterruptedException e) {} @@ -844,6 +850,7 @@ // Notify the sender that the response was received. synchronized (monitorResponse) { monitorResponse.set(true); monitorResponse.notify(); } } @@ -1901,6 +1908,18 @@ disableService(); enableService(); // wait for the domain to reconnect. int count = 0; while (!isConnected() && (count < 10)) { try { Thread.sleep(100); } catch (InterruptedException e) { } } resetGenerationId(getGenerationID()); // check that at least one ReplicationServer did change its generation-id opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -419,7 +419,7 @@ openReplicationSession(DN.decode(TEST_ROOT_DN_STRING), 3, 100, replicationServerPort, 5000, state); assertTrue(broker.isConnected()); assertTrue(broker.isConnected(), "Broker could not connect to RS"); ReplicationMsg msg2 = broker.receive(); broker.updateWindowAfterReplay();