From 23b1e20ff9fe938572a0b62ec5a12f12154445df Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 04 Jul 2007 15:12:04 +0000
Subject: [PATCH] The problem was that the publisher thread was stuck waiting for the window to re-open on a connection that had been closed without notifying the publisher.
---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 510 ++++++++++++++++++++++++++++++++------------------------
1 files changed, 294 insertions(+), 216 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index bb298eb..c1fdbb6 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
private boolean shutdown = false;
private Collection<String> servers;
private boolean connected = false;
- private final Object lock = new Object();
private String replicationServer = "Not connected";
private TreeSet<FakeOperation> replayOperations;
private ProtocolSession session = null;
@@ -120,7 +121,7 @@
private int numLostConnections = 0;
/**
- * When the broker cannort connect to any replication server
+ * When the broker cannot connect to any replication server
* it log an error and keeps continuing every second.
* This boolean is set when the first failure happens and is used
* to avoid repeating the error message for further failure to connect
@@ -129,6 +130,8 @@
*/
private boolean connectionError = false;
+ private Object connectPhaseLock = new Object();
+
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
*
@@ -217,234 +220,243 @@
boolean checkState = true;
boolean receivedResponse = true;
- while ((!connected) && (!shutdown) && (receivedResponse))
+ synchronized (connectPhaseLock)
{
- receivedResponse = false;
- for (String server : servers)
+ while ((!connected) && (!shutdown) && (receivedResponse))
{
- int separator = server.lastIndexOf(':');
- String port = server.substring(separator + 1);
- String hostname = server.substring(0, separator);
-
- try
+ receivedResponse = false;
+ for (String server : servers)
{
- /*
- * Open a socket connection to the next candidate.
- */
- InetSocketAddress ServerAddr = new InetSocketAddress(
- InetAddress.getByName(hostname), Integer.parseInt(port));
- Socket socket = new Socket();
- socket.setReceiveBufferSize(1000000);
- socket.setTcpNoDelay(true);
- socket.connect(ServerAddr, 500);
- session = new SocketSession(socket);
+ int separator = server.lastIndexOf(':');
+ String port = server.substring(separator + 1);
+ String hostname = server.substring(0, separator);
- /*
- * Send our ServerStartMessage.
- */
- ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
- maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow*2, heartbeatInterval, state,
- protocolVersion);
- session.publish(msg);
-
-
- /*
- * Read the ReplServerStartMessage that should come back.
- */
- session.setSoTimeout(1000);
- startMsg = (ReplServerStartMessage) session.receive();
- receivedResponse = true;
-
- /*
- * We have sent our own protocol version to the replication server.
- * The replication server will use the same one (or an older one
- * if it is an old replication server).
- */
- protocolVersion = ProtocolVersion.minWithCurrent(
- startMsg.getVersion());
- session.setSoTimeout(timeout);
-
- /*
- * We must not publish changes to a replicationServer that has not
- * seen all our previous changes because this could cause some
- * other ldap servers to miss those changes.
- * Check that the ReplicationServer has seen all our previous changes.
- * If not, try another replicationServer.
- * If no other replicationServer has seen all our changes, recover
- * those changes and send them again to any replicationServer.
- */
- ChangeNumber replServerMaxChangeNumber =
- startMsg.getServerState().getMaxChangeNumber(serverID);
- if (replServerMaxChangeNumber == null)
- replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
- ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID);
- if ((ourMaxChangeNumber == null) ||
- (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+ try
{
- replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
- this.sendWindow = new Semaphore(maxSendWindow);
- connected = true;
- startHeartBeat();
- break;
- }
- else
- {
- if (checkState == true)
+ /*
+ * Open a socket connection to the next candidate.
+ */
+ InetSocketAddress ServerAddr = new InetSocketAddress(
+ InetAddress.getByName(hostname), Integer.parseInt(port));
+ Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
+ socket.setTcpNoDelay(true);
+ socket.connect(ServerAddr, 500);
+ session = new SocketSession(socket);
+
+ /*
+ * Send our ServerStartMessage.
+ */
+ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+ maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+ halfRcvWindow*2, heartbeatInterval, state,
+ protocolVersion);
+ session.publish(msg);
+
+
+ /*
+ * Read the ReplServerStartMessage that should come back.
+ */
+ session.setSoTimeout(1000);
+ startMsg = (ReplServerStartMessage) session.receive();
+ receivedResponse = true;
+
+ /*
+ * We have sent our own protocol version to the replication server.
+ * The replication server will use the same one (or an older one
+ * if it is an old replication server).
+ */
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ startMsg.getVersion());
+ session.setSoTimeout(timeout);
+
+ /*
+ * We must not publish changes to a replicationServer that has not
+ * seen all our previous changes because this could cause some
+ * other ldap servers to miss those changes.
+ * Check that the ReplicationServer has seen all our previous
+ * changes.
+ * If not, try another replicationServer.
+ * If no other replicationServer has seen all our changes, recover
+ * those changes and send them again to any replicationServer.
+ */
+ ChangeNumber replServerMaxChangeNumber =
+ startMsg.getServerState().getMaxChangeNumber(serverID);
+ if (replServerMaxChangeNumber == null)
+ replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+ ChangeNumber ourMaxChangeNumber =
+ state.getMaxChangeNumber(serverID);
+ if ((ourMaxChangeNumber == null) ||
+ (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
- /* This replicationServer is missing some
- * of our changes, we are going to try another server
- * but before log a notice message
- */
- int msgID = MSGID_CHANGELOG_MISSING_CHANGES;
- String message = getMessage(msgID, server);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ connected = true;
+ startHeartBeat();
+ break;
}
else
{
- replayOperations.clear();
- /*
- * Get all the changes that have not been seen by this
- * replicationServer and update it
- */
- InternalClientConnection conn =
- InternalClientConnection.getRootConnection();
- LDAPFilter filter = LDAPFilter.decode(
- "("+ Historical.HISTORICALATTRIBUTENAME +
- ">=dummy:" + replServerMaxChangeNumber + ")");
- LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
- attrs.add(Historical.HISTORICALATTRIBUTENAME);
- InternalSearchOperation op = conn.processSearch(
- new ASN1OctetString(baseDn.toString()),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, 0, false, filter,
- attrs, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
+ if (checkState == true)
{
- /*
- * An error happened trying to search for the updates
- * This server therefore can't start acepting new updates.
- * TODO : should stop the LDAP server (how to ?)
+ /* This replicationServer is missing some
+ * of our changes, we are going to try another server
+ * but before log a notice message
*/
- int msgID = MSGID_CANNOT_RECOVER_CHANGES;
- String message = getMessage(msgID);
+ int msgID = MSGID_CHANGELOG_MISSING_CHANGES;
+ String message = getMessage(msgID, server);
logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.FATAL_ERROR,
- message, msgID);
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
}
else
{
- replicationServer = ServerAddr.toString();
- maxSendWindow = startMsg.getWindowSize();
- this.sendWindow = new Semaphore(maxSendWindow);
- connected = true;
- for (FakeOperation replayOp : replayOperations)
+ replayOperations.clear();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "going to search for changes", 1);
+ /*
+ * Get all the changes that have not been seen by this
+ * replicationServer and update it
+ */
+ InternalClientConnection conn =
+ InternalClientConnection.getRootConnection();
+ LDAPFilter filter = LDAPFilter.decode(
+ "("+ Historical.HISTORICALATTRIBUTENAME +
+ ">=dummy:" + replServerMaxChangeNumber + ")");
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(Historical.HISTORICALATTRIBUTENAME);
+ InternalSearchOperation op = conn.processSearch(
+ new ASN1OctetString(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs, this);
+ if (op.getResultCode() != ResultCode.SUCCESS)
{
- publish(replayOp.generateMessage());
+ /*
+ * An error happened trying to search for the updates
+ * This server will start accepting new updates again but
+ * some inconsistencies will stay between servers.
+ * TODO : REPAIR : log an error for the repair tool
+ * that will need to resynchronize the servers.
+ */
+ int msgID = MSGID_CANNOT_RECOVER_CHANGES;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.FATAL_ERROR,
+ message, msgID);
}
- startHeartBeat();
- break;
+ else
+ {
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = startMsg.getWindowSize();
+ connected = true;
+ for (FakeOperation replayOp : replayOperations)
+ {
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "sendingChange", 1);
+ session.publish(replayOp.generateMessage());
+ }
+ startHeartBeat();
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "changes sent", 1);
+ break;
+ }
+ }
+ }
+ }
+ catch (ConnectException e)
+ {
+ /*
+ * There was no server waiting on this host:port
+ * Log a notice and try the next replicationServer in the list
+ */
+ if (!connectionError )
+ {
+ // the error message is only logged once to avoid overflowing
+ // the error log
+ int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING;
+ String message = getMessage(msgID, server);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_EXCEPTION_STARTING_SESSION;
+ String message = getMessage(msgID);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message + stackTraceToSingleLineString(e), msgID);
+ }
+ finally
+ {
+ if (connected == false)
+ {
+ if (session != null)
+ {
+ session.close();
+ session = null;
}
}
}
}
- catch (ConnectException e)
+
+ if ((!connected) && (checkState == true) && receivedResponse)
{
/*
- * There was no server waiting on this host:port
- * Log a notice and try the next replicationServer in the list
+ * We could not find a replicationServer that has seen all the
+ * changes that this server has already processed, start again
+ * the loop looking for any replicationServer.
*/
- if (!connectionError )
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- int msgID = MSGID_NO_CHANGELOG_SERVER_LISTENING;
- String message = getMessage(msgID, server);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
- }
- }
- catch (Exception e)
- {
- int msgID = MSGID_EXCEPTION_STARTING_SESSION;
- String message = getMessage(msgID) + stackTraceToSingleLineString(e);
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
+ String message = getMessage(msgID);
logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- finally
- {
- if (connected == false)
- {
- if (session != null)
- {
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- "Broker : connect closing session" , 1);
-
- session.close();
- session = null;
- }
- }
+ ErrorLogSeverity.NOTICE,
+ message, msgID);
+ checkState = false;
}
}
- if ((!connected) && (checkState == true) && receivedResponse)
+ if (connected)
+ {
+ // This server has connected correctly.
+ // Log a message to let the administrator know that the failure was
+ // resolved.
+ // wake up all the threads that were waiting on the window
+ // on the previous connection.
+ connectionError = false;
+ if (sendWindow != null)
+ sendWindow.release(Integer.MAX_VALUE);
+ this.sendWindow = new Semaphore(maxSendWindow);
+ connectPhaseLock.notify();
+ int msgID = MSGID_NOW_FOUND_CHANGELOG;
+ String message =
+ getMessage(msgID, replicationServer, baseDn.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE, message, msgID);
+ }
+ else
{
/*
- * We could not find a replicationServer that has seen all the
- * changes that this server has already processed, start again
- * the loop looking for any replicationServer.
+ * This server could not find any replicationServer
+ * It's going to start in degraded mode.
+ * Log a message
*/
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
- String message = getMessage(msgID);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE,
- message, msgID);
- try
+ if (!connectionError)
{
- Thread.sleep(500);
- } catch (InterruptedException e)
- {
+ checkState = false;
+ connectionError = true;
+ connectPhaseLock.notify();
+ int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
+ String message = getMessage(msgID, baseDn.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE, message, msgID);
}
- checkState = false;
- }
- }
-
- if (connected)
- {
- // This server has connected correctly.
- // let's check if it was previosuly on error, in this case log
- // a message to let the administratot know that the failure was resolved.
- if (connectionError)
- {
- connectionError = false;
- int msgID = MSGID_NOW_FOUND_CHANGELOG;
- String message = getMessage(msgID, baseDn.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE, message, msgID);
- }
- }
- else
- {
- /*
- * This server could not find any replicationServer
- * It's going to start in degraded mode.
- * Log a message
- */
- if (!connectionError)
- {
- checkState = false;
- connectionError = true;
- int msgID = MSGID_COULD_NOT_FIND_CHANGELOG;
- String message = getMessage(msgID, baseDn.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.NOTICE, message, msgID);
}
}
}
@@ -530,30 +542,90 @@
public void publish(ReplicationMessage msg)
{
boolean done = false;
- ProtocolSession failingSession = session;
while (!done)
{
if (connectionError)
- return;
- synchronized (lock)
{
- try
+ // It was not possible to connect to any replication server.
+ // Since the operation was already processed, we have no other
+ // choice than to return without sending the ReplicationMessage
+ // and relying on the resend procedure of the connect phase to
+ // fix the problem when we finally connect.
+ return;
+ }
+
+ try
+ {
+ boolean credit;
+ ProtocolSession current_session;
+ Semaphore currentWindowSemaphore;
+
+ // save the session at the time when we acquire the
+ // sendwindow credit so that we can make sure later
+ // that the session did not change in between.
+ // This is necessary to make sure that we don't publish a message
+ // on a session with a credit that was acquired from a previous
+ // session.
+ synchronized (connectPhaseLock)
{
- if (this.connected == false)
- this.reStart(failingSession);
- if (msg instanceof UpdateMessage)
- sendWindow.acquire();
- session.publish(msg);
- done = true;
- } catch (IOException e)
- {
- this.reStart(failingSession);
+ current_session = session;
+ currentWindowSemaphore = sendWindow;
}
- catch (InterruptedException e)
+
+ if (msg instanceof UpdateMessage)
{
- this.reStart(failingSession);
+ // Acquiring the window credit must be done outside of the
+ // connectPhaseLock because it can be blocking and we don't
+ // want to hold off reconnection in case the connection dropped.
+ credit =
+ currentWindowSemaphore.tryAcquire(
+ (long) 500, TimeUnit.MILLISECONDS);
}
+ else
+ {
+ credit = true;
+ }
+ if (credit)
+ {
+ synchronized (connectPhaseLock)
+ {
+ // check the session. If it has changed, some
+ // disconnection/reconnection happened and we need to restart from
+ // scratch.
+ if (session == current_session)
+ {
+ session.publish(msg);
+ done = true;
+ }
+ }
+ }
+ if (!credit)
+ {
+ // the window is still closed.
+ // Send a WindowProbe message to wake up the receiver in case the
+ // window update message was lost somehow...
+ // then loop to check again if connection was closed.
+ session.publish(new WindowProbe());
+ }
+ } catch (IOException e)
+ {
+ // The receive threads should handle reconnection or
+ // mark this broker in error. Just retry.
+ synchronized (connectPhaseLock)
+ {
+ try
+ {
+ connectPhaseLock.wait(100);
+ } catch (InterruptedException e1)
+ {
+ // ignore
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // just loop.
}
}
}
@@ -561,6 +633,10 @@
/**
* Receive a message.
+ * This method is not multithread safe and should either always be
+ * called in a single thread or protected by a locking mechanism
+ * before being called.
+ *
* @return the received message
* @throws SocketTimeoutException if the timeout set by setSoTimeout
* has expired
@@ -603,10 +679,12 @@
{
if (shutdown == false)
{
- synchronized (lock)
- {
- this.reStart(failingSession);
- }
+ int msgID = MSGID_DISCONNECTED_FROM_CHANGELOG;
+ String message = getMessage(msgID, replicationServer);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ message + " " + e.getMessage(), msgID);
+ this.reStart(failingSession);
}
}
}
--
Gitblit v1.10.0