From 0f7f7b8d5e655ccd36aca7d9a3c425dfcd23ad62 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 23 Dec 2013 15:45:05 +0000
Subject: [PATCH] Fix feature envy between ListenerThread and ReplicationDomain
---
/dev/null | 157 --------------------------
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 4
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java | 3
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java | 3
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 3
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 4
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 114 ++++++++++++------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java | 14 +-
9 files changed, 89 insertions(+), 216 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 754dcec..5dbbd5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4535,7 +4535,7 @@
/** {@inheritDoc} */
@Override
- public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+ public boolean processUpdate(UpdateMsg updateMsg)
{
// Ignore message if fractional configuration is inconsistent and
// we have been passed into bad data set status
@@ -4569,7 +4569,7 @@
// Put update message into the replay queue
// (block until some place in the queue is available)
final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
- while (!shutdown.get())
+ while (!isListenerShuttingDown())
{
// loop until we can offer to the queue or shutdown was initiated
try
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
deleted file mode 100644
index 88688a6..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
- */
-package org.opends.server.replication.service;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.opends.messages.Message;
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.protocol.UpdateMsg;
-
-/**
- * Thread that is used to get messages from the Replication servers
- * and replay them in the current server.
- */
-public class ListenerThread extends DirectoryThread
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
- private final ReplicationDomain repDomain;
- private AtomicBoolean shutdown = new AtomicBoolean(false);
- private volatile boolean done = false;
-
-
- /**
- * Constructor for the ListenerThread.
- *
- * @param repDomain the replication domain that created this thread
- */
- public ListenerThread(ReplicationDomain repDomain)
- {
- super("Replica DS(" + repDomain.getServerId() + ") listener for domain \""
- + repDomain.getBaseDNString() + "\"");
- this.repDomain = repDomain;
- }
-
- /**
- * Shutdown this listener thread.
- */
- public void shutdown()
- {
- shutdown.set(true);
- }
-
- /**
- * Run method for this class.
- */
- @Override
- public void run()
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("Replication Listener thread starting.");
- }
-
- while (!shutdown.get())
- {
- UpdateMsg updateMsg = null;
- try
- {
- // Loop receiving update messages and putting them in the update message
- // queue
- while (!shutdown.get() && ((updateMsg = repDomain.receive()) != null))
- {
- if (repDomain.processUpdate(updateMsg, shutdown))
- {
- repDomain.processUpdateDoneSynchronous(updateMsg);
- }
- }
-
- if (updateMsg == null)
- {
- shutdown.set(true);
- }
- }
- catch (Exception e)
- {
- /*
- * catch all exceptions happening in repDomain.receive so that the
- * thread never dies even in case of problems.
- */
- Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get(
- stackTraceToSingleLineString(e));
- logError(message);
- }
- }
-
- done = true;
-
- if (debugEnabled())
- {
- TRACER.debugInfo("Replication Listener thread stopping.");
- }
- }
-
-
-
- /**
- * Wait for the completion of this thread.
- */
- public void waitForShutdown()
- {
- try
- {
- int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
- int n = 0;
- while (!done && this.isAlive())
- {
- Thread.sleep(50);
- n++;
- if (n >= FACTOR)
- {
- TRACER.debugInfo("Interrupting listener thread for dn "
- + repDomain.getBaseDNString() + " in DS "
- + repDomain.getServerId());
- this.interrupt();
- }
- }
- }
- catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
- }
- }
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ae0a284..b01feaa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -34,7 +34,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -79,7 +78,7 @@
* and which can start receiving updates.
* <p>
* When updates are received the Replication Service calls the
- * {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
+ * {@link #processUpdate(UpdateMsg)} method.
* ReplicationDomain implementation should implement the appropriate code
* for replaying the update on the local repository.
* When fully done the subclass must call the
@@ -156,7 +155,7 @@
* them to the global incoming update message queue for later processing by
* replay threads.
*/
- private ListenerThread listenerThread;
+ private volatile DirectoryThread listenerThread = null;
/**
* A Map used to store all the ReplicationDomains created on this server.
@@ -740,7 +739,7 @@
* Also responsible for updating the list of pending changes
* @return the received message - null if none
*/
- UpdateMsg receive()
+ private UpdateMsg receive()
{
UpdateMsg update = null;
@@ -2715,25 +2714,6 @@
}
/**
- * This method is called when the ReplicationDomain has completed the
- * processing of a received update synchronously.
- * In such cases the processUpdateDone () is called and the state
- * is updated automatically.
- *
- * @param msg The UpdateMessage that was processed.
- */
- void processUpdateDoneSynchronous(UpdateMsg msg)
- {
- /*
- Warning: in synchronous mode, no way to tell the replay of an update went
- wrong Just put null in processUpdateDone so that if assured replication
- is used the ack is sent without error at replay flag.
- */
- processUpdateDone(msg, null);
- state.update(msg.getCSN());
- }
-
- /**
* Check if the domain is connected to a ReplicationServer.
*
* @return true if the server is connected, false if not.
@@ -3000,7 +2980,7 @@
* Starts the receiver side of the Replication Service.
* <p>
* After this method has been called, the Replication Service will start
- * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
+ * calling the {@link #processUpdate(UpdateMsg)}.
* <p>
* This method must be called once and must be called after the
* {@link #startPublishService(ReplicationDomainCfg)}.
@@ -3009,8 +2989,48 @@
{
synchronized (sessionLock)
{
- // Create the listener thread
- listenerThread = new ListenerThread(this);
+ final String threadName = "Replica DS(" + getServerId()
+ + ") listener for domain \"" + getBaseDNString() + "\"";
+
+ listenerThread = new DirectoryThread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Replication Listener thread starting.");
+ }
+
+ // Loop processing any incoming update messages.
+ while (!listenerThread.isShutdownInitiated())
+ {
+ final UpdateMsg updateMsg = receive();
+ if (updateMsg == null)
+ {
+ // The server is shutting down.
+ listenerThread.initiateShutdown();
+ }
+ else if (processUpdate(updateMsg))
+ {
+ /*
+ * Warning: in synchronous mode, no way to tell the replay of an
+ * update went wrong Just put null in processUpdateDone so that if
+ * assured replication is used the ack is sent without error at
+ * replay flag.
+ */
+ processUpdateDone(updateMsg, null);
+ state.update(updateMsg.getCSN());
+ }
+ }
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Replication Listener thread stopping.");
+ }
+ }
+ }, threadName);
+
listenerThread.start();
}
}
@@ -3041,14 +3061,34 @@
// Stop the listener thread
if (listenerThread != null)
{
- listenerThread.shutdown();
- listenerThread.waitForShutdown();
+ listenerThread.initiateShutdown();
+ try
+ {
+ listenerThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ // Give up waiting.
+ }
listenerThread = null;
}
}
}
/**
+ * Returns {@code true} if the listener thread is shutting down or has
+ * shutdown.
+ *
+ * @return {@code true} if the listener thread is shutting down or has
+ * shutdown.
+ */
+ protected final boolean isListenerShuttingDown()
+ {
+ final DirectoryThread tmp = listenerThread;
+ return tmp == null || tmp.isShutdownInitiated();
+ }
+
+ /**
* Restart the Replication service after a {@link #disableService()}.
* <p>
* The Replication Service will restart from the point indicated by the
@@ -3065,10 +3105,7 @@
synchronized (sessionLock)
{
broker.start();
-
- // Create the listener thread
- listenerThread = new ListenerThread(this);
- listenerThread.start();
+ startListenService();
}
}
@@ -3156,6 +3193,8 @@
*/
public abstract long countEntries() throws DirectoryException;
+
+
/**
* This method should handle the processing of {@link UpdateMsg} receive from
* remote replication entities.
@@ -3165,20 +3204,17 @@
*
* @param updateMsg
* The {@link UpdateMsg} that was received.
- * @param shutdown
- * whether the server initiated shutdown
* @return A boolean indicating if the processing is completed at return time.
* If <code> true </code> is returned, no further processing is
* necessary. If <code> false </code> is returned, the subclass should
* call the method {@link #processUpdateDone(UpdateMsg, String)} and
* update the ServerState When this processing is complete.
*/
- public abstract boolean processUpdate(UpdateMsg updateMsg,
- AtomicBoolean shutdown);
+ public abstract boolean processUpdate(UpdateMsg updateMsg);
/**
* This method must be called after each call to
- * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+ * {@link #processUpdate(UpdateMsg)} when the processing of the
* update is completed.
* <p>
* It is useful for implementation needing to process the update in an
@@ -3192,7 +3228,7 @@
* this update, and this is the matching human readable message
* describing the problem.
*/
- public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
+ protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
{
broker.updateWindowAfterReplay();
@@ -3401,7 +3437,7 @@
* The Replication Service will handle the delivery of this {@link UpdateMsg}
* to all the participants of this Replication Domain. These members will be
* receive this {@link UpdateMsg} through a call of the
- * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
+ * {@link #processUpdate(UpdateMsg)} message.
*
* @param msg The UpdateMsg that should be pushed.
*/
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index 68db0b8..2a9b6c1 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -27,7 +27,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
@@ -88,7 +87,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+ public boolean processUpdate(UpdateMsg updateMsg)
{
return false;
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index d46b539..59a7bb0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -32,7 +32,6 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -632,7 +631,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+ public boolean processUpdate(UpdateMsg updateMsg)
{
if (queue != null)
queue.add(updateMsg);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
index 1b48882..8122a1a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -121,7 +121,7 @@
"uid=simultaneous2");
// Put the message in the replay queue
- domain.processUpdate(modDnMsg, SHUTDOWN);
+ domain.processUpdate(modDnMsg);
// Make the domain replay the change from the replay queue
domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -135,7 +135,7 @@
"uid=simulatneouswrong");
// Put the message in the replay queue
- domain.processUpdate(modDnMsg, SHUTDOWN);
+ domain.processUpdate(modDnMsg);
// Make the domain replay the change from the replay queue
// and resolve conflict
@@ -210,7 +210,7 @@
null);
// Put the message in the replay queue
- domain.processUpdate(addMsg, SHUTDOWN);
+ domain.processUpdate(addMsg);
// Make the domain replay the change from the replay queue
domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -294,7 +294,7 @@
null);
// Put the message in the replay queue
- domain.processUpdate(addMsg, SHUTDOWN);
+ domain.processUpdate(addMsg);
// Make the domain replay the change from the replay queue
domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -399,7 +399,7 @@
delMsg.setSubtreeDelete(true);
// Put the message in the replay queue
- domain.processUpdate(delMsg, SHUTDOWN);
+ domain.processUpdate(delMsg);
// Make the domain replay the change from the replay queue
domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -474,7 +474,7 @@
// NOT SUBTREE
// Put the message in the replay queue
- domain.processUpdate(delMsg, SHUTDOWN);
+ domain.processUpdate(delMsg);
// Make the domain replay the change from the replay queue
domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
@@ -557,7 +557,7 @@
new ArrayList<Attribute>());
// Put the message in the replay queue
- domain.processUpdate(addMsg, SHUTDOWN);
+ domain.processUpdate(addMsg);
// Make the domain replay the change from the replay queue
domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 9d213ba..282c423 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -33,7 +33,6 @@
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -57,7 +56,6 @@
import org.testng.annotations.Test;
import static java.util.Arrays.*;
-
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -504,7 +502,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+ public boolean processUpdate(UpdateMsg updateMsg)
{
checkUpdateAssuredParameters(updateMsg);
nReceivedUpdates++;
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index df894f7..601a39b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -31,7 +31,6 @@
import java.io.OutputStream;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.plugin.DomainFakeCfg;
@@ -151,7 +150,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+ public boolean processUpdate(UpdateMsg updateMsg)
{
if (queue != null)
queue.add(updateMsg);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index 12fbafa..b0a49c0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -31,7 +31,6 @@
import java.io.OutputStream;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.plugin.DomainFakeCfg;
@@ -136,7 +135,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
+ public boolean processUpdate(UpdateMsg updateMsg)
{
if (queue != null)
queue.add(updateMsg);
--
Gitblit v1.10.0