From 62c7d9970ee34d102a78ce72a459274a97721cc6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Jun 2014 14:50:38 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3697) Change time heart beat change numbers should be synced with updates
---
opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java | 93 +++++--------
opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java | 16 +-
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 35 ++--
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java | 4
opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java | 59 +++-----
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 7
opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java | 15 +
opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 91 ++++++------
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 12 +
opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java | 12 -
opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java | 58 +++-----
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java | 2
12 files changed, 188 insertions(+), 216 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java b/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java
index 350b017..6c0a918 100644
--- a/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/core/DirectoryServer.java
@@ -276,43 +276,43 @@
* Returned when the user specified the --checkStartability option with other
* options like printing the usage, dumping messages, displaying version, etc.
*/
- private static int NOTHING_TO_DO = 0;
+ private static final int NOTHING_TO_DO = 0;
/**
* Returned when the user specified the --checkStartability option with
* some incompatible arguments.
*/
- private static int CHECK_ERROR = 1;
+ private static final int CHECK_ERROR = 1;
/**
* The server is already started.
*/
- private static int SERVER_ALREADY_STARTED = 98;
+ private static final int SERVER_ALREADY_STARTED = 98;
/**
* The server must be started as detached process.
*/
- private static int START_AS_DETACH = 99;
+ private static final int START_AS_DETACH = 99;
/**
* The server must be started as a non-detached process.
*/
- private static int START_AS_NON_DETACH = 100;
+ private static final int START_AS_NON_DETACH = 100;
/**
* The server must be started as a window service.
*/
- private static int START_AS_WINDOWS_SERVICE = 101;
+ private static final int START_AS_WINDOWS_SERVICE = 101;
/**
* The server must be started as detached and it is being called from the
* Windows Service.
*/
- private static int START_AS_DETACH_CALLED_FROM_WINDOWS_SERVICE = 102;
+ private static final int START_AS_DETACH_CALLED_FROM_WINDOWS_SERVICE = 102;
/**
* The server must be started as detached process and should not produce any
* output.
*/
- private static int START_AS_DETACH_QUIET = 103;
+ private static final int START_AS_DETACH_QUIET = 103;
/**
* The server must be started as non-detached process and should not produce
* any output.
*/
- private static int START_AS_NON_DETACH_QUIET = 104;
+ private static final int START_AS_NON_DETACH_QUIET = 104;
/** Temporary context object, to provide instance methods instead of static methods. */
private final DirectoryServerContext serverContext;
@@ -665,7 +665,7 @@
private int lookthroughLimit;
/** The current active persistent searches. */
- private AtomicInteger activePSearches = new AtomicInteger(0);
+ private final AtomicInteger activePSearches = new AtomicInteger(0);
/** The maximum number of concurrent persistent searches. */
private int maxPSearches;
@@ -1173,8 +1173,7 @@
* @throws InitializationException If a problem occurs while attempting to
* bootstrap the server.
*/
- public void bootstrapServer()
- throws InitializationException
+ private void bootstrapServer() throws InitializationException
{
// First, make sure that the server isn't currently running. If it isn't,
// then make sure that no other thread will try to start or bootstrap the
@@ -2134,7 +2133,7 @@
* the backends that is not related to the
* server configuration.
*/
- public void initializeBackends() throws ConfigException, InitializationException
+ private void initializeBackends() throws ConfigException, InitializationException
{
backendConfigManager = new BackendConfigManager(serverContext);
backendConfigManager.initializeBackendConfig();
@@ -2231,9 +2230,8 @@
* workflow conflicts with the workflow
* ID of an existing workflow.
*/
- public static void createAndRegisterWorkflowsWithDefaultNetworkGroup(
- Backend backend
- ) throws DirectoryException
+ private static void createAndRegisterWorkflowsWithDefaultNetworkGroup(
+ Backend backend) throws DirectoryException
{
// Create a workflow for each backend base DN and register the workflow
// with the default/internal/admin network group.
@@ -2264,10 +2262,8 @@
* workflow conflicts with the workflow
* ID of an existing workflow.
*/
- public static WorkflowImpl createWorkflow(
- DN baseDN,
- Backend backend
- ) throws DirectoryException
+ private static WorkflowImpl createWorkflow(DN baseDN, Backend backend)
+ throws DirectoryException
{
String backendID = backend.getBackendID();
@@ -2458,7 +2454,7 @@
* attempting to initialize and start the
* Directory Server.
*/
- public void configureWorkflowsManual()
+ private void configureWorkflowsManual()
throws ConfigException, InitializationException
{
// First of all re-initialize the current workflow configuration
@@ -2545,7 +2541,7 @@
* the group manager that is not related to
* the server configuration.
*/
- public void initializeGroupManager()
+ private void initializeGroupManager()
throws ConfigException, InitializationException
{
try
@@ -7550,16 +7546,9 @@
directoryServer.shuttingDown = true;
}
- try {
- directoryServer.configHandler.getConfigRootEntry();
- } catch (Exception e) {
-
- }
-
// Send an alert notification that the server is shutting down.
- LocalizableMessage message = NOTE_SERVER_SHUTDOWN.get(className, reason);
sendAlertNotification(directoryServer, ALERT_TYPE_SERVER_SHUTDOWN,
- message);
+ NOTE_SERVER_SHUTDOWN.get(className, reason));
// Create a shutdown monitor that will watch the rest of the shutdown
@@ -7583,7 +7572,18 @@
}
directoryServer.connectionHandlers.clear();
+ if (directoryServer.workQueue != null)
+ {
+ directoryServer.workQueue.finalizeWorkQueue(reason);
+ directoryServer.workQueue.waitUntilIdle(ServerShutdownMonitor.WAIT_TIME);
+ }
+ // shutdown replication
+ for (SynchronizationProvider provider :
+ directoryServer.synchronizationProviders)
+ {
+ provider.finalizeSynchronizationProvider();
+ }
// Call the shutdown plugins, and then finalize all the plugins defined in
// the server.
@@ -7593,14 +7593,6 @@
directoryServer.pluginConfigManager.finalizePlugins();
}
-
- // shutdown the Synchronization Providers
- for (SynchronizationProvider provider :
- directoryServer.synchronizationProviders)
- {
- provider.finalizeSynchronizationProvider();
- }
-
// Deregister the shutdown hook.
if (directoryServer.shutdownHook != null)
{
@@ -7612,13 +7604,6 @@
}
- // Stop the work queue.
- if (directoryServer.workQueue != null)
- {
- directoryServer.workQueue.finalizeWorkQueue(reason);
- }
-
-
// Notify all the shutdown listeners.
for (ServerShutdownListener shutdownListener :
directoryServer.shutdownListeners)
@@ -7776,8 +7761,8 @@
{
logger.traceException(e2);
- logger.warn(WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK, backend
- .getBackendID(), stackTraceToSingleLineString(e2));
+ logger.warn(WARN_SHUTDOWN_CANNOT_RELEASE_SHARED_BACKEND_LOCK,
+ backend.getBackendID(), stackTraceToSingleLineString(e2));
// FIXME -- Do we need to send an admin alert?
}
}
@@ -7795,12 +7780,10 @@
}
// Release exclusive lock held on server.lock file
- String serverLockFileName = LockFileManager.getServerLockFileName();
- StringBuilder failureReason = new StringBuilder();
-
try {
- if (!LockFileManager.releaseLock(serverLockFileName,
- failureReason)) {
+ String serverLockFileName = LockFileManager.getServerLockFileName();
+ StringBuilder failureReason = new StringBuilder();
+ if (!LockFileManager.releaseLock(serverLockFileName, failureReason)) {
logger.info(NOTE_SERVER_SHUTDOWN, className, failureReason);
}
} catch (Exception e) {
@@ -9008,13 +8991,11 @@
* @return Returns the class loader to be used with this directory
* server application.
*/
- public static ClassLoader getClassLoader()
+ private static ClassLoader getClassLoader()
{
return ClassLoaderProvider.getInstance().getClassLoader();
}
-
-
/**
* Loads the named class using this directory server application's
* class loader.
@@ -9211,7 +9192,7 @@
*
* @return the workflow configuration mode
*/
- public static boolean workflowConfigurationModeIsAuto()
+ private static boolean workflowConfigurationModeIsAuto()
{
return directoryServer.workflowConfigurationMode
== WorkflowConfigurationMode.AUTO;
diff --git a/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java b/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java
index c7fe4f2..44b55d0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/core/ServerShutdownMonitor.java
@@ -36,9 +36,18 @@
* This class defines a daemon thread that will be used to monitor the server
* shutdown process and may help nudge it along if it appears to get hung.
*/
-public class ServerShutdownMonitor extends DirectoryThread
+class ServerShutdownMonitor extends DirectoryThread
{
/**
+ * Time in milliseconds for the shutdown monitor to:
+ * <ol>
+ * <li>wait before sending interrupt to threads</li>
+ * <li>wait before final shutdown</li>
+ * </ol>
+ */
+ static final long WAIT_TIME = 30000;
+
+ /**
* Indicates whether the monitor has completed and the shutdown may be
* finalized with a call to {@link System#exit()}.
*/
@@ -110,7 +119,7 @@
// For the first milestone, we'll run for up to 30 seconds just checking
// to see whether all threads have stopped yet.
- if (waitAllThreadsDied(30000))
+ if (waitAllThreadsDied(WAIT_TIME))
{
return;
}
@@ -129,7 +138,7 @@
} catch (Exception e) {}
}
- if (waitAllThreadsDied(30000))
+ if (waitAllThreadsDied(WAIT_TIME))
{
return;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java b/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
index 9508ff9..81cdcbd 100644
--- a/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
+++ b/opendj3-server-dev/src/server/org/opends/server/extensions/TraditionalWorkerThread.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.extensions;
-
import java.util.Map;
import org.forgerock.i18n.LocalizableMessage;
@@ -40,7 +39,6 @@
import static org.opends.messages.CoreMessages.*;
import static org.opends.server.util.StaticUtils.*;
-
/**
* This class defines a data structure for storing and interacting with a
* Directory Server worker thread.
@@ -57,7 +55,7 @@
private volatile boolean shutdownRequested;
/**
- * Indicates whether this thread was stopped because the server threadnumber
+ * Indicates whether this thread was stopped because the server thread number
* was reduced.
*/
private boolean stoppedByReducedThreadNumber;
@@ -66,13 +64,13 @@
private boolean waitingForWork;
/** The operation that this worker thread is currently processing. */
- private Operation operation;
+ private volatile Operation operation;
/** The handle to the actual thread for this worker thread. */
private Thread workerThread;
/** The work queue that this worker thread will service. */
- private TraditionalWorkQueue workQueue;
+ private final TraditionalWorkQueue workQueue;
@@ -123,7 +121,7 @@
*/
public boolean isActive()
{
- return (isAlive() && (operation != null));
+ return isAlive() && operation != null;
}
@@ -142,7 +140,7 @@
try
{
waitingForWork = true;
- operation = null;
+ operation = null; // this line is necessary because next line can block
operation = workQueue.nextOperation(this);
waitingForWork = false;
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java b/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java
index 205f0dc..33ea83d 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/common/CSNGenerator.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -44,7 +44,7 @@
* @see #lastTime
*/
private int seqnum;
- private int serverId;
+ private final int serverId;
/**
* Create a new {@link CSNGenerator}.
@@ -64,12 +64,12 @@
/**
* Create a new {@link CSNGenerator}.
*
- * @param id id to use when creating {@link CSN}s.
+ * @param serverId serverId to use when creating {@link CSN}s.
* @param state This generator will be created in a way that makes sure that
* all {@link CSN}s generated will be larger than all the
* {@link CSN}s currently in state.
*/
- public CSNGenerator(int id, ServerState state)
+ public CSNGenerator(int serverId, ServerState state)
{
this.lastTime = TimeThread.getTime();
for (CSN csn : state)
@@ -78,12 +78,12 @@
{
this.lastTime = csn.getTime();
}
- if (csn.getServerId() == id)
+ if (csn.getServerId() == serverId)
{
this.seqnum = csn.getSeqnum();
}
}
- this.serverId = id;
+ this.serverId = serverId;
}
/**
@@ -104,7 +104,7 @@
lastTime = curTime;
}
- if (++seqnum <= 0)
+ if (++seqnum <= 0) // check no underflow happened
{
seqnum = 0;
lastTime++;
@@ -155,7 +155,7 @@
lastTime = ++rcvdTime;
}
- if ((serverId == changeServerId) && (seqnum < changeSeqNum))
+ if (serverId == changeServerId && seqnum < changeSeqNum)
{
seqnum = changeSeqNum;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index dfd3cd6..4b15777 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -104,6 +104,9 @@
* processing a change received from the replicationServer service,
* handle conflict resolution,
* handle protocol messages from the replicationServer.
+ * <p>
+ * FIXME Move this class to org.opends.server.replication.service
+ * or the equivalent package once this code is moved to a maven module.
*/
public final class LDAPReplicationDomain extends ReplicationDomain
implements ConfigurationChangeListener<ReplicationDomainCfg>,
@@ -4067,9 +4070,7 @@
// Now for bad data set status if needed
if (forceBadDataSet)
{
- // Go into bad data set status
- setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
- broker.signalStatusChange(status);
+ signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDNString());
return; // Do not send changes to the replication server
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
index b9f8203..e694ab4 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -22,28 +22,26 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS
+ * Portions copyright 2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
-import org.opends.server.types.DN;
import org.opends.server.types.operation.PluginOperation;
/**
* This class is use to store an operation currently
* in progress and not yet committed in the database.
*/
-public class PendingChange implements Comparable<PendingChange>
+class PendingChange implements Comparable<PendingChange>
{
- private CSN csn;
+ private final CSN csn;
private boolean committed;
private LDAPUpdateMsg msg;
- private PluginOperation op;
+ private final PluginOperation op;
private ServerState dependencyState;
- private DN targetDN;
/**
* Construct a new PendingChange.
@@ -51,7 +49,7 @@
* @param op the operation to use
* @param msg the message to use (can be null for local operations)
*/
- public PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
+ PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
{
this.csn = csn;
this.committed = false;
@@ -115,15 +113,6 @@
}
/**
- * Set the operation associated to this PendingChange.
- * @param op The operation associated to this PendingChange.
- */
- public void setOp(PluginOperation op)
- {
- this.op = op;
- }
-
- /**
* Add the given CSN to the list of dependencies of this PendingChange.
*
* @param csn
@@ -152,29 +141,24 @@
return state.cover(dependencyState);
}
- /**
- * Get the Target DN of this message.
- *
- * @return The target DN of this message.
- */
- public DN getTargetDN()
- {
- synchronized (this)
- {
- if (targetDN == null)
- {
- targetDN = msg.getDN();
- }
- return targetDN;
- }
- }
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public int compareTo(PendingChange o)
{
- return getCSN().compareTo(o.getCSN());
+ return csn.compareTo(o.csn);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName()
+ + " committed=" + committed
+ + ", csn=" + csn.toStringUI()
+ + ", msg=[" + msg
+ + "], isOperationSynchronized="
+ + (op != null ? op.isSynchronizationOperation() : "false")
+ + ", dependencyState="
+ + (dependencyState != null ? dependencyState : "");
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
index b66f7b6..42ee696 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -26,8 +26,8 @@
*/
package org.opends.server.replication.plugin;
+import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.SortedMap;
import java.util.TreeMap;
import org.opends.server.replication.common.CSN;
@@ -51,19 +51,19 @@
/**
* A map used to store the pending changes.
*/
- private SortedMap<CSN, PendingChange> pendingChanges =
- new TreeMap<CSN, PendingChange>();
+ private final TreeMap<CSN, PendingChange> pendingChanges =
+ new TreeMap<CSN, PendingChange>();
/**
* The {@link CSNGenerator} to use to create new unique CSNs
* for each operation done on the replication domain.
*/
- private CSNGenerator csnGenerator;
+ private final CSNGenerator csnGenerator;
/**
* The ReplicationDomain that will be used to send UpdateMsg.
*/
- private ReplicationDomain domain;
+ private final ReplicationDomain domain;
private boolean recoveringOldChanges = false;
@@ -128,34 +128,32 @@
synchronized CSN putLocalOperation(PluginOperation operation)
{
final CSN csn = csnGenerator.newCSN();
- final PendingChange change = new PendingChange(csn, operation, null);
- pendingChanges.put(csn, change);
+ if (!operation.isSynchronizationOperation())
+ {
+ pendingChanges.put(csn, new PendingChange(csn, operation, null));
+ }
return csn;
}
/**
* Push all committed local changes to the replicationServer service.
- *
- * @return The number of pushed updates.
*/
- synchronized int pushCommittedChanges()
+ synchronized void pushCommittedChanges()
{
- int numSentUpdates = 0;
- if (pendingChanges.isEmpty())
+ // peek the oldest change
+ Entry<CSN, PendingChange> firstEntry = pendingChanges.firstEntry();
+ if (firstEntry == null)
{
- return numSentUpdates;
+ return;
}
- // peek the oldest CSN
- CSN firstCSN = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstCSN);
+ PendingChange firstChange = firstEntry.getValue();
while (firstChange != null && firstChange.isCommitted())
{
final PluginOperation op = firstChange.getOp();
if (op != null && !op.isSynchronizationOperation())
{
- numSentUpdates++;
final LDAPUpdateMsg updateMsg = firstChange.getMsg();
if (!recoveringOldChanges)
{
@@ -168,20 +166,14 @@
domain.getServerState().update(updateMsg.getCSN());
}
}
- pendingChanges.remove(firstCSN);
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
- else
- {
- // peek the oldest CSN
- firstCSN = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstCSN);
- }
+ // false warning: firstEntry will not be null if firstChange is not null
+ pendingChanges.remove(firstEntry.getKey());
+
+ // peek the oldest change
+ firstEntry = pendingChanges.firstEntry();
+ firstChange = firstEntry != null ? firstEntry.getValue() : null;
}
- return numSentUpdates;
}
/**
@@ -189,16 +181,13 @@
* push all committed local changes to the replicationServer service
* in a single atomic operation.
*
- *
* @param csn The CSN of the update message that must be set as committed.
- * @param msg The message associated to the update.
- *
- * @return The number of pushed updates.
+ * @param msg The message associated to the update.
*/
- synchronized int commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
+ synchronized void commitAndPushCommittedChanges(CSN csn, LDAPUpdateMsg msg)
{
commit(csn, msg);
- return pushCommittedChanges();
+ pushCommittedChanges();
}
/**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index bb7dd46..5126be4 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -48,7 +48,7 @@
*
* One of this object is instantiated for each ReplicationDomain.
*/
-public final class RemotePendingChanges
+final class RemotePendingChanges
{
/**
* A map used to store the pending changes.
@@ -124,7 +124,7 @@
CSN firstCSN = pendingChanges.firstKey();
PendingChange firstChange = pendingChanges.get(firstCSN);
- while ((firstChange != null) && firstChange.isCommitted())
+ while (firstChange != null && firstChange.isCommitted())
{
state.update(firstCSN);
pendingChanges.remove(firstCSN);
@@ -196,17 +196,19 @@
public synchronized boolean checkDependencies(AddOperation op)
{
boolean hasDependencies = false;
- DN targetDn = op.getEntryDN();
- CSN csn = OperationContext.getCSN(op);
- PendingChange change = pendingChanges.get(csn);
+ final DN targetDN = op.getEntryDN();
+ final CSN csn = OperationContext.getCSN(op);
+ final PendingChange change = pendingChanges.get(csn);
if (change == null)
+ {
return false;
+ }
for (PendingChange pendingChange : pendingChanges.values())
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
if (pendingMsg != null)
{
if (pendingMsg instanceof DeleteMsg)
@@ -215,7 +217,7 @@
* Check is the operation to be run is a deleteOperation on the
* same DN.
*/
- if (pendingChange.getTargetDN().equals(targetDn))
+ if (pendingMsg.getDN().equals(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -227,7 +229,7 @@
* Check if the operation to be run is an addOperation on a
* parent of the current AddOperation.
*/
- if (pendingChange.getTargetDN().isAncestorOf(targetDn))
+ if (pendingMsg.getDN().isAncestorOf(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -240,15 +242,15 @@
* the same target DN as the ADD DN
* or a ModifyDnOperation with new DN equals to the ADD DN parent
*/
- if (pendingChange.getTargetDN().equals(targetDn))
+ if (pendingMsg.getDN().equals(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
}
else
{
- ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
- if (pendingModDn.newDNIsParent(targetDn))
+ final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+ if (pendingModDn.newDNIsParent(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -286,30 +288,26 @@
public synchronized boolean checkDependencies(ModifyOperation op)
{
boolean hasDependencies = false;
- DN targetDn = op.getEntryDN();
- CSN csn = OperationContext.getCSN(op);
- PendingChange change = pendingChanges.get(csn);
+ final DN targetDN = op.getEntryDN();
+ final CSN csn = OperationContext.getCSN(op);
+ final PendingChange change = pendingChanges.get(csn);
if (change == null)
+ {
return false;
+ }
for (PendingChange pendingChange : pendingChanges.values())
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
- if (pendingMsg != null)
+ final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ if (pendingMsg instanceof AddMsg)
{
- if (pendingMsg instanceof AddMsg)
+ // Check if the operation to be run is an addOperation on a same DN.
+ if (pendingMsg.getDN().equals(targetDN))
{
- /*
- * Check if the operation to be run is an addOperation on a
- * same DN.
- */
- if (pendingChange.getTargetDN().equals(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
+ hasDependencies = true;
+ addDependency(change, pendingChange);
}
}
}
@@ -342,29 +340,30 @@
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(ModifyDNMsg msg)
+ private synchronized boolean checkDependencies(ModifyDNMsg msg)
{
boolean hasDependencies = false;
- CSN csn = msg.getCSN();
- PendingChange change = pendingChanges.get(csn);
+ final CSN csn = msg.getCSN();
+ final PendingChange change = pendingChanges.get(csn);
if (change == null)
+ {
return false;
+ }
- DN targetDn = change.getTargetDN();
-
+ final DN targetDN = change.getMsg().getDN();
for (PendingChange pendingChange : pendingChanges.values())
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
if (pendingMsg != null)
{
if (pendingMsg instanceof DeleteMsg)
{
// Check if the target of the Delete is the same
// as the new DN of this ModifyDN
- if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+ if (msg.newDNIsEqual(pendingMsg.getDN()))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -374,14 +373,14 @@
{
// Check if the Add Operation was done on the new parent of
// the MODDN operation
- if (msg.newParentIsEqual(pendingChange.getTargetDN()))
+ if (msg.newParentIsEqual(pendingMsg.getDN()))
{
hasDependencies = true;
addDependency(change, pendingChange);
}
// Check if the AddOperation was done on the same DN as the
// target DN of the MODDN operation
- if (pendingChange.getTargetDN().equals(targetDn))
+ if (pendingMsg.getDN().equals(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -391,7 +390,7 @@
{
// Check if the ModifyDNOperation was done from the new DN of
// the MODDN operation
- if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+ if (msg.newDNIsEqual(pendingMsg.getDN()))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -431,17 +430,19 @@
public synchronized boolean checkDependencies(DeleteOperation op)
{
boolean hasDependencies = false;
- DN targetDn = op.getEntryDN();
- CSN csn = OperationContext.getCSN(op);
- PendingChange change = pendingChanges.get(csn);
+ final DN targetDN = op.getEntryDN();
+ final CSN csn = OperationContext.getCSN(op);
+ final PendingChange change = pendingChanges.get(csn);
if (change == null)
+ {
return false;
+ }
for (PendingChange pendingChange : pendingChanges.values())
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
if (pendingMsg != null)
{
if (pendingMsg instanceof DeleteMsg)
@@ -450,7 +451,7 @@
* Check if the operation to be run is a deleteOperation on a
* children of the current DeleteOperation.
*/
- if (pendingChange.getTargetDN().isDescendantOf(targetDn))
+ if (pendingMsg.getDN().isDescendantOf(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -462,7 +463,7 @@
* Check if the operation to be run is an addOperation on a
* parent of the current DeleteOperation.
*/
- if (pendingChange.getTargetDN().equals(targetDn))
+ if (pendingMsg.getDN().equals(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
@@ -470,13 +471,13 @@
}
else if (pendingMsg instanceof ModifyDNMsg)
{
- ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
+ final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
/*
* Check if the operation to be run is an ModifyDNOperation
* on a children of the current DeleteOperation
*/
- if ((pendingChange.getTargetDN().isDescendantOf(targetDn)) ||
- (pendingModDn.newDNIsParent(targetDn)))
+ if (pendingMsg.getDN().isDescendantOf(targetDN)
+ || pendingModDn.newDNIsParent(targetDN))
{
hasDependencies = true;
addDependency(change, pendingChange);
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9aefb0b..dc93aa1 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -86,12 +86,12 @@
* topology. Using an AtomicReference to avoid leaking references to costly
* threads.
*/
- private AtomicReference<MonitoringPublisher> monitoringPublisher =
+ private final AtomicReference<MonitoringPublisher> monitoringPublisher =
new AtomicReference<MonitoringPublisher>();
/**
* Maintains monitor data for the current domain.
*/
- private ReplicationDomainMonitor domainMonitor =
+ private final ReplicationDomainMonitor domainMonitor =
new ReplicationDomainMonitor(this);
/**
@@ -116,7 +116,7 @@
private final ReplicationDomainDB domainDB;
/** The ReplicationServer that created the current instance. */
- private ReplicationServer localReplicationServer;
+ private final ReplicationServer localReplicationServer;
/**
* The generationId of the current replication domain. The generationId is
@@ -158,7 +158,7 @@
* The timer used to run the timeout code (timer tasks) for the assured update
* messages we are waiting acks for.
*/
- private Timer assuredTimeoutTimer;
+ private final Timer assuredTimeoutTimer;
/**
* Counter used to purge the timer tasks references in assuredTimeoutTimer,
* every n number of treated assured messages.
@@ -186,8 +186,6 @@
private boolean sendDSTopologyMsg;
private int excludedDSForTopologyMsg = -1;
-
-
/**
* Enqueues a TopologyMsg for all the connected directory servers in order
* to let them know the topology (every known DSs and RSs).
@@ -212,8 +210,6 @@
}
}
-
-
/**
* Enqueues a TopologyMsg for all the connected replication servers in order
* to let them know our connected LDAP servers.
@@ -223,8 +219,6 @@
sendRSTopologyMsg = true;
}
-
-
/**
* Enqueues a ChangeTimeHeartbeatMsg received from a DS for forwarding to
* all other RS instances.
@@ -237,19 +231,28 @@
pendingHeartbeats.put(msg.getCSN().getServerId(), msg);
}
-
-
private void enqueueDSMonitorMsg(int dsServerId, MonitorMsg msg)
{
pendingDSMonitorMsgs.put(dsServerId, msg);
}
-
-
private void enqueueRSMonitorMsg(int rsServerId, MonitorMsg msg)
{
pendingRSMonitorMsgs.put(rsServerId, msg);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName()
+ + " pendingHeartbeats=" + pendingHeartbeats
+ + ", pendingDSMonitorMsgs=" + pendingDSMonitorMsgs
+ + ", pendingRSMonitorMsgs=" + pendingRSMonitorMsgs
+ + ", sendRSTopologyMsg=" + sendRSTopologyMsg
+ + ", sendDSTopologyMsg=" + sendDSTopologyMsg
+ + ", excludedDSForTopologyMsg=" + excludedDSForTopologyMsg;
+ }
}
private final Object pendingStatusMessagesLock = new Object();
@@ -2086,7 +2089,7 @@
/**
* Clears the Db associated with that domain.
*/
- public void clearDbs()
+ private void clearDbs()
{
try
{
@@ -2768,7 +2771,7 @@
}
}
- if (pendingMsgs.sendRSTopologyMsg)
+ if (pendingMsgs.sendRSTopologyMsg && !connectedRSs.isEmpty())
{
final TopologyMsg topoMsg = createTopologyMsgForRS();
for (ServerHandler handler : connectedRSs.values())
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c6b4149..5d05008 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2777,10 +2777,10 @@
synchronized (startStopLock)
{
+ stopChangeTimeHeartBeatPublishing();
+ stopRSHeartBeatMonitoring();
shutdown = true;
setConnectedRS(ConnectedRS.stopped());
- stopRSHeartBeatMonitoring();
- stopChangeTimeHeartBeatPublishing();
deregisterReplicationMonitor();
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index de653f3..ce9d714 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -210,7 +210,7 @@
/**
* Current status for this replicated domain.
*/
- protected ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
+ private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/** The configuration of the replication domain. */
@@ -2432,7 +2432,13 @@
* event.
* @param event The event that may make the status be changed
*/
- protected void setNewStatus(StatusMachineEvent event)
+ protected void signalNewStatus(StatusMachineEvent event)
+ {
+ setNewStatus(event);
+ broker.signalStatusChange(status);
+ }
+
+ private void setNewStatus(StatusMachineEvent event)
{
ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
if (newStatus == ServerStatus.INVALID_STATUS)
@@ -3400,7 +3406,7 @@
* receive this {@link UpdateMsg} through a call of the
* {@link #processUpdate(UpdateMsg)} message.
*
- * @param msg The UpdateMsg that should be pushed.
+ * @param msg The UpdateMsg that should be published.
*/
public void publish(UpdateMsg msg)
{
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index cae6348..86eda32 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -52,7 +52,7 @@
}
@Override
- protected void setNewStatus(StatusMachineEvent event)
+ protected void signalNewStatus(StatusMachineEvent event)
{
}
--
Gitblit v1.10.0