From a733cedcf54ab0b979f7f1b762d086e49bd59d72 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 27 Oct 2009 09:34:04 +0000
Subject: [PATCH] Fix for issue 4316 : Replication takes too much time to shutdown
---
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 2
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 6 -
opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java | 2
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 2
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 2
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 2
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 62 ++++++++++-----
opends/src/server/org/opends/server/replication/server/DbHandler.java | 4
opends/src/server/org/opends/server/replication/server/ServerReader.java | 14 +-
opends/src/server/org/opends/server/replication/common/MutableBoolean.java | 67 ++++++++++++++++
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 23 +++++
12 files changed, 145 insertions(+), 45 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/MutableBoolean.java b/opends/src/server/org/opends/server/replication/common/MutableBoolean.java
new file mode 100644
index 0000000..19c9322
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/common/MutableBoolean.java
@@ -0,0 +1,67 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.common;
+
+/**
+ * The MutableBoolean wraps a boolean in a mutable way.
+ * This is useful when one wishes to use a boolean object with condition
+ * variables. This class performs no synchronization of its own.
+ */
+public class MutableBoolean
+{
+ private boolean value; // The wrapped value; reachable only via get()/set().
+
+ /**
+ * Creates a MutableBoolean with the given initial value.
+ *
+ * @param value The initial value of the mutable Boolean
+ */
+ public MutableBoolean(boolean value)
+ {
+ this.value = value;
+ }
+
+ /**
+ * Retrieves the current value of this MutableBoolean.
+ *
+ * @return The current value of this MutableBoolean.
+ */
+ public boolean get()
+ {
+ return value;
+ }
+
+ /**
+ * Sets the current value of this MutableBoolean.
+ *
+ * @param value The new value of this MutableBoolean.
+ */
+ public void set(boolean value)
+ {
+ this.value = value;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 96a48b7..c92f881 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -321,9 +321,9 @@
}
shutdown = true;
- synchronized (this)
+ synchronized (msgQueue)
{
- this.notifyAll();
+ msgQueue.notifyAll();
}
synchronized (this)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 660521f..1d53436 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -988,7 +988,7 @@
sendWindow = new Semaphore(sendWindowSize);
// create reader
- reader = new ServerReader(session, serverId, this);
+ reader = new ServerReader(session, this);
reader.start();
if (writer == null)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 31ce932..ee10192 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -194,7 +194,7 @@
}
}
if (replicationServerDomain!=null)
- replicationServerDomain.stopServer(handler);
+ replicationServerDomain.stopServer(handler, false);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
index b66e8d0..bd52427 100644
--- a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
+++ b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -93,6 +93,6 @@
public void close()
{
if (handler.getDomain() != null)
- handler.getDomain().stopServer(handler);
+ handler.getDomain().stopServer(handler, false);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c36cc3b..c0c1655 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -569,10 +569,6 @@
serverId , this);
connectThread.start();
- // FIXME : Is it better to have the time to receive the ReplServerInfo
- // from all the other replication servers since this info is necessary
- // to route an early received total update request.
- try { Thread.sleep(300);} catch(Exception e) {}
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
" creates listen thread");
@@ -1048,7 +1044,7 @@
// Have a new group id: Disconnect every servers.
for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
{
- replicationServerDomain.stopAllServers();
+ replicationServerDomain.stopAllServers(true);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d41ee2c..4ef072c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -808,7 +808,7 @@
cn.toString(), baseDn));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- stopServer(origServer);
+ stopServer(origServer, false);
}
// Mark the ack info object as completed to prevent potential timeout
// code parallel run
@@ -887,7 +887,7 @@
cn.toString(), baseDn));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- stopServer(origServer);
+ stopServer(origServer, false);
}
// Increment assured counters
boolean safeRead =
@@ -979,25 +979,28 @@
for (ReplicationServerHandler handler : replicationServers.values())
{
if (replServers.contains(handler.getServerAddressURL()))
- stopServer(handler);
+ stopServer(handler, false);
}
}
/**
* Stop operations with all servers this domain is connected with (RS and DS).
+ *
+ * @param shutdown A boolean indicating if the stop is due to a
+ * shutdown condition.
*/
- public void stopAllServers()
+ public void stopAllServers(boolean shutdown)
{
// Close session with other replication servers
for (ReplicationServerHandler serverHandler : replicationServers.values())
{
- stopServer(serverHandler);
+ stopServer(serverHandler, shutdown);
}
// Close session with other LDAP servers
for (DataServerHandler serverHandler : directoryServers.values())
{
- stopServer(serverHandler);
+ stopServer(serverHandler, shutdown);
}
}
@@ -1026,9 +1029,11 @@
/**
* Stop operations with a given server.
*
- * @param handler the server for which we want to stop operations
+ * @param handler the server for which we want to stop operations.
+ * @param shutdown A boolean indicating if the stop is due to a
+ * shutdown condition.
*/
- public void stopServer(ServerHandler handler)
+ public void stopServer(ServerHandler handler, boolean shutdown)
{
if (debugEnabled())
TRACER.debugInfo(
@@ -1049,9 +1054,13 @@
{
try
{
+
// Acquire lock on domain (see more details in comment of start()
// method of ServerHandler)
- lock();
+ if (!shutdown)
+ {
+ lock();
+ }
} catch (InterruptedException ex)
{
// Try doing job anyway...
@@ -1066,9 +1075,12 @@
// Check if generation id has to be reset
mayResetGenerationId();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
+ if (!shutdown)
+ {
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
+ }
}
} else
{
@@ -1093,10 +1105,13 @@
mayResetGenerationId();
// Update the remote replication servers with our list
// of connected LDAP servers
- buildAndSendTopoInfoToRSs();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
+ if (!shutdown)
+ {
+ buildAndSendTopoInfoToRSs();
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
+ }
}
else if (otherHandlers.contains(handler))
{
@@ -1113,7 +1128,10 @@
}
finally
{
- release();
+ if (!shutdown)
+ {
+ release();
+ }
}
}
}
@@ -1710,7 +1728,7 @@
mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
mb2.append(stackTraceToSingleLineString(ioe));
logError(mb2.toMessage());
- stopServer(senderHandler);
+ stopServer(senderHandler, false);
}
} else
{
@@ -1746,8 +1764,8 @@
// an error happened on the sender session trying to recover
// from an error on the receiver session.
// We don't have much solution left beside closing the sessions.
- stopServer(senderHandler);
- stopServer(targetHandler);
+ stopServer(senderHandler, false);
+ stopServer(targetHandler, false);
}
// TODO Handle error properly (sender timeout in addition)
}
@@ -1766,7 +1784,7 @@
// Terminate the assured timer
assuredTimeoutTimer.cancel();
- stopAllServers();
+ stopAllServers(true);
stopDbHandlers();
}
@@ -3163,7 +3181,7 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName()));
- stopServer(rsHandler);
+ stopServer(rsHandler, false);
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 22bb668..22adad3 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -375,7 +375,7 @@
writer = new ServerWriter(session, serverId,
this, replicationServerDomain);
- reader = new ServerReader(session, serverId, this);
+ reader = new ServerReader(session, this);
reader.start();
writer.start();
@@ -1421,6 +1421,6 @@
public void doStop()
{
if (replicationServerDomain!=null)
- replicationServerDomain.stopServer(this);
+ replicationServerDomain.stopServer(this, false);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 395b5a8..8121a0d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -58,7 +58,6 @@
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private int serverId;
private ProtocolSession session;
private ServerHandler handler;
@@ -66,16 +65,14 @@
* Constructor for the LDAP server reader part of the replicationServer.
*
* @param session The ProtocolSession from which to read the data.
- * @param serverId The server ID of the server from which we read messages.
* @param handler The server handler for this server reader.
*/
- public ServerReader(ProtocolSession session, int serverId,
+ public ServerReader(ProtocolSession session,
ServerHandler handler)
{
super("Replication Reader Thread for RS handler " +
handler.getMonitorInstanceName());
this.session = session;
- this.serverId = serverId;
this.handler = handler;
}
@@ -302,9 +299,12 @@
if (debugEnabled())
TRACER.debugInfo(
"In " + this.getName() + " " + stackTraceToSingleLineString(e));
- errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
- Integer.toString(handler.getReplicationServerId()));
- logError(errMessage);
+ if (!handler.shuttingDown())
+ {
+ errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
+ Integer.toString(handler.getReplicationServerId()));
+ logError(errMessage);
+ }
}
catch (ClassNotFoundException e)
{
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 80ad900..1bdce8c 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -246,7 +246,7 @@
{
// Can't do much more : ignore
}
- replicationServerDomain.stopServer(handler);
+ replicationServerDomain.stopServer(handler, false);
if (debugEnabled())
{
TRACER.debugInfo(this.getName() + " stopped " + errMessage);
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 9b8f67d..992fe7c 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -79,6 +79,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -309,7 +310,7 @@
* This object is used as a conditional event to be notified about
* the reception of monitor information from the Replication Server.
*/
- private Object monitorResponse = new Object();
+ private final MutableBoolean monitorResponse = new MutableBoolean(false);
/**
@@ -585,6 +586,8 @@
*/
public Map<Integer, ServerState> getReplicaStates()
{
+ monitorResponse.set(false);
+
// publish Monitor Request Message to the Replication Server
broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
@@ -593,7 +596,10 @@
{
synchronized (monitorResponse)
{
- monitorResponse.wait(10000);
+ if (monitorResponse.get() == false)
+ {
+ monitorResponse.wait(10000);
+ }
}
} catch (InterruptedException e)
{}
@@ -844,6 +850,7 @@
// Notify the sender that the response was received.
synchronized (monitorResponse)
{
+ monitorResponse.set(true);
monitorResponse.notify();
}
}
@@ -1901,6 +1908,18 @@
disableService();
enableService();
+ // wait for the domain to reconnect: at most 10 polls of 100 ms each.
+ int count = 0;
+ while (!isConnected() && (count++ < 10))
+ {
+ try
+ {
+ Thread.sleep(100);
+ } catch (InterruptedException e)
+ {
+ }
+ }
+
resetGenerationId(getGenerationID());
// check that at least one ReplicationServer did change its generation-id
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index d4e3302..0620f0a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -419,7 +419,7 @@
openReplicationSession(DN.decode(TEST_ROOT_DN_STRING), 3,
100, replicationServerPort, 5000, state);
- assertTrue(broker.isConnected());
+ assertTrue(broker.isConnected(), "Broker could not connect to RS");
ReplicationMsg msg2 = broker.receive();
broker.updateWindowAfterReplay();
--
Gitblit v1.10.0