From eb1275dc2c85f9e3db21bfd5a151d4bc0740d336 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 18 Jan 2008 10:18:24 +0000
Subject: [PATCH] Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn
---
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 565 ++++++++++++++++++++++++++++----------------------------
1 files changed, 284 insertions(+), 281 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 391c238..8e4efd0 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -51,6 +51,7 @@
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
@@ -172,18 +173,16 @@
*/
private static final DebugTracer TRACER = getTracer();
- /**
- * on shutdown, the server will wait for existing threads to stop
- * during this timeout (in ms).
- */
- private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
-
private ReplicationMonitor monitor;
private ReplicationBroker broker;
-
- private List<ListenerThread> synchroThreads =
- new ArrayList<ListenerThread>();
+ // Thread waiting for incoming update messages for this domain and pushing
+ // them to the global incoming update message queue for later processing by
+ // replay threads.
+ private ListenerThread listenerThread;
+ // The update to replay message queue where the listener thread is going to
+ // push incoming update messages.
+ private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
new TreeMap<ChangeNumber, UpdateMessage>();
private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
@@ -233,8 +232,6 @@
// Null when none is being processed.
private IEContext ieContext = null;
- private int listenerThreadNumber = 10;
-
private Collection<String> replicationServers;
private DN baseDN;
@@ -355,12 +352,59 @@
}
/**
+ * This thread is launched when we want to export data to another server that
+ * has requested to be initialized with the data of our backend.
+ */
+ private class ExportThread extends DirectoryThread
+ {
+ // Id of server that will receive updates
+ private short target;
+
+ /**
+ * Constructor for the ExportThread.
+ *
+ * @param target Id of server that will receive updates
+ */
+ public ExportThread(short target)
+ {
+ super("Export thread");
+ this.target = target;
+ }
+
+ /**
+ * Run method for this class.
+ */
+ public void run()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Export thread starting.");
+ }
+
+ try
+ {
+ initializeRemote(target, target, null);
+ } catch (DirectoryException de)
+ {
+ // An error message has been sent to the peer
+ // Nothing more to do locally
+ }
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Export thread stopping.");
+ }
+ }
+ }
+
+ /**
* Creates a new ReplicationDomain using configuration from configEntry.
*
* @param configuration The configuration of this ReplicationDomain.
+ * @param updateToReplayQueue The queue for update messages to replay.
* @throws ConfigException In case of invalid configuration.
*/
- public ReplicationDomain(ReplicationDomainCfg configuration)
+ public ReplicationDomain(ReplicationDomainCfg configuration,
+ LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
throws ConfigException
{
super("replicationDomain_" + configuration.getBaseDN());
@@ -373,6 +417,7 @@
heartbeatInterval = configuration.getHeartbeatInterval();
isolationpolicy = configuration.getIsolationPolicy();
configDn = configuration.dn();
+ this.updateToReplayQueue = updateToReplayQueue;
/*
* Modify conflicts are solved for all suffixes but the schema suffix
@@ -819,122 +864,112 @@
*/
public UpdateMessage receive()
{
- UpdateMessage update = remotePendingChanges.getNextUpdate();
+ UpdateMessage update = null;
- if (update == null)
+ while (update == null)
{
- while (update == null)
+ InitializeRequestMessage initMsg = null;
+ synchronized (broker)
{
- InitializeRequestMessage initMsg = null;
- synchronized (broker)
+ ReplicationMessage msg;
+ try
{
- ReplicationMessage msg;
- try
+ msg = broker.receive();
+ if (msg == null)
{
- msg = broker.receive();
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
+ // The server is in the shutdown process
+ return null;
+ }
- if (debugEnabled())
- if (!(msg instanceof HeartbeatMessage))
- TRACER.debugVerbose("Message received <" + msg + ">");
+ if (debugEnabled())
+ if (!(msg instanceof HeartbeatMessage))
+ TRACER.debugVerbose("Message received <" + msg + ">");
- if (msg instanceof AckMessage)
- {
- AckMessage ack = (AckMessage) msg;
- receiveAck(ack);
+ if (msg instanceof AckMessage)
+ {
+ AckMessage ack = (AckMessage) msg;
+ receiveAck(ack);
}
else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
+ {
+ // Another server requests us to provide entries
+ // for a total update
initMsg = (InitializeRequestMessage)msg;
}
else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
- try
- {
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
- initialize(importMsg);
+ try
+ {
+ // This must be done while we are still holding the
+ // broker lock because we are now going to receive a
+ // bunch of entries from the remote server and we
+ // want the import thread to catch them and
+ // not the ListenerThread.
+ initialize(importMsg);
}
catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
- de.getMessageObject());
- MessageBuilder mb = new MessageBuilder();
- mb.append(de.getMessageObject());
- TRACER.debugInfo(Message.toString(mb.toMessage()));
- broker.publish(errorMsg);
- }
+ {
+ // Returns an error message to notify the sender
+ ErrorMessage errorMsg =
+ new ErrorMessage(importMsg.getsenderID(),
+ de.getMessageObject());
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(de.getMessageObject());
+ TRACER.debugInfo(Message.toString(mb.toMessage()));
+ broker.publish(errorMsg);
+ }
}
else if (msg instanceof ErrorMessage)
+ {
+ if (ieContext != null)
{
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // replicationServer did not find any import source.
abandonImportExport((ErrorMessage)msg);
}
else
- {
- /* We can receive an error message from the replication server
- * in the following cases :
- * - we connected with an incorrect generation id
- */
+ {
+ /* We can receive an error message from the replication server
+ * in the following cases :
+ * - we connected with an incorrect generation id
+ */
ErrorMessage errorMsg = (ErrorMessage)msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
- }
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+ }
}
else if (msg instanceof UpdateMessage)
- {
- update = (UpdateMessage) msg;
- receiveUpdate(update);
- }
+ {
+ update = (UpdateMessage) msg;
+ receiveUpdate(update);
+ }
}
catch (SocketTimeoutException e)
- {
- // just retry
- }
- }
- // Test if we have received and export request message and
- // if that's the case handle it now.
- // This must be done outside of the portion of code protected
- // by the broker lock so that we keep receiveing update
- // when we are doing and export and so that a possible
- // closure of the socket happening when we are publishing the
- // entries to the remote can be handled by the other
- // ListenerThread when they call this method and therefore the
- // broker.receive() method.
- if (initMsg != null)
{
- try
- {
- initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
- null);
- }
- catch(DirectoryException de)
- {
- // An error message has been sent to the peer
- // Nothing more to do locally
- }
+ // just retry
}
}
+ // Test if we have received and export request message and
+ // if that's the case handle it now.
+ // This must be done outside of the portion of code protected
+ // by the broker lock so that we keep receiveing update
+ // when we are doing and export and so that a possible
+ // closure of the socket happening when we are publishing the
+ // entries to the remote can be handled by the other
+ // replay thread when they call this method and therefore the
+ // broker.receive() method.
+ if (initMsg != null)
+ {
+ // Do this work in a thread to allow replay thread continue working
+ ExportThread exportThread = new ExportThread(initMsg.getsenderID());
+ exportThread.start();
+ }
}
return update;
}
@@ -1190,10 +1225,9 @@
@Override
public void run()
{
- /*
- * create the threads that will wait for incoming changes.
- */
- createListeners();
+ // Create the listener thread
+ listenerThread = new ListenerThread(this, updateToReplayQueue);
+ listenerThread.start();
while (shutdown == false)
{
@@ -1217,28 +1251,6 @@
}
/**
- * create the threads that will wait for incoming changes.
- * TODO : should use a pool of threads shared between all the servers
- * TODO : need to make number of thread configurable
- */
- private void createListeners()
- {
- synchronized (synchroThreads)
- {
- if (!shutdown)
- {
- synchroThreads.clear();
- for (int i=0; i<listenerThreadNumber; i++)
- {
- ListenerThread myThread = new ListenerThread(this);
- myThread.start();
- synchroThreads.add(myThread);
- }
- }
- }
- }
-
- /**
* Shutdown this ReplicationDomain.
*/
public void shutdown()
@@ -1246,14 +1258,8 @@
// stop the flush thread
shutdown = true;
- synchronized (synchroThreads)
- {
- // stop the listener threads
- for (ListenerThread thread : synchroThreads)
- {
- thread.shutdown();
- }
- }
+ // Stop the listener thread
+ listenerThread.shutdown();
synchronized (this)
{
@@ -1267,11 +1273,8 @@
// stop the ReplicationBroker
broker.stop();
- // wait for the listener thread to stop
- for (ListenerThread thread : synchroThreads)
- {
- thread.waitForShutdown();
- }
+ // Wait for the listener thread to stop
+ listenerThread.waitForShutdown();
// wait for completion of the persistentServerState thread.
try
@@ -1315,140 +1318,148 @@
int retryCount = 10;
boolean firstTry = true;
- try
+ // Try replay the operation, then flush (replaying) any pending operation
+ // whose dependency has been replayed until no more left.
+ do
{
- while ((!dependency) && (!done) && (retryCount-- > 0))
+ try
{
- op = msg.createOperation(conn);
-
- op.setInternalOperation(true);
- op.setSynchronizationOperation(true);
- changeNumber = OperationContext.getChangeNumber(op);
- ((AbstractOperation)op).run();
-
- ResultCode result = op.getResultCode();
-
- if (result != ResultCode.SUCCESS)
+ while ((!dependency) && (!done) && (retryCount-- > 0))
{
- if (op instanceof ModifyOperation)
- {
- ModifyOperation newOp = (ModifyOperation) op;
- dependency = remotePendingChanges.checkDependencies(newOp);
- if (!dependency)
- {
- done = solveNamingConflict(newOp, msg);
- }
- }
- else if (op instanceof DeleteOperation)
- {
- DeleteOperation newOp = (DeleteOperation) op;
- dependency = remotePendingChanges.checkDependencies(newOp);
- if ((!dependency) && (!firstTry))
- {
- done = solveNamingConflict(newOp, msg);
- }
- }
- else if (op instanceof AddOperation)
- {
- AddOperation newOp = (AddOperation) op;
- AddMsg addMsg = (AddMsg) msg;
- dependency = remotePendingChanges.checkDependencies(newOp);
- if (!dependency)
- {
- done = solveNamingConflict(newOp, addMsg);
- }
- }
- else if (op instanceof ModifyDNOperationBasis)
- {
- ModifyDNMsg newMsg = (ModifyDNMsg) msg;
- dependency = remotePendingChanges.checkDependencies(newMsg);
- if (!dependency)
- {
- ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
- done = solveNamingConflict(newOp, msg);
- }
- }
- else
- {
- done = true; // unknown type of operation ?!
- }
- if (done)
- {
- // the update became a dummy update and the result
- // of the conflict resolution phase is to do nothing.
- // however we still need to push this change to the serverState
- updateError(changeNumber);
- }
- }
- else
- {
- done = true;
- }
- firstTry = false;
- }
+ op = msg.createOperation(conn);
- if (!done && !dependency)
- {
- // Continue with the next change but the servers could now become
- // inconsistent.
- // Let the repair tool know about this.
- Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+ changeNumber = OperationContext.getChangeNumber(op);
+ ((AbstractOperation) op).run();
+
+ // Try replay the operation
+ ResultCode result = op.getResultCode();
+
+ if (result != ResultCode.SUCCESS)
+ {
+ if (op instanceof ModifyOperation)
+ {
+ ModifyOperation newOp = (ModifyOperation) op;
+ dependency = remotePendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
+ } else if (op instanceof DeleteOperation)
+ {
+ DeleteOperation newOp = (DeleteOperation) op;
+ dependency = remotePendingChanges.checkDependencies(newOp);
+ if ((!dependency) && (!firstTry))
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
+ } else if (op instanceof AddOperation)
+ {
+ AddOperation newOp = (AddOperation) op;
+ AddMsg addMsg = (AddMsg) msg;
+ dependency = remotePendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, addMsg);
+ }
+ } else if (op instanceof ModifyDNOperationBasis)
+ {
+ ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+ dependency = remotePendingChanges.checkDependencies(newMsg);
+ if (!dependency)
+ {
+ ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ } else
+ {
+ done = true; // unknown type of operation ?!
+ }
+ if (done)
+ {
+ // the update became a dummy update and the result
+ // of the conflict resolution phase is to do nothing.
+ // however we still need to push this change to the serverState
+ updateError(changeNumber);
+ }
+ } else
+ {
+ done = true;
+ }
+ firstTry = false;
+ }
+
+ if (!done && !dependency)
+ {
+ // Continue with the next change but the servers could now become
+ // inconsistent.
+ // Let the repair tool know about this.
+ Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
op.getErrorMessage().toString());
- logError(message);
- numUnresolvedNamingConflicts.incrementAndGet();
+ logError(message);
+ numUnresolvedNamingConflicts.incrementAndGet();
- updateError(changeNumber);
- }
- }
- catch (ASN1Exception e)
- {
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- }
- catch (LDAPException e)
- {
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- }
- catch (DataFormatException e)
- {
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- }
- catch (Exception e)
- {
- if (changeNumber != null)
- {
- /*
- * An Exception happened during the replay process.
- * Continue with the next change but the servers will now start
- * to be inconsistent.
- * Let the repair tool know about this.
- */
- Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
- stackTraceToSingleLineString(e), op.toString());
- logError(message);
- updateError(changeNumber);
- }
- else
+ updateError(changeNumber);
+ }
+ } catch (ASN1Exception e)
{
Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
logError(message);
- }
- }
- finally
- {
- if (!dependency)
+ } catch (LDAPException e)
{
- if (msg.isAssured())
- ack(msg.getChangeNumber());
- incProcessedUpdates();
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ } catch (DataFormatException e)
+ {
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ } catch (Exception e)
+ {
+ if (changeNumber != null)
+ {
+ /*
+ * An Exception happened during the replay process.
+ * Continue with the next change but the servers will now start
+ * to be inconsistent.
+ * Let the repair tool know about this.
+ */
+ Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
+ stackTraceToSingleLineString(e), op.toString());
+ logError(message);
+ updateError(changeNumber);
+ } else
+ {
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ }
+ } finally
+ {
+ if (!dependency)
+ {
+ if (msg.isAssured())
+ ack(msg.getChangeNumber());
+ incProcessedUpdates();
+ }
}
- }
+
+ // Now replay any pending update that had a dependency and whose
+ // dependency has been replayed, do that until no more updates of that
+ // type left...
+ msg = remotePendingChanges.getNextUpdate();
+
+ // Prepare restart of loop
+ done = false;
+ dependency = false;
+ changeNumber = null;
+ retryCount = 10;
+ firstTry = true;
+
+ } while (msg != null);
}
/**
@@ -2224,7 +2235,7 @@
* The session to the replication server will be stopped.
* The domain will not be destroyed but call to the pre-operation
* methods will result in failure.
- * The listener threads will be destroyed.
+ * The listener thread will be destroyed.
* The monitor informations will still be accessible.
*/
public void disable()
@@ -2232,23 +2243,14 @@
state.save();
state.clearInMemory();
disabled = true;
- // stop the listener threads
- for (ListenerThread thread : synchroThreads)
- {
- thread.shutdown();
- }
- broker.stop(); // this will cut the session and wake-up the listeners
- for (ListenerThread thread : synchroThreads)
- {
- try
- {
- thread.join(SHUTDOWN_JOIN_TIMEOUT);
- } catch (InterruptedException e)
- {
- // ignore
- }
- }
+ // Stop the listener thread
+ listenerThread.shutdown();
+
+ broker.stop(); // This will cut the session and wake up the listener
+
+ // Wait for the listener thread to stop
+ listenerThread.waitForShutdown();
}
/**
@@ -2265,7 +2267,6 @@
state.loadState();
disabled = false;
-
try
{
generationId = loadGenerationId();
@@ -2288,7 +2289,9 @@
broker.start(replicationServers);
- createListeners();
+ // Create the listener thread
+ listenerThread = new ListenerThread(this, updateToReplayQueue);
+ listenerThread.start();
}
/**
--
Gitblit v1.10.0