From a6be5db964ffa77a68b91966d99f6fa0b36b532e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 29 May 2007 09:27:46 +0000
Subject: [PATCH] Fix for 1561 : ReplicationDomain.disable() should wait for all threads to die be fore returning
---
opends/src/server/org/opends/server/messages/ReplicationMessages.java | 58 +++++++---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 2
opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 14 ++
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 33 ++++++
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java | 47 +++++++--
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java | 27 ++++
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 30 +++++
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 23 ++++
opends/src/server/org/opends/server/replication/server/ServerReader.java | 63 ++++++++++--
9 files changed, 250 insertions(+), 47 deletions(-)
diff --git a/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 48f7704e..68bcf69 100644
--- a/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -368,6 +368,22 @@
CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 51;
/**
+ * A replication server received a null messsage from
+ * another server.
+ */
+ public static final int MSGID_READER_NULL_MSG =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 52;
+
+ /**
+ * A server disconnected from the replication server.
+ * (this is an informational message)
+ */
+ public static final int MSGID_READER_EXCEPTION =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 53;
+
+
+
+ /**
* Register the messages from this class in the core server.
*
*/
@@ -376,7 +392,7 @@
registerMessage(MSGID_SYNC_INVALID_DN,
"The configured DN is already used by another domain");
registerMessage(MSGID_INVALID_CHANGELOG_SERVER,
- "Invalid changelog server configuration");
+ "Invalid replication server configuration");
registerMessage(MSGID_UNKNOWN_HOSTNAME,
"Changelog failed to start because the hostname is unknown");
registerMessage(MSGID_COULD_NOT_BIND_CHANGELOG,
@@ -398,31 +414,31 @@
registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION,
"An Exception was caught while replaying operation %s : %s");
registerMessage(MSGID_NEED_CHANGELOG_PORT,
- "The Changelog server port must be defined");
+ "The replication server port must be defined");
registerMessage(MSGID_ERROR_UPDATING_RUV,
"Error %s when updating server state %s : %s base dn : %s");
registerMessage(MSGID_ERROR_SEARCHING_RUV,
"Error %s when searching for server state %s : %s base dn : %s");
registerMessage(MSGID_SERVER_DISCONNECT,
- "%s has disconnected from this changelog server");
+ "%s has disconnected from this replication server");
registerMessage(MSGID_NO_CHANGELOG_SERVER_LISTENING,
- "There is no changelog server listening on %s");
+ "There is no replication server listening on %s");
registerMessage(MSGID_CHANGELOG_MISSING_CHANGES,
- "The changelog server %s is missing some changes that this server" +
+ "The replication server %s is missing some changes that this server" +
" has already processed");
registerMessage(MSGID_NEED_MORE_THAN_ONE_CHANGELOG_SERVER,
- "More than one changelog server should be configured");
+ "More than one replication server should be configured");
registerMessage(MSGID_EXCEPTION_STARTING_SESSION,
"Caught Exception during initial communication with " +
- "changelog server : ");
+ "replication server : ");
registerMessage(MSGID_CANNOT_RECOVER_CHANGES,
"Error when searching old changes from the database. ");
registerMessage(
MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES,
- "Could not find a changelog server that has seen all the local" +
+ "Could not find a replication server that has seen all the local" +
" changes. Going to replay changes");
registerMessage(MSGID_COULD_NOT_FIND_CHANGELOG,
- "Could not connect to any changelog server, retrying...");
+ "Could not connect to any replication server, retrying...");
registerMessage(MSGID_EXCEPTION_CLOSING_DATABASE,
"Error closing changelog database %s : ");
registerMessage(MSGID_EXCEPTION_DECODING_OPERATION,
@@ -458,33 +474,33 @@
"An Exception was caught while testing existence or trying " +
" to create the directory for the changelog database : %s");
registerMessage(MSGID_CHANGELOG_SERVER_ATTR,
- "Specifies the list of Changelog Servers to which this" +
- " Changelog Server should connect. Each value of this attribute" +
+ "Specifies the list of replication servers to which this" +
+ " replication server should connect. Each value of this attribute" +
" should contain a values build with the hostname and the port" +
" number of the remote server separated with a \":\"");
registerMessage(MSGID_SERVER_ID_ATTR,
- "Specifies the server ID. Each Changelog Server in the topology" +
+ "Specifies the server ID. Each replication server in the topology" +
" Must be assigned a unique server ID in the topology");
registerMessage(MSGID_CHANGELOG_PORT_ATTR,
- "Specifies the port number that the changelog server will use to" +
+ "Specifies the port number that the replication server will use to" +
" listen for connections from LDAP servers");
registerMessage(MSGID_WINDOW_SIZE_ATTR,
- "Specifies the receive window size of the changelog server");
+ "Specifies the receive window size of the replication server");
registerMessage(MSGID_QUEUE_SIZE_ATTR,
- "Specifies the receive queue size of the changelog server." +
- " The Changelog servers will queue up to this number of messages" +
+ "Specifies the receive queue size of the replication server." +
+ " The replication servers will queue up to this number of messages" +
" in its memory queue and save the older messages to persistent" +
" storage. Using a larger size may improve performances when" +
" The replication delay is larger than this size but at the cost" +
" of using more memory");
registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR,
- "Specifies the Changelog Server directory. The Changelog server" +
+ "Specifies the replication server directory. The replication server" +
" will create all persistent storage below this path");
registerMessage(MSGID_PURGE_DELAY_ATTR,
- "Specifies the Changelog Purge Delay, The Changelog servers will" +
+ "Specifies the Changelog Purge Delay, The replication servers will" +
" keep all changes up to this amount of time before deleting them." +
" This values defines the maximum age of a backup that can be" +
- " restored because changelog servers would not be able to refresh" +
+ " restored because replication servers would not be able to refresh" +
" LDAP servers with older versions of the data. A zero value" +
" can be used to specify an infinite delay (or never purge)");
registerMessage(MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED,
@@ -504,5 +520,9 @@
"The provider class does not allow the operation requested");
registerMessage(MSGID_COULD_NOT_SOLVE_HOSTNAME,
"The hostname %s could not be resolved as an IP address");
+ registerMessage(MSGID_READER_NULL_MSG,
+ "Received a Null Msg from %s");
+ registerMessage(MSGID_READER_EXCEPTION,
+ "Exception when reading messages from %s");
}
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
index 5300d58..1e626cf 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -27,11 +27,14 @@
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -42,6 +45,11 @@
*/
public class ListenerThread extends DirectoryThread
{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
private ReplicationDomain listener;
private boolean shutdown = false;
@@ -70,23 +78,38 @@
public void run()
{
UpdateMessage msg;
+ boolean done = false;
- try
+ if (debugEnabled())
{
- while (((msg = listener.receive()) != null) && (shutdown == false))
+ TRACER.debugInfo("Replication Listener thread starting.");
+ }
+
+ while (!done)
+ {
+ try
{
- listener.replay(msg);
+ while (((msg = listener.receive()) != null) && (shutdown == false))
+ {
+ listener.replay(msg);
+ }
+ done = true;
+ } catch (Exception e)
+ {
+ /*
+ * catch all exceptions happening in listener.receive and
+ * listener.replay so that the thread never dies even in case
+ * of problems.
+ */
+ int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e));
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
}
- } catch (Exception e)
+ }
+ if (debugEnabled())
{
- /*
- * catch all exceptions happening in listener.receive and listener.replay
- * so that the thread never dies even in case of problems.
- */
- int msgID = MSGID_EXCEPTION_RECEIVING_REPLICATION_MESSAGE;
- String message = getMessage(msgID, stackTraceToSingleLineString(e));
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ TRACER.debugInfo("Replication Listener thread stopping.");
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a27dc96..a67fdc3 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -207,7 +207,7 @@
}
boolean checkState = true;
- while( !connected)
+ while ((!connected) && (!shutdown))
{
for (String server : servers)
{
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 985d2f0..e601e1f 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -126,6 +126,12 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /**
+ * on shutdown, the server will wait for existing threads to stop
+ * during this timeout (in ms).
+ */
+ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
+
private ReplicationMonitor monitor;
private ReplicationBroker broker;
@@ -1047,6 +1053,12 @@
// stop the ReplicationBroker
broker.stop();
+
+ // wait for the listener thread to stop
+ for (ListenerThread thread : synchroThreads)
+ {
+ thread.shutdown();
+ }
}
/**
@@ -1745,6 +1757,17 @@
thread.shutdown();
}
broker.stop(); // this will cut the session and wake-up the listeners
+
+ for (ListenerThread thread : synchroThreads)
+ {
+ try
+ {
+ thread.join(SHUTDOWN_JOIN_TIMEOUT);
+ } catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 9723814..c246b4c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -29,6 +29,7 @@
import org.opends.server.api.DirectoryThread;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.debug.DebugTracer;
import java.io.IOException;
@@ -66,7 +67,7 @@
/**
* Set this to stop the thread.
*/
- private boolean shutdown = false;
+ private Boolean shutdown = false;
/**
@@ -133,7 +134,14 @@
{
TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
}
- Thread.sleep(sleepTime);
+
+ synchronized (shutdown)
+ {
+ if (!shutdown)
+ {
+ shutdown.wait(sleepTime);
+ }
+ }
}
catch (InterruptedException e)
{
@@ -161,10 +169,23 @@
/**
* Call this method to stop the thread.
+ * This method is blocking until the thread has stopped.
*/
public void shutdown()
{
- shutdown = true;
+ synchronized (shutdown)
+ {
+ shutdown.notifyAll();
+ shutdown = true;
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Going to notify Heartbeat thread.");
+ }
+ }
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Returning from Heartbeat shutdown.");
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 0c53005..7d0a90f 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -26,6 +26,9 @@
*/
package org.opends.server.replication.protocol;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,6 +36,8 @@
import java.net.SocketException;
import java.util.zip.DataFormatException;
+import org.opends.server.loggers.debug.DebugTracer;
+
/**
* This class Implement a protocol session using a basic socket and relying on
* the innate encoding/decoding capabilities of the ReplicationMessage
@@ -44,6 +49,11 @@
*/
public class SocketSession implements ProtocolSession
{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
private Socket socket;
private InputStream input;
private OutputStream output;
@@ -83,6 +93,10 @@
*/
public void close() throws IOException
{
+ if (debugEnabled())
+ {
+ TRACER.debugVerbose("Closing SocketSession.");
+ }
socket.close();
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a098c4b..c9d2835 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -28,6 +28,7 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
@@ -79,6 +80,12 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /**
+ * Time during which the server will wait for existing thread to stop
+ * during the shutdown.
+ */
+ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
+
private short serverId;
private ProtocolSession session;
private final MsgQueue msgQueue = new MsgQueue();
@@ -747,7 +754,7 @@
private UpdateMessage getnextMessage()
{
UpdateMessage msg;
- do
+ while (active == true)
{
if (following == false)
{
@@ -884,7 +891,7 @@
* the first check at the beginning of this method
* and the second check just above.
*/
- } while (active == true);
+ }
return null;
}
@@ -905,6 +912,15 @@
public void stopHandler()
{
active = false;
+
+ try
+ {
+ session.close();
+ } catch (IOException e)
+ {
+ // ignore.
+ }
+
synchronized (msgQueue)
{
/* wake up the writer thread on an empty queue so that it disappear */
@@ -1218,7 +1234,17 @@
{
// Service is closing.
}
+
stopHandler();
+
+ try
+ {
+ writer.join(SHUTDOWN_JOIN_TIMEOUT);
+ reader.join(SHUTDOWN_JOIN_TIMEOUT);
+ } catch (InterruptedException e)
+ {
+ // don't try anymore to join and return.
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 3c72463..41fb483 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -29,6 +29,8 @@
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
import java.io.IOException;
@@ -45,6 +47,7 @@
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.loggers.debug.DebugTracer;
/**
@@ -59,6 +62,11 @@
*/
public class ServerReader extends DirectoryThread
{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
private short serverId;
private ProtocolSession session;
private ServerHandler handler;
@@ -87,8 +95,18 @@
*/
public void run()
{
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ {
+ TRACER.debugInfo("Replication server reader starting " + serverId);
+ }
+ else
+ {
+ TRACER.debugInfo("LDAP server reader starting " + serverId);
+ }
+ }
/*
- * TODO : catch exceptions in case of bugs
* wait on input stream
* grab all incoming messages and publish them to the replicationCache
*/
@@ -98,12 +116,6 @@
{
ReplicationMessage msg = session.receive();
- if (msg == null)
- {
- // TODO : generate error in the log
- // make sure that connection is closed
- return;
- }
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -147,7 +159,19 @@
ErrorMessage errorMsg = (ErrorMessage) msg;
handler.process(errorMsg);
}
-
+ else if (msg == null)
+ {
+ /*
+ * The remote server has sent an unknown message,
+ * close the conenction.
+ */
+ int msgID = MSGID_READER_NULL_MSG;
+ String message = getMessage(msgID, handler.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
+ }
}
} catch (IOException e)
{
@@ -160,7 +184,7 @@
String message = getMessage(msgID, handler.toString());
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
- message, msgID);
+ message + e.getMessage(), msgID);
} catch (ClassNotFoundException e)
{
/*
@@ -174,7 +198,15 @@
message, msgID);
} catch (Exception e)
{
-
+ /*
+ * The remote server has sent an unknown message,
+ * close the conenction.
+ */
+ int msgID = MSGID_READER_EXCEPTION;
+ String message = getMessage(msgID, handler.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
}
finally
{
@@ -192,5 +224,16 @@
}
replicationCache.stopServer(handler);
}
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ {
+ TRACER.debugInfo("Replication server reader stopping " + serverId);
+ }
+ else
+ {
+ TRACER.debugInfo("LDAP server reader stopping " + serverId);
+ }
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 5344e20..6018ed9 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -27,6 +27,8 @@
package org.opends.server.replication.server;
import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ReplicationMessages.*;
@@ -35,6 +37,7 @@
import java.util.NoSuchElementException;
import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.ErrorLogCategory;
@@ -47,9 +50,15 @@
*/
public class ServerWriter extends DirectoryThread
{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
private ProtocolSession session;
private ServerHandler handler;
private ReplicationCache replicationCache;
+ private short serverId;
/**
* Create a ServerWriter.
@@ -66,6 +75,7 @@
{
super(handler.toString() + " writer");
+ this.serverId = serverId;
this.session = session;
this.handler = handler;
this.replicationCache = replicationCache;
@@ -78,6 +88,17 @@
*/
public void run()
{
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ {
+ TRACER.debugInfo("Replication server writer starting " + serverId);
+ }
+ else
+ {
+ TRACER.debugInfo("LDAP server writer starting " + serverId);
+ }
+ }
try {
while (true)
{
@@ -132,6 +153,18 @@
// Can't do much more : ignore
}
replicationCache.stopServer(handler);
+
+ if (debugEnabled())
+ {
+ if (handler.isReplicationServer())
+ {
+ TRACER.debugInfo("Replication server writer stopping " + serverId);
+ }
+ else
+ {
+ TRACER.debugInfo("LDAP server writer stopping " + serverId);
+ }
+ }
}
}
}
--
Gitblit v1.10.0