From eb1275dc2c85f9e3db21bfd5a151d4bc0740d336 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 18 Jan 2008 10:18:24 +0000
Subject: [PATCH] Fix for 1288: Synchronization plugin should not create 10 listener threads for each baseDn
---
opends/resource/schema/02-config.ldif | 15
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java | 4
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java | 56 ++
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 114 ++++++
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 2
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 565 ++++++++++++++++----------------
opends/src/messages/messages/replication.properties | 2
opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml | 26 +
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java | 142 ++++++++
opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java | 73 ++++
10 files changed, 692 insertions(+), 307 deletions(-)
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index e818a99..8bf2a14 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -1543,11 +1543,6 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445
- NAME 'ds-cfg-cache-level'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
- SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.315
NAME 'ds-cfg-allow-retrieving-membership'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
@@ -2182,11 +2177,20 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445
+ NAME 'ds-cfg-cache-level'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.446
NAME 'ds-cfg-cache-preload'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.447
+ NAME 'ds-cfg-num-update-replay-threads'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler'
SUP top
@@ -3087,6 +3091,7 @@
NAME 'ds-cfg-replication-synchronization-provider'
SUP ds-cfg-synchronization-provider
STRUCTURAL
+ MAY ( ds-cfg-num-update-replay-threads )
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.94
NAME 'ds-cfg-dictionary-password-validator'
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
index 330ed53..41258c9 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@
! CDDL HEADER END
!
!
- ! Portions Copyright 2007 Sun Microsystems, Inc.
+ ! Portions Copyright 2007-2008 Sun Microsystems, Inc.
! -->
<adm:managed-object name="replication-synchronization-provider"
plural-name="replication-synchronization-providers"
@@ -79,4 +79,28 @@
</adm:defined>
</adm:default-behavior>
</adm:property-override>
+ <adm:property name="num-update-replay-threads" mandatory="false" read-only="false">
+ <adm:synopsis>
+ Specifies the number of update replay threads
+ </adm:synopsis>
+ <adm:description>
+ This is the number of threads created for replaying every updates
+ received for all the replication domains.
+ </adm:description>
+ <adm:default-behavior>
+ <adm:defined>
+ <adm:value>
+ 10
+ </adm:value>
+ </adm:defined>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:integer lower-limit="1" upper-limit="65535"></adm:integer>
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-num-update-replay-threads</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
</adm:managed-object>
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index d47fa34..dba9531 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -257,4 +257,6 @@
are missing due to a processing error : %s
SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
sending request to get remote monitor data
+SEVERE_ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE_109=An Exception was caught \
+ while replaying replication message : %s
diff --git a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
index 9be5ae9..d92eb19 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -22,9 +22,11 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.opends.messages.Message;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -48,22 +50,27 @@
*/
private static final DebugTracer TRACER = getTracer();
- private ReplicationDomain listener;
+ private ReplicationDomain repDomain;
private boolean shutdown = false;
private boolean done = false;
+ private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
/**
* Constructor for the ListenerThread.
*
- * @param listener the Plugin that created this thread
+ * @param repDomain the replication domain that created this thread
+ * @param updateToReplayQueue The update messages queue we must
+ * store messages in
*/
- public ListenerThread(ReplicationDomain listener)
+ public ListenerThread(ReplicationDomain repDomain,
+ LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
{
super("Replication Listener thread " +
- "serverID=" + listener.serverId +
- " domain=" + listener.getName());
- this.listener = listener;
+ "serverID=" + repDomain.serverId +
+ " domain=" + repDomain.getName());
+ this.repDomain = repDomain;
+ this.updateToReplayQueue = updateToReplayQueue;
}
/**
@@ -77,31 +84,50 @@
/**
* Run method for this class.
*/
+ @Override
public void run()
{
- UpdateMessage msg;
+ UpdateMessage updateMsg = null;
if (debugEnabled())
{
TRACER.debugInfo("Replication Listener thread starting.");
}
- while (shutdown == false)
+ while (!shutdown)
{
try
{
- while (((msg = listener.receive()) != null) && (shutdown == false))
+ // Loop receiving update messages and puting them in the update message
+ // queue
+ while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
{
- listener.replay(msg);
+ // Put update message into the queue (block until some place in the
+ // queue is available)
+ UpdateToReplay updateToReplay =
+ new UpdateToReplay(updateMsg, repDomain);
+ boolean queued = false;
+ while (!queued && !shutdown)
+ {
+ // Use timedout method (offer) instead of put for being able to
+ // shutdown the thread
+ queued = updateToReplayQueue.offer(updateToReplay,
+ 1L, TimeUnit.SECONDS);
+ }
+ if (!queued)
+ {
+ // Shutdown requested but could not push message: ensure this one is
+ // not lost and put it in the queue before dying
+ updateToReplayQueue.offer(updateToReplay);
+ }
}
- if (msg == null)
+ if (updateMsg == null)
shutdown = true;
} catch (Exception e)
{
/*
- * catch all exceptions happening in listener.receive and
- * listener.replay so that the thread never dies even in case
- * of problems.
+ * catch all exceptions happening in repDomain.receive so that the
+ * thread never dies even in case of problems.
*/
Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
stackTraceToSingleLineString(e));
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 001484d..a69f1b4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+import java.util.ArrayList;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
+import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
@@ -82,6 +85,8 @@
extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
implements ConfigurationAddListener<ReplicationDomainCfg>,
ConfigurationDeleteListener<ReplicationDomainCfg>,
+ ConfigurationChangeListener
+ <ReplicationSynchronizationProviderCfg>,
BackupTaskListener, RestoreTaskListener, ImportTaskListener,
ExportTaskListener
{
@@ -89,6 +94,23 @@
private static Map<DN, ReplicationDomain> domains =
new HashMap<DN, ReplicationDomain>() ;
+ /**
+ * The queue of received update messages, to be treated by the ReplayThread
+ * threads.
+ */
+ private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
+ new LinkedBlockingQueue<UpdateToReplay>();
+
+ /**
+ * The list of ReplayThread threads.
+ */
+ private static List<ReplayThread> replayThreads =
+ new ArrayList<ReplayThread>();
+
+ /**
+ * The configurable number of replay threads.
+ */
+ private static int replayThreadNumber = 10;
/**
* Finds the domain for a given DN.
@@ -167,7 +189,16 @@
throws ConfigException
{
ReplicationDomain domain;
- domain = new ReplicationDomain(configuration);
+ domain = new ReplicationDomain(configuration, updateToReplayQueue);
+
+ if (domains.size() == 0)
+ {
+ /*
+ * Create the threads that will process incoming update messages
+ */
+ createReplayThreads();
+ }
+
domains.put(domain.getBaseDN(), domain);
domain.start();
return domain;
@@ -180,6 +211,12 @@
public static void deleteDomain(DN dn)
{
ReplicationDomain domain = domains.remove(dn);
+
+ // No replay threads running if no replication need
+ if (domains.size() == 0) {
+ stopReplayThreads();
+ }
+
if (domain != null)
domain.shutdown();
}
@@ -200,6 +237,12 @@
configuration.addReplicationDomainAddListener(this);
configuration.addReplicationDomainDeleteListener(this);
+ // Register as a root configuration listener so that we can be notified if
+ // number of replay threads is changed and apply changes.
+ configuration.addReplicationChangeListener(this);
+
+ replayThreadNumber = configuration.getNumUpdateReplayThreads();
+
// Create the list of domains that are already defined.
for (String name : configuration.listReplicationDomains())
{
@@ -228,6 +271,39 @@
}
/**
+ * Create the threads that will wait for incoming update messages.
+ */
+ private synchronized static void createReplayThreads()
+ {
+ replayThreads.clear();
+
+ for (int i = 0; i < replayThreadNumber; i++)
+ {
+ ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
+ replayThread.start();
+ replayThreads.add(replayThread);
+ }
+ }
+
+ /**
+ * Stope the threads that are waiting for incoming update messages.
+ */
+ private synchronized static void stopReplayThreads()
+ {
+ // stop the replay threads
+ for (ReplayThread replayThread : replayThreads)
+ {
+ replayThread.shutdown();
+ }
+
+ for (ReplayThread replayThread : replayThreads)
+ {
+ replayThread.waitForShutdown();
+ }
+ replayThreads.clear();
+ }
+
+ /**
* {@inheritDoc}
*/
public boolean isConfigurationAddAcceptable(
@@ -431,6 +507,9 @@
@Override
public void finalizeSynchronizationProvider()
{
+ // Stop replay threads
+ stopReplayThreads();
+
// shutdown all the domains
for (ReplicationDomain domain : domains.values())
{
@@ -621,6 +700,37 @@
{
return replicationServerListener;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean
+ isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
+ configuration,
+ List<Message> unacceptableReasons)
+ {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ConfigChangeResult
+ applyConfigurationChange
+ (ReplicationSynchronizationProviderCfg configuration)
+ {
+ int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
+
+ // Stop threads then restart new number of threads
+ stopReplayThreads();
+ replayThreadNumber = numUpdateRepayThread;
+ if (domains.size() > 0)
+ {
+ createReplayThreads();
+ }
+
+ return new ConfigChangeResult(ResultCode.SUCCESS, false);
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index 9327406..72056f9 100644
--- a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -158,7 +158,7 @@
{
/*
* Parse the list of Update with dependencies and check if the dependencies
- * are now cleared until an Update withour dependencies is found.
+ * are now cleared until an Update without dependencies is found.
*/
for (PendingChange change : dependentChanges)
{
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
new file mode 100644
index 0000000..3774933
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -0,0 +1,142 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.opends.messages.Message;
+
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.protocol.UpdateMessage;
+
+/**
+ * Thread that is used to get message from the replication servers (stored
+ * in the updates queue) and replay them in the current server. A configurable
+ * number of this thread is created for the whole MultimasterReplication object
+ * (i.e: these threads are shared accross the ReplicationDomain objects for
+ * replaying the updates they receive)
+ */
+public class ReplayThread extends DirectoryThread
+{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
+ private boolean shutdown = false;
+ private boolean done = false;
+
+ /**
+ * Constructor for the ReplayThread.
+ *
+ * @param updateToReplayQueue The queue of update messages we have to replay
+ */
+ public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
+ {
+ super("Replication Replay thread");
+ this.updateToReplayQueue = updateToReplayQueue;
+ }
+
+ /**
+ * Shutdown this replay thread.
+ */
+ public void shutdown()
+ {
+ shutdown = true;
+ }
+
+ /**
+ * Run method for this class.
+ */
+ @Override
+ public void run()
+ {
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Replication Replay thread starting.");
+ }
+
+ UpdateToReplay updateToreplay = null;
+
+ while (!shutdown)
+ {
+ try
+ {
+ // Loop getting an updateToReplayQueue from the update message queue and
+ // replaying matching changes
+ while ( (!shutdown) &&
+ ((updateToreplay = updateToReplayQueue.poll(1L,
+ TimeUnit.SECONDS)) != null))
+ {
+ // Find replication domain for that update message
+ UpdateMessage updateMsg = updateToreplay.getUpdateMessage();
+ ReplicationDomain domain = updateToreplay.getReplicationDomain();
+ domain.replay(updateMsg);
+ }
+ } catch (Exception e)
+ {
+ /*
+ * catch all exceptions happening so that the thread never dies even
+ * in case of problems.
+ */
+ Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get(
+ stackTraceToSingleLineString(e));
+ logError(message);
+ }
+ }
+ done = true;
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Replication Replay thread stopping.");
+ }
+ }
+
+ /**
+ * Wait for the completion of this thread.
+ */
+ public void waitForShutdown()
+ {
+ try
+ {
+ while (done == false)
+ {
+ Thread.sleep(50);
+ }
+ } catch (InterruptedException e)
+ {
+ // exit the loop if this thread is interrupted.
+ }
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 391c238..8e4efd0 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -51,6 +51,7 @@
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
@@ -172,18 +173,16 @@
*/
private static final DebugTracer TRACER = getTracer();
- /**
- * on shutdown, the server will wait for existing threads to stop
- * during this timeout (in ms).
- */
- private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
-
private ReplicationMonitor monitor;
private ReplicationBroker broker;
-
- private List<ListenerThread> synchroThreads =
- new ArrayList<ListenerThread>();
+ // Thread waiting for incoming update messages for this domain and pushing
+ // them to the global incoming update message queue for later processing by
+ // replay threads.
+ private ListenerThread listenerThread;
+ // The update to replay message queue where the listener thread is going to
+ // push incoming update messages.
+ private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
new TreeMap<ChangeNumber, UpdateMessage>();
private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
@@ -233,8 +232,6 @@
// Null when none is being processed.
private IEContext ieContext = null;
- private int listenerThreadNumber = 10;
-
private Collection<String> replicationServers;
private DN baseDN;
@@ -355,12 +352,59 @@
}
/**
+ * This thread is launched when we want to export data to another server that
+ * has requested to be initialized with the data of our backend.
+ */
+ private class ExportThread extends DirectoryThread
+ {
+ // Id of server that will receive updates
+ private short target;
+
+ /**
+ * Constructor for the ExportThread.
+ *
+ * @param target Id of server that will receive updates
+ */
+ public ExportThread(short target)
+ {
+ super("Export thread");
+ this.target = target;
+ }
+
+ /**
+ * Run method for this class.
+ */
+ public void run()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Export thread starting.");
+ }
+
+ try
+ {
+ initializeRemote(target, target, null);
+ } catch (DirectoryException de)
+ {
+ // An error message has been sent to the peer
+ // Nothing more to do locally
+ }
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Export thread stopping.");
+ }
+ }
+ }
+
+ /**
* Creates a new ReplicationDomain using configuration from configEntry.
*
* @param configuration The configuration of this ReplicationDomain.
+ * @param updateToReplayQueue The queue for update messages to replay.
* @throws ConfigException In case of invalid configuration.
*/
- public ReplicationDomain(ReplicationDomainCfg configuration)
+ public ReplicationDomain(ReplicationDomainCfg configuration,
+ LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
throws ConfigException
{
super("replicationDomain_" + configuration.getBaseDN());
@@ -373,6 +417,7 @@
heartbeatInterval = configuration.getHeartbeatInterval();
isolationpolicy = configuration.getIsolationPolicy();
configDn = configuration.dn();
+ this.updateToReplayQueue = updateToReplayQueue;
/*
* Modify conflicts are solved for all suffixes but the schema suffix
@@ -819,122 +864,112 @@
*/
public UpdateMessage receive()
{
- UpdateMessage update = remotePendingChanges.getNextUpdate();
+ UpdateMessage update = null;
- if (update == null)
+ while (update == null)
{
- while (update == null)
+ InitializeRequestMessage initMsg = null;
+ synchronized (broker)
{
- InitializeRequestMessage initMsg = null;
- synchronized (broker)
+ ReplicationMessage msg;
+ try
{
- ReplicationMessage msg;
- try
+ msg = broker.receive();
+ if (msg == null)
{
- msg = broker.receive();
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
+ // The server is in the shutdown process
+ return null;
+ }
- if (debugEnabled())
- if (!(msg instanceof HeartbeatMessage))
- TRACER.debugVerbose("Message received <" + msg + ">");
+ if (debugEnabled())
+ if (!(msg instanceof HeartbeatMessage))
+ TRACER.debugVerbose("Message received <" + msg + ">");
- if (msg instanceof AckMessage)
- {
- AckMessage ack = (AckMessage) msg;
- receiveAck(ack);
+ if (msg instanceof AckMessage)
+ {
+ AckMessage ack = (AckMessage) msg;
+ receiveAck(ack);
}
else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
+ {
+ // Another server requests us to provide entries
+ // for a total update
initMsg = (InitializeRequestMessage)msg;
}
else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
- try
- {
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
- initialize(importMsg);
+ try
+ {
+ // This must be done while we are still holding the
+ // broker lock because we are now going to receive a
+ // bunch of entries from the remote server and we
+ // want the import thread to catch them and
+ // not the ListenerThread.
+ initialize(importMsg);
}
catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
- de.getMessageObject());
- MessageBuilder mb = new MessageBuilder();
- mb.append(de.getMessageObject());
- TRACER.debugInfo(Message.toString(mb.toMessage()));
- broker.publish(errorMsg);
- }
+ {
+ // Returns an error message to notify the sender
+ ErrorMessage errorMsg =
+ new ErrorMessage(importMsg.getsenderID(),
+ de.getMessageObject());
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(de.getMessageObject());
+ TRACER.debugInfo(Message.toString(mb.toMessage()));
+ broker.publish(errorMsg);
+ }
}
else if (msg instanceof ErrorMessage)
+ {
+ if (ieContext != null)
{
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // replicationServer did not find any import source.
abandonImportExport((ErrorMessage)msg);
}
else
- {
- /* We can receive an error message from the replication server
- * in the following cases :
- * - we connected with an incorrect generation id
- */
+ {
+ /* We can receive an error message from the replication server
+ * in the following cases :
+ * - we connected with an incorrect generation id
+ */
ErrorMessage errorMsg = (ErrorMessage)msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
- }
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+ }
}
else if (msg instanceof UpdateMessage)
- {
- update = (UpdateMessage) msg;
- receiveUpdate(update);
- }
+ {
+ update = (UpdateMessage) msg;
+ receiveUpdate(update);
+ }
}
catch (SocketTimeoutException e)
- {
- // just retry
- }
- }
- // Test if we have received and export request message and
- // if that's the case handle it now.
- // This must be done outside of the portion of code protected
- // by the broker lock so that we keep receiveing update
- // when we are doing and export and so that a possible
- // closure of the socket happening when we are publishing the
- // entries to the remote can be handled by the other
- // ListenerThread when they call this method and therefore the
- // broker.receive() method.
- if (initMsg != null)
{
- try
- {
- initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
- null);
- }
- catch(DirectoryException de)
- {
- // An error message has been sent to the peer
- // Nothing more to do locally
- }
+ // just retry
}
}
+ // Test if we have received and export request message and
+ // if that's the case handle it now.
+ // This must be done outside of the portion of code protected
+ // by the broker lock so that we keep receiveing update
+ // when we are doing and export and so that a possible
+ // closure of the socket happening when we are publishing the
+ // entries to the remote can be handled by the other
+ // replay thread when they call this method and therefore the
+ // broker.receive() method.
+ if (initMsg != null)
+ {
+ // Do this work in a thread to allow replay thread continue working
+ ExportThread exportThread = new ExportThread(initMsg.getsenderID());
+ exportThread.start();
+ }
}
return update;
}
@@ -1190,10 +1225,9 @@
@Override
public void run()
{
- /*
- * create the threads that will wait for incoming changes.
- */
- createListeners();
+ // Create the listener thread
+ listenerThread = new ListenerThread(this, updateToReplayQueue);
+ listenerThread.start();
while (shutdown == false)
{
@@ -1217,28 +1251,6 @@
}
/**
- * create the threads that will wait for incoming changes.
- * TODO : should use a pool of threads shared between all the servers
- * TODO : need to make number of thread configurable
- */
- private void createListeners()
- {
- synchronized (synchroThreads)
- {
- if (!shutdown)
- {
- synchroThreads.clear();
- for (int i=0; i<listenerThreadNumber; i++)
- {
- ListenerThread myThread = new ListenerThread(this);
- myThread.start();
- synchroThreads.add(myThread);
- }
- }
- }
- }
-
- /**
* Shutdown this ReplicationDomain.
*/
public void shutdown()
@@ -1246,14 +1258,8 @@
// stop the flush thread
shutdown = true;
- synchronized (synchroThreads)
- {
- // stop the listener threads
- for (ListenerThread thread : synchroThreads)
- {
- thread.shutdown();
- }
- }
+ // Stop the listener thread
+ listenerThread.shutdown();
synchronized (this)
{
@@ -1267,11 +1273,8 @@
// stop the ReplicationBroker
broker.stop();
- // wait for the listener thread to stop
- for (ListenerThread thread : synchroThreads)
- {
- thread.waitForShutdown();
- }
+ // Wait for the listener thread to stop
+ listenerThread.waitForShutdown();
// wait for completion of the persistentServerState thread.
try
@@ -1315,140 +1318,148 @@
int retryCount = 10;
boolean firstTry = true;
- try
+ // Try replay the operation, then flush (replaying) any pending operation
+ // whose dependency has been replayed until no more left.
+ do
{
- while ((!dependency) && (!done) && (retryCount-- > 0))
+ try
{
- op = msg.createOperation(conn);
-
- op.setInternalOperation(true);
- op.setSynchronizationOperation(true);
- changeNumber = OperationContext.getChangeNumber(op);
- ((AbstractOperation)op).run();
-
- ResultCode result = op.getResultCode();
-
- if (result != ResultCode.SUCCESS)
+ while ((!dependency) && (!done) && (retryCount-- > 0))
{
- if (op instanceof ModifyOperation)
- {
- ModifyOperation newOp = (ModifyOperation) op;
- dependency = remotePendingChanges.checkDependencies(newOp);
- if (!dependency)
- {
- done = solveNamingConflict(newOp, msg);
- }
- }
- else if (op instanceof DeleteOperation)
- {
- DeleteOperation newOp = (DeleteOperation) op;
- dependency = remotePendingChanges.checkDependencies(newOp);
- if ((!dependency) && (!firstTry))
- {
- done = solveNamingConflict(newOp, msg);
- }
- }
- else if (op instanceof AddOperation)
- {
- AddOperation newOp = (AddOperation) op;
- AddMsg addMsg = (AddMsg) msg;
- dependency = remotePendingChanges.checkDependencies(newOp);
- if (!dependency)
- {
- done = solveNamingConflict(newOp, addMsg);
- }
- }
- else if (op instanceof ModifyDNOperationBasis)
- {
- ModifyDNMsg newMsg = (ModifyDNMsg) msg;
- dependency = remotePendingChanges.checkDependencies(newMsg);
- if (!dependency)
- {
- ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
- done = solveNamingConflict(newOp, msg);
- }
- }
- else
- {
- done = true; // unknown type of operation ?!
- }
- if (done)
- {
- // the update became a dummy update and the result
- // of the conflict resolution phase is to do nothing.
- // however we still need to push this change to the serverState
- updateError(changeNumber);
- }
- }
- else
- {
- done = true;
- }
- firstTry = false;
- }
+ op = msg.createOperation(conn);
- if (!done && !dependency)
- {
- // Continue with the next change but the servers could now become
- // inconsistent.
- // Let the repair tool know about this.
- Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+ changeNumber = OperationContext.getChangeNumber(op);
+ ((AbstractOperation) op).run();
+
+ // Try replay the operation
+ ResultCode result = op.getResultCode();
+
+ if (result != ResultCode.SUCCESS)
+ {
+ if (op instanceof ModifyOperation)
+ {
+ ModifyOperation newOp = (ModifyOperation) op;
+ dependency = remotePendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
+ } else if (op instanceof DeleteOperation)
+ {
+ DeleteOperation newOp = (DeleteOperation) op;
+ dependency = remotePendingChanges.checkDependencies(newOp);
+ if ((!dependency) && (!firstTry))
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
+ } else if (op instanceof AddOperation)
+ {
+ AddOperation newOp = (AddOperation) op;
+ AddMsg addMsg = (AddMsg) msg;
+ dependency = remotePendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, addMsg);
+ }
+ } else if (op instanceof ModifyDNOperationBasis)
+ {
+ ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+ dependency = remotePendingChanges.checkDependencies(newMsg);
+ if (!dependency)
+ {
+ ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ } else
+ {
+ done = true; // unknown type of operation ?!
+ }
+ if (done)
+ {
+ // the update became a dummy update and the result
+ // of the conflict resolution phase is to do nothing.
+ // however we still need to push this change to the serverState
+ updateError(changeNumber);
+ }
+ } else
+ {
+ done = true;
+ }
+ firstTry = false;
+ }
+
+ if (!done && !dependency)
+ {
+ // Continue with the next change but the servers could now become
+ // inconsistent.
+ // Let the repair tool know about this.
+ Message message = ERR_LOOP_REPLAYING_OPERATION.get(op.toString(),
op.getErrorMessage().toString());
- logError(message);
- numUnresolvedNamingConflicts.incrementAndGet();
+ logError(message);
+ numUnresolvedNamingConflicts.incrementAndGet();
- updateError(changeNumber);
- }
- }
- catch (ASN1Exception e)
- {
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- }
- catch (LDAPException e)
- {
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- }
- catch (DataFormatException e)
- {
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- }
- catch (Exception e)
- {
- if (changeNumber != null)
- {
- /*
- * An Exception happened during the replay process.
- * Continue with the next change but the servers will now start
- * to be inconsistent.
- * Let the repair tool know about this.
- */
- Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
- stackTraceToSingleLineString(e), op.toString());
- logError(message);
- updateError(changeNumber);
- }
- else
+ updateError(changeNumber);
+ }
+ } catch (ASN1Exception e)
{
Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
logError(message);
- }
- }
- finally
- {
- if (!dependency)
+ } catch (LDAPException e)
{
- if (msg.isAssured())
- ack(msg.getChangeNumber());
- incProcessedUpdates();
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ } catch (DataFormatException e)
+ {
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ } catch (Exception e)
+ {
+ if (changeNumber != null)
+ {
+ /*
+ * An Exception happened during the replay process.
+ * Continue with the next change but the servers will now start
+ * to be inconsistent.
+ * Let the repair tool know about this.
+ */
+ Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
+ stackTraceToSingleLineString(e), op.toString());
+ logError(message);
+ updateError(changeNumber);
+ } else
+ {
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ }
+ } finally
+ {
+ if (!dependency)
+ {
+ if (msg.isAssured())
+ ack(msg.getChangeNumber());
+ incProcessedUpdates();
+ }
}
- }
+
+ // Now replay any pending update that had a dependency and whose
+ // dependency has been replayed, do that until no more updates of that
+ // type left...
+ msg = remotePendingChanges.getNextUpdate();
+
+ // Prepare restart of loop
+ done = false;
+ dependency = false;
+ changeNumber = null;
+ retryCount = 10;
+ firstTry = true;
+
+ } while (msg != null);
}
/**
@@ -2224,7 +2235,7 @@
* The session to the replication server will be stopped.
* The domain will not be destroyed but call to the pre-operation
* methods will result in failure.
- * The listener threads will be destroyed.
+ * The listener thread will be destroyed.
* The monitor informations will still be accessible.
*/
public void disable()
@@ -2232,23 +2243,14 @@
state.save();
state.clearInMemory();
disabled = true;
- // stop the listener threads
- for (ListenerThread thread : synchroThreads)
- {
- thread.shutdown();
- }
- broker.stop(); // this will cut the session and wake-up the listeners
- for (ListenerThread thread : synchroThreads)
- {
- try
- {
- thread.join(SHUTDOWN_JOIN_TIMEOUT);
- } catch (InterruptedException e)
- {
- // ignore
- }
- }
+ // Stop the listener thread
+ listenerThread.shutdown();
+
+ broker.stop(); // This will cut the session and wake up the listener
+
+ // Wait for the listener thread to stop
+ listenerThread.waitForShutdown();
}
/**
@@ -2265,7 +2267,6 @@
state.loadState();
disabled = false;
-
try
{
generationId = loadGenerationId();
@@ -2288,7 +2289,9 @@
broker.start(replicationServers);
- createListeners();
+ // Create the listener thread
+ listenerThread = new ListenerThread(this, updateToReplayQueue);
+ listenerThread.start();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java b/opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java
new file mode 100644
index 0000000..ff78d5a
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java
@@ -0,0 +1,73 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import org.opends.server.replication.protocol.UpdateMessage;
+
+/**
+ * This is a bag class to hold an update to replay in the queue of updates to
+ * be replayed by the replay threads.
+ * It associates an update message to replay with the matching
+ * ReplicationDomain.
+ */
+public class UpdateToReplay
+{
+ private UpdateMessage updateMessage = null;
+ private ReplicationDomain replicationDomain = null;
+
+ /**
+ * Construct the object associating the update message with the replication
+ * domain that must be used to replay it (the on it comes from).
+ * @param updateMessage The update message
+ * @param replicationDomain The replication domain to use for replaying the
+ * change from the update message
+ */
+ public UpdateToReplay(UpdateMessage updateMessage,
+ ReplicationDomain replicationDomain)
+ {
+ this.updateMessage = updateMessage;
+ this.replicationDomain = replicationDomain;
+ }
+
+ /**
+ * Getter for update message.
+ * @return The update message
+ */
+ public UpdateMessage getUpdateMessage()
+ {
+ return updateMessage;
+ }
+
+ /**
+ * Getter for replication domain.
+ * @return The replication domain
+ */
+ public ReplicationDomain getReplicationDomain()
+ {
+ return replicationDomain;
+ }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
index be9671e..3f02ee0 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -420,7 +420,7 @@
* To increase the risks of failures a loop of add/del/add is done.
*/
@SuppressWarnings("unchecked")
- @Test(enabled=false, groups="slow")
+ @Test(enabled=true, groups="slow")
public void addDelAddDependencyTest() throws Exception
{
ReplicationServer replServer = null;
@@ -552,7 +552,7 @@
* issuing a set of Add operation followed by a modrdn of the added entry.
*/
@SuppressWarnings("unchecked")
- @Test(enabled=false, groups="slow")
+ @Test(enabled=true, groups="slow")
public void addModdnDependencyTest() throws Exception
{
ReplicationServer replServer = null;
--
Gitblit v1.10.0