opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -368,6 +368,22 @@ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 51; /** * A replication server received a null messsage from * another server. */ public static final int MSGID_READER_NULL_MSG = CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 52; /** * A server disconnected from the replication server. * (this is an informational message) */ public static final int MSGID_READER_EXCEPTION = CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53; /** * Register the messages from this class in the core server. * */ @@ -376,7 +392,7 @@ registerMessage(MSGID_SYNC_INVALID_DN, "The configured DN is already used by another domain"); registerMessage(MSGID_INVALID_CHANGELOG_SERVER, "Invalid changelog server configuration"); "Invalid replication server configuration"); registerMessage(MSGID_UNKNOWN_HOSTNAME, "Changelog failed to start because the hostname is unknown"); registerMessage(MSGID_COULD_NOT_BIND_CHANGELOG, @@ -398,31 +414,31 @@ registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION, "An Exception was caught while replaying operation %s : %s"); registerMessage(MSGID_NEED_CHANGELOG_PORT, "The Changelog server port must be defined"); "The replication server port must be defined"); registerMessage(MSGID_ERROR_UPDATING_RUV, "Error %s when updating server state %s : %s base dn : %s"); registerMessage(MSGID_ERROR_SEARCHING_RUV, "Error %s when searching for server state %s : %s base dn : %s"); registerMessage(MSGID_SERVER_DISCONNECT, "%s has disconnected from this changelog server"); "%s has disconnected from this replication server"); registerMessage(MSGID_NO_CHANGELOG_SERVER_LISTENING, "There is no changelog server listening on %s"); "There is no replication server listening on %s"); registerMessage(MSGID_CHANGELOG_MISSING_CHANGES, "The changelog server %s is missing some changes that this server" + "The replication server %s is missing some changes that this server" + " has already processed"); registerMessage(MSGID_NEED_MORE_THAN_ONE_CHANGELOG_SERVER, "More than one changelog server should be configured"); "More than one replication server should be configured"); registerMessage(MSGID_EXCEPTION_STARTING_SESSION, "Caught Exception during initial communication with " + "changelog server : "); "replication server : "); registerMessage(MSGID_CANNOT_RECOVER_CHANGES, "Error when searching old changes from the database. "); registerMessage( MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES, "Could not find a changelog server that has seen all the local" + "Could not find a replication server that has seen all the local" + " changes. Going to replay changes"); registerMessage(MSGID_COULD_NOT_FIND_CHANGELOG, "Could not connect to any changelog server, retrying..."); "Could not connect to any replication server, retrying..."); registerMessage(MSGID_EXCEPTION_CLOSING_DATABASE, "Error closing changelog database %s : "); registerMessage(MSGID_EXCEPTION_DECODING_OPERATION, @@ -458,33 +474,33 @@ "An Exception was caught while testing existence or trying " + " to create the directory for the changelog database : %s"); registerMessage(MSGID_CHANGELOG_SERVER_ATTR, "Specifies the list of Changelog Servers to which this" + " Changelog Server should connect. Each value of this attribute" + "Specifies the list of replication servers to which this" + " replication server should connect. Each value of this attribute" + " should contain a values build with the hostname and the port" + " number of the remote server separated with a \":\""); registerMessage(MSGID_SERVER_ID_ATTR, "Specifies the server ID. Each Changelog Server in the topology" + "Specifies the server ID. Each replication server in the topology" + " Must be assigned a unique server ID in the topology"); registerMessage(MSGID_CHANGELOG_PORT_ATTR, "Specifies the port number that the changelog server will use to" + "Specifies the port number that the replication server will use to" + " listen for connections from LDAP servers"); registerMessage(MSGID_WINDOW_SIZE_ATTR, "Specifies the receive window size of the changelog server"); "Specifies the receive window size of the replication server"); registerMessage(MSGID_QUEUE_SIZE_ATTR, "Specifies the receive queue size of the changelog server." + " The Changelog servers will queue up to this number of messages" + "Specifies the receive queue size of the replication server." + " The replication servers will queue up to this number of messages" + " in its memory queue and save the older messages to persistent" + " storage. Using a larger size may improve performances when" + " The replication delay is larger than this size but at the cost" + " of using more memory"); registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR, "Specifies the Changelog Server directory. The Changelog server" + "Specifies the replication server directory. The replication server" + " will create all persistent storage below this path"); registerMessage(MSGID_PURGE_DELAY_ATTR, "Specifies the Changelog Purge Delay, The Changelog servers will" + "Specifies the Changelog Purge Delay, The replication servers will" + " keep all changes up to this amount of time before deleting them." + " This values defines the maximum age of a backup that can be" + " restored because changelog servers would not be able to refresh" + " restored because replication servers would not be able to refresh" + " LDAP servers with older versions of the data. A zero value" + " can be used to specify an infinite delay (or never purge)"); registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED, @@ -504,5 +520,9 @@ "The provider class does not allow the operation requested"); registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME, "The hostname %s could not be resolved as an IP address"); registerMessage(MSGID_READER_NULL_MSG, "Received a Null Msg from %s"); registerMessage(MSGID_READER_EXCEPTION, "Exception when reading messages from %s"); } } opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -27,11 +27,14 @@ package org.opends.server.replication.plugin; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; @@ -42,6 +45,11 @@ */ public class ListenerThread extends DirectoryThread { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private ReplicationDomain listener; private boolean shutdown = false; @@ -70,23 +78,38 @@ public void run() { UpdateMessage msg; boolean done = false; try if (debugEnabled()) { while (((msg = listener.receive()) != null) && (shutdown == false)) TRACER.debugInfo("Replication Listener thread starting."); } while (!done) { try { listener.replay(msg); while (((msg = listener.receive()) != null) && (shutdown == false)) { listener.replay(msg); } done = true; } catch (Exception e) { /* * catch all exceptions happening in listener.receive and * listener.replay so that the thread never dies even in case * of problems. */ int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE; String message = getMessage(msgID, stackTraceToSingleLineString(e)); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } } catch (Exception e) } if (debugEnabled()) { /* * catch all exceptions happening in listener.receive and listener.replay * so that the thread never dies even in case of problems. */ int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE; String message = getMessage(msgID, stackTraceToSingleLineString(e)); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); TRACER.debugInfo("Replication Listener thread stopping."); } } } opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,7 +207,7 @@ } boolean checkState = true; while( !connected) while ((!connected) && (!shutdown)) { for (String server : servers) { opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -126,6 +126,12 @@ */ private static final DebugTracer TRACER = getTracer(); /** * on shutdown, the server will wait for existing threads to stop * during this timeout (in ms). */ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; private ReplicationMonitor monitor; private ReplicationBroker broker; @@ -1047,6 +1053,12 @@ // stop the ReplicationBroker broker.stop(); // wait for the listener thread to stop for (ListenerThread thread : synchroThreads) { thread.shutdown(); } } /** @@ -1745,6 +1757,17 @@ thread.shutdown(); } broker.stop(); // this will cut the session and wake-up the listeners for (ListenerThread thread : synchroThreads) { try { thread.join(SHUTDOWN_JOIN_TIMEOUT); } catch (InterruptedException e) { // ignore } } } /** opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -29,6 +29,7 @@ import org.opends.server.api.DirectoryThread; import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import java.io.IOException; @@ -66,7 +67,7 @@ /** * Set this to stop the thread. */ private boolean shutdown = false; private Boolean shutdown = false; /** @@ -133,7 +134,14 @@ { TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime); } Thread.sleep(sleepTime); synchronized (shutdown) { if (!shutdown) { shutdown.wait(sleepTime); } } } catch (InterruptedException e) { @@ -161,10 +169,23 @@ /** * Call this method to stop the thread. * This method is blocking until the thread has stopped. */ public void shutdown() { shutdown = true; synchronized (shutdown) { shutdown.notifyAll(); shutdown = true; if (debugEnabled()) { TRACER.debugInfo("Going to notify Heartbeat thread."); } } if (debugEnabled()) { TRACER.debugInfo("Returning from Heartbeat shutdown."); } } opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -26,6 +26,9 @@ */ package org.opends.server.replication.protocol; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -33,6 +36,8 @@ import java.net.SocketException; import java.util.zip.DataFormatException; import org.opends.server.loggers.debug.DebugTracer; /** * This class Implement a protocol session using a basic socket and relying on * the innate encoding/decoding capabilities of the ReplicationMessage @@ -44,6 +49,11 @@ */ public class SocketSession implements ProtocolSession { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private Socket socket; private InputStream input; private OutputStream output; @@ -83,6 +93,10 @@ */ public void close() throws IOException { if (debugEnabled()) { TRACER.debugVerbose("Closing SocketSession."); } socket.close(); } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -28,6 +28,7 @@ import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ReplicationMessages.*; @@ -79,6 +80,12 @@ */ private static final DebugTracer TRACER = getTracer(); /** * Time during which the server will wait for existing thread to stop * during the shutdown. */ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; private short serverId; private ProtocolSession session; private final MsgQueue msgQueue = new MsgQueue(); @@ -747,7 +754,7 @@ private UpdateMessage getnextMessage() { UpdateMessage msg; do while (active == true) { if (following == false) { @@ -884,7 +891,7 @@ * the first check at the beginning of this method * and the second check just above. */ } while (active == true); } return null; } @@ -905,6 +912,15 @@ public void stopHandler() { active = false; try { session.close(); } catch (IOException e) { // ignore. } synchronized (msgQueue) { /* wake up the writer thread on an empty queue so that it disappear */ @@ -1218,7 +1234,17 @@ { // Service is closing. } stopHandler(); try { writer.join(SHUTDOWN_JOIN_TIMEOUT); reader.join(SHUTDOWN_JOIN_TIMEOUT); } catch (InterruptedException e) { // don't try anymore to join and return. } } /** opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -29,6 +29,8 @@ import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import java.io.IOException; @@ -45,6 +47,7 @@ import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.loggers.debug.DebugTracer; /** @@ -59,6 +62,11 @@ */ public class ServerReader extends DirectoryThread { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private short serverId; private ProtocolSession session; private ServerHandler handler; @@ -87,8 +95,18 @@ */ public void run() { if (debugEnabled()) { if (handler.isReplicationServer()) { TRACER.debugInfo("Replication server reader starting " + serverId); } else { TRACER.debugInfo("LDAP server reader starting " + serverId); } } /* * TODO : catch exceptions in case of bugs * wait on input stream * grab all incoming messages and publish them to the replicationCache */ @@ -98,12 +116,6 @@ { ReplicationMessage msg = session.receive(); if (msg == null) { // TODO : generate error in the log // make sure that connection is closed return; } if (msg instanceof AckMessage) { AckMessage ack = (AckMessage) msg; @@ -147,7 +159,19 @@ ErrorMessage errorMsg = (ErrorMessage) msg; handler.process(errorMsg); } else if (msg == null) { /* * The remote server has sent an unknown message, * close the conenction. */ int msgID = MSGID_READER_NULL_MSG; String message = getMessage(msgID, handler.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); return; } } } catch (IOException e) { @@ -160,7 +184,7 @@ String message = getMessage(msgID, handler.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.NOTICE, message, msgID); message + e.getMessage(), msgID); } catch (ClassNotFoundException e) { /* @@ -174,7 +198,15 @@ message, msgID); } catch (Exception e) { /* * The remote server has sent an unknown message, * close the conenction. */ int msgID = MSGID_READER_EXCEPTION; String message = getMessage(msgID, handler.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } finally { @@ -192,5 +224,16 @@ } replicationCache.stopServer(handler); } if (debugEnabled()) { if (handler.isReplicationServer()) { TRACER.debugInfo("Replication server reader stopping " + serverId); } else { TRACER.debugInfo("LDAP server reader stopping " + serverId); } } } } opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -27,6 +27,8 @@ package org.opends.server.replication.server; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ReplicationMessages.*; @@ -35,6 +37,7 @@ import java.util.NoSuchElementException; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.types.ErrorLogCategory; @@ -47,9 +50,15 @@ */ public class ServerWriter extends DirectoryThread { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private ProtocolSession session; private ServerHandler handler; private ReplicationCache replicationCache; private short serverId; /** * Create a ServerWriter. @@ -66,6 +75,7 @@ { super(handler.toString() + " writer"); this.serverId = serverId; this.session = session; this.handler = handler; this.replicationCache = replicationCache; @@ -78,6 +88,17 @@ */ public void run() { if (debugEnabled()) { if (handler.isReplicationServer()) { TRACER.debugInfo("Replication server writer starting " + serverId); } else { TRACER.debugInfo("LDAP server writer starting " + serverId); } } try { while (true) { @@ -132,6 +153,18 @@ // Can't do much more : ignore } replicationCache.stopServer(handler); if (debugEnabled()) { if (handler.isReplicationServer()) { TRACER.debugInfo("Replication server writer stopping " + serverId); } else { TRACER.debugInfo("LDAP server writer stopping " + serverId); } } } } }