From 856fdd0571358c660afaf379f8e774ab8b24f05c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java | 57 ++--
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 87 +++----
opends/src/server/org/opends/server/replication/service/ListenerThread.java | 28 +-
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 194 ++++++++-------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java | 37 +-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 116 +++-----
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java | 49 +--
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java | 27 +-
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 85 ++----
9 files changed, 311 insertions(+), 369 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 4a7af7e..c86bc38 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@
* Copyright 2006-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2013 ForgeRock AS
*/
-
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -37,14 +36,12 @@
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
-import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -63,14 +60,7 @@
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
-import org.opends.server.core.AddOperation;
-import org.opends.server.core.DeleteOperation;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.LockFileManager;
-import org.opends.server.core.ModifyDNOperation;
-import org.opends.server.core.ModifyDNOperationBasis;
-import org.opends.server.core.ModifyOperation;
-import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -80,12 +70,7 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -93,16 +78,7 @@
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.*;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.types.operation.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -191,8 +167,10 @@
*/
private static final DebugTracer TRACER = getTracer();
- // The update to replay message queue where the listener thread is going to
- // push incoming update messages.
+ /**
+ * The update to replay message queue where the listener thread is going to
+ * push incoming update messages.
+ */
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
@@ -238,9 +216,11 @@
private volatile boolean disabled = false;
private volatile boolean stateSavingDisabled = false;
- // This list is used to temporary store operations that needs
- // to be replayed at session establishment time.
- private final TreeMap<ChangeNumber, FakeOperation> replayOperations =
+ /**
+ * This list is used to temporary store operations that needs to be replayed
+ * at session establishment time.
+ */
+ private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
new TreeMap<ChangeNumber, FakeOperation>();
/**
@@ -288,7 +268,7 @@
* Fractional replication variables.
*/
- // Holds the fractional configuration for this domain, if any.
+ /** Holds the fractional configuration for this domain, if any. */
private FractionalConfig fractionalConfig = null;
/**
@@ -341,29 +321,39 @@
* fractionalFilterOperation(PreOperationModifyOperation
* modifyOperation, boolean performFiltering) method
*/
- // The operation contains attributes subject to fractional filtering according
- // to the fractional configuration
+ /**
+ * The operation contains attributes subject to fractional filtering according
+ * to the fractional configuration.
+ */
private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
- // The operation contains no attributes subject to fractional filtering
- // according to the fractional configuration
+ /**
+ * The operation contains no attributes subject to fractional filtering
+ * according to the fractional configuration.
+ */
private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
- // The operation should become a no-op
+ /** The operation should become a no-op. */
private static final int FRACTIONAL_BECOME_NO_OP = 3;
- // This configuration boolean indicates if this ReplicationDomain should log
- // ChangeNumbers.
+ /**
+ * This configuration boolean indicates if this ReplicationDomain should log
+ * ChangeNumbers.
+ */
private boolean logChangeNumber = false;
- // This configuration integer indicates the time the domain keeps the
- // historical information necessary to solve conflicts.
- // When a change stored in the historical part of the user entry has a date
- // (from its replication ChangeNumber) older than this delay, it is candidate
- // to be purged.
+ /**
+ * This configuration integer indicates the time the domain keeps the
+ * historical information necessary to solve conflicts.<br>
+ * When a change stored in the historical part of the user entry has a date
+ * (from its replication ChangeNumber) older than this delay, it is candidate
+ * to be purged.
+ */
private long histPurgeDelayInMilliSec = 0;
- // The last change number purged in this domain. Allows to have a continuous
- // purging process from one purge processing (task run) to the next one.
- // Values 0 when the server starts.
+ /**
+ * The last change number purged in this domain. Allows to have a continuous
+ * purging process from one purge processing (task run) to the next one.
+ * Values 0 when the server starts.
+ */
private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
/**
@@ -752,10 +742,8 @@
if (fractionalConfig.isFractional())
{
// Set new fractional configuration values
- if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
- fractionalConfig.setFractionalExclusive(true);
- else
- fractionalConfig.setFractionalExclusive(false);
+ fractionalConfig.setFractionalExclusive(
+ newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
fractionalConfig.setFractionalSpecificClassesAttributes(
newFractionalConfig.getFractionalSpecificClassesAttributes());
fractionalConfig.setFractionalAllClassesAttributes(
@@ -950,10 +938,8 @@
// Set stored fractional configuration values
if (storedFractionalConfig.isFractional())
{
- if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
- storedFractionalConfig.setFractionalExclusive(true);
- else
- storedFractionalConfig.setFractionalExclusive(false);
+ storedFractionalConfig.setFractionalExclusive(
+ storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
}
storedFractionalConfig.setFractionalSpecificClassesAttributes(
storedFractionalSpecificClassesAttributes);
@@ -1434,12 +1420,10 @@
AttributeValue rdnAttributeValue =
entryRdn.getAttributeValue(attributeType);
List<Attribute> attrList = attributesMap.get(attributeType);
- Iterator<Attribute> attrIt = attrList.iterator();
AttributeValue sameAttrValue = null;
// Locate the attribute value identical to the one in the RDN
- while(attrIt.hasNext())
+ for (Attribute attr : attrList)
{
- Attribute attr = attrIt.next();
if (attr.contains(rdnAttributeValue))
{
for (AttributeValue attrValue : attr) {
@@ -2577,9 +2561,12 @@
/**
* Create and replay a synchronized Operation from an UpdateMsg.
*
- * @param msg The UpdateMsg to be replayed.
+ * @param msg
+ * The UpdateMsg to be replayed.
+ * @param shutdown
+ * whether the server initiated shutdown
*/
- public void replay(LDAPUpdateMsg msg)
+ public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
{
Operation op = null;
boolean replayDone = false;
@@ -2599,6 +2586,11 @@
while ((!dependency) && (!replayDone) && (retryCount-- > 0))
{
+ if (shutdown.get())
+ {
+ // shutdown initiated, let's leave
+ return;
+ }
// Try replay the operation
op.setInternalOperation(true);
op.setSynchronizationOperation(true);
@@ -2622,6 +2614,25 @@
// renamed by a more recent modify DN.
replayDone = true;
}
+ else if (result == ResultCode.BUSY)
+ {
+ /*
+ * We probably could not get a lock (OPENDJ-885). Give the server
+ * another chance to process this operation immediately.
+ */
+ Thread.yield();
+ continue;
+ }
+ else if (result == ResultCode.UNAVAILABLE)
+ {
+ /*
+ * It can happen when a rebuild is performed or the backend is
+ * offline (OPENDJ-49). Give the server another chance to process
+ * this operation after some time.
+ */
+ Thread.sleep(50);
+ continue;
+ }
else if (op instanceof ModifyOperation)
{
ModifyOperation newOp = (ModifyOperation) op;
@@ -2693,22 +2704,13 @@
}
} catch (ASN1Exception e)
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
} catch (LDAPException e)
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
} catch (DataFormatException e)
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
} catch (Exception e)
{
if (changeNumber != null)
@@ -2726,10 +2728,7 @@
updateError(changeNumber);
} else
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
}
} finally
{
@@ -2753,6 +2752,14 @@
} while (msg != null);
}
+ private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
+ {
+ Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+ String.valueOf(msg) + stackTraceToSingleLineString(e));
+ logError(message);
+ return message.toString();
+ }
+
/**
* This method is called when an error happens while replaying
* an operation.
@@ -4862,7 +4869,7 @@
* {@inheritDoc}
*/
@Override
- public boolean processUpdate(UpdateMsg updateMsg)
+ public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
{
// Ignore message if fractional configuration is inconsistent and
// we have been passed into bad data set status
@@ -4880,8 +4887,23 @@
// Put update message into the replay queue
// (block until some place in the queue is available)
- UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
- updateToReplayQueue.offer(updateToReplay);
+ final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
+ while (!shutdown.get())
+ {
+ // loop until we can offer to the queue or shutdown was initiated
+ try
+ {
+ if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
+ {
+ // successful offer to the queue, let's exit the loop
+ break;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Thread interrupted: check for shutdown.
+ }
+ }
return false;
}
@@ -5321,10 +5343,8 @@
case EXCLUSIVE_FRACTIONAL:
case INCLUSIVE_FRACTIONAL:
result.setFractional(true);
- if (newFractionalMode == EXCLUSIVE_FRACTIONAL)
- result.setFractionalExclusive(true);
- else
- result.setFractionalExclusive(false);
+ result.setFractionalExclusive(
+ newFractionalMode == EXCLUSIVE_FRACTIONAL);
break;
}
result.setFractionalSpecificClassesAttributes(
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index c92c682..aab9397 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@
*/
package org.opends.server.replication.plugin;
-import java.util.ArrayList;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
+import static org.opends.server.util.StaticUtils.*;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.BackupTaskListener;
-import org.opends.server.api.ExportTaskListener;
-import org.opends.server.api.ImportTaskListener;
-import org.opends.server.api.RestoreTaskListener;
-import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.types.BackupConfig;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.Control;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.Modification;
-import org.opends.server.types.Operation;
-import org.opends.server.types.RestoreConfig;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SynchronizationProviderResult;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
+import org.opends.server.types.*;
+import org.opends.server.types.operation.*;
/**
* This class is used to load the Replication code inside the JVM
@@ -104,8 +76,8 @@
* The queue of received update messages, to be treated by the ReplayThread
* threads.
*/
- private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
- new LinkedBlockingQueue<UpdateToReplay>();
+ private static final BlockingQueue<UpdateToReplay> updateToReplayQueue =
+ new LinkedBlockingQueue<UpdateToReplay>(10000);
/**
* The list of ReplayThread threads.
@@ -228,9 +200,9 @@
}
/**
- * Creates a new domain from its configEntry, do the
- * necessary initialization and starts it so that it is
- * fully operational when this method returns.
+ * Creates a new domain from its configEntry, do the necessary initialization
+ * and starts it so that it is fully operational when this method returns. It
+ * is only used for tests so far.
*
* @param configuration The entry with the configuration of this domain.
* @param queue The BlockingQueue that this domain will use.
@@ -239,13 +211,13 @@
*
* @throws ConfigException When the configuration is not valid.
*/
- public static LDAPReplicationDomain createNewDomain(
+ static LDAPReplicationDomain createNewDomain(
ReplicationDomainCfg configuration,
BlockingQueue<UpdateToReplay> queue)
throws ConfigException
{
- LDAPReplicationDomain domain;
- domain = new LDAPReplicationDomain(configuration, queue);
+ LDAPReplicationDomain domain =
+ new LDAPReplicationDomain(configuration, queue);
domains.put(domain.getBaseDN(), domain);
return domain;
@@ -362,6 +334,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isConfigurationAddAcceptable(
ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
{
@@ -372,6 +345,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ConfigChangeResult applyConfigurationAdd(
ReplicationDomainCfg configuration)
{
@@ -652,6 +626,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processBackupBegin(Backend backend, BackupConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -665,6 +640,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processBackupEnd(Backend backend, BackupConfig config,
boolean successful)
{
@@ -679,6 +655,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processRestoreBegin(Backend backend, RestoreConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -692,6 +669,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processRestoreEnd(Backend backend, RestoreConfig config,
boolean successful)
{
@@ -706,6 +684,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processImportBegin(Backend backend, LDIFImportConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -719,6 +698,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processImportEnd(Backend backend, LDIFImportConfig config,
boolean successful)
{
@@ -733,6 +713,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processExportBegin(Backend backend, LDIFExportConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -746,6 +727,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processExportEnd(Backend backend, LDIFExportConfig config,
boolean successful)
{
@@ -760,6 +742,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ConfigChangeResult applyConfigurationDelete(
ReplicationDomainCfg configuration)
{
@@ -771,6 +754,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isConfigurationDeleteAcceptable(
ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
{
@@ -804,10 +788,10 @@
/**
* {@inheritDoc}
*/
- public boolean
- isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
- configuration,
- List<Message> unacceptableReasons)
+ @Override
+ public boolean isConfigurationChangeAcceptable(
+ ReplicationSynchronizationProviderCfg configuration,
+ List<Message> unacceptableReasons)
{
return true;
}
@@ -815,9 +799,9 @@
/**
* {@inheritDoc}
*/
- public ConfigChangeResult
- applyConfigurationChange
- (ReplicationSynchronizationProviderCfg configuration)
+ @Override
+ public ConfigChangeResult applyConfigurationChange(
+ ReplicationSynchronizationProviderCfg configuration)
{
int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
@@ -838,6 +822,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void completeSynchronizationProvider()
{
isRegistered = true;
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 1304346..a57c110 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -26,20 +26,20 @@
* Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
-import org.opends.server.replication.protocol.LDAPUpdateMsg;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.opends.messages.Message;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
/**
* Thread that is used to get message from the replication servers (stored
@@ -56,7 +56,7 @@
private static final DebugTracer TRACER = getTracer();
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
- private volatile boolean shutdown = false;
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
private static int count = 0;
/**
@@ -75,7 +75,7 @@
*/
public void shutdown()
{
- shutdown = true;
+ shutdown.set(true);
}
/**
@@ -84,27 +84,26 @@
@Override
public void run()
{
-
if (debugEnabled())
{
TRACER.debugInfo("Replication Replay thread starting.");
}
- while (!shutdown)
+ while (!shutdown.get())
{
try
{
UpdateToReplay updateToreplay;
// Loop getting an updateToReplayQueue from the update message queue and
// replaying matching changes
- while ( (!shutdown) &&
+ while (!shutdown.get() &&
((updateToreplay = updateToReplayQueue.poll(1L,
TimeUnit.SECONDS)) != null))
{
// Find replication domain for that update message
LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
- domain.replay(updateMsg);
+ domain.replay(updateMsg, shutdown);
}
} catch (Exception e)
{
diff --git a/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
index 5d94ac5..2dbae7f 100644
--- a/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -26,14 +26,15 @@
* Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.service;
-import org.opends.messages.Message;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.messages.Message;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -50,7 +51,7 @@
private static final DebugTracer TRACER = getTracer();
private final ReplicationDomain repDomain;
- private volatile boolean shutdown = false;
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
private volatile boolean done = false;
@@ -72,7 +73,7 @@
*/
public void shutdown()
{
- shutdown = true;
+ shutdown.set(true);
}
/**
@@ -81,22 +82,21 @@
@Override
public void run()
{
- UpdateMsg updateMsg = null;
-
if (debugEnabled())
{
TRACER.debugInfo("Replication Listener thread starting.");
}
- while (!shutdown)
+ while (!shutdown.get())
{
+ UpdateMsg updateMsg = null;
try
{
- // Loop receiving update messages and puting them in the update message
+ // Loop receiving update messages and putting them in the update message
// queue
- while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
+ while (!shutdown.get() && ((updateMsg = repDomain.receive()) != null))
{
- if (repDomain.processUpdate(updateMsg))
+ if (repDomain.processUpdate(updateMsg, shutdown))
{
repDomain.processUpdateDoneSynchronous(updateMsg);
}
@@ -104,7 +104,7 @@
if (updateMsg == null)
{
- shutdown = true;
+ shutdown.set(true);
}
}
catch (Exception e)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 5bcc989..4505ed6 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -28,9 +28,8 @@
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import java.io.BufferedOutputStream;
@@ -41,6 +40,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -50,32 +50,8 @@
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.HeartbeatMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.Session;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
@@ -3184,41 +3160,40 @@
public abstract long countEntries() throws DirectoryException;
/**
- * This method should handle the processing of {@link UpdateMsg} receive
- * from remote replication entities.
+ * This method should handle the processing of {@link UpdateMsg} receive from
+ * remote replication entities.
* <p>
- * This method will be called by a single thread and should therefore
- * should not be blocking.
+ * This method will be called by a single thread and should therefore should
+ * not be blocking.
*
- * @param updateMsg The {@link UpdateMsg} that was received.
- *
- * @return A boolean indicating if the processing is completed at return
- * time.
- * If <code> true </code> is returned, no further
- * processing is necessary.
- *
- * If <code> false </code> is returned, the subclass should
- * call the method
- * {@link #processUpdateDone(UpdateMsg, String)}
- * and update the ServerState
- * When this processing is complete.
- *
+ * @param updateMsg
+ * The {@link UpdateMsg} that was received.
+ * @param shutdown
+ * whether the server initiated shutdown
+ * @return A boolean indicating if the processing is completed at return time.
+ * If <code> true </code> is returned, no further processing is
+ * necessary. If <code> false </code> is returned, the subclass should
+ * call the method {@link #processUpdateDone(UpdateMsg, String)} and
+ * update the ServerState When this processing is complete.
*/
- public abstract boolean processUpdate(UpdateMsg updateMsg);
+ public abstract boolean processUpdate(UpdateMsg updateMsg,
+ AtomicBoolean shutdown);
/**
* This method must be called after each call to
- * {@link #processUpdate(UpdateMsg)} when the processing of the update is
- * completed.
+ * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+ * update is completed.
* <p>
* It is useful for implementation needing to process the update in an
- * asynchronous way or using several threads, but must be called even
- * by implementation doing it in a synchronous, single-threaded way.
+ * asynchronous way or using several threads, but must be called even by
+ * implementation doing it in a synchronous, single-threaded way.
*
- * @param msg The UpdateMsg whose processing was completed.
- * @param replayErrorMsg if not null, this means an error occurred during the
- * replay of this update, and this is the matching human readable message
- * describing the problem.
+ * @param msg
+ * The UpdateMsg whose processing was completed.
+ * @param replayErrorMsg
+ * if not null, this means an error occurred during the replay of
+ * this update, and this is the matching human readable message
+ * describing the problem.
*/
public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 6af1251..210e3f5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -23,24 +23,20 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
-import org.opends.server.util.StaticUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -60,84 +56,80 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
-import org.opends.server.types.ObjectClass;
-import org.opends.server.types.ResultCode;
+import org.opends.server.types.*;
+import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
/**
* Various tests around fractional replication
*/
+@SuppressWarnings("javadoc")
public class FractionalReplicationTest extends ReplicationTestCase {
- // The RS
+ /** The RS */
private ReplicationServer replicationServer = null;
- // RS port
+ /** RS port */
private int replServerPort = -1;
- // Represents the real domain to test (replays and filters)
+ /** Represents the real domain to test (replays and filters) */
private Entry fractionalDomainCfgEntry = null;
- // The domain used to send updates to the reald domain
+ /** The domain used to send updates to the reald domain */
private FakeReplicationDomain replicationDomain = null;
- // Ids of servers
+ /** Ids of servers */
private static final int DS1_ID = 1; // fractional domain
private static final int DS2_ID = 2; // fake domain
private static final int RS_ID = 91; // replication server
private final String testName = this.getClass().getSimpleName();
- // Fractional mode
+ /** Fractional mode */
private static final int EXCLUDE_FRAC_MODE = 0;
private static final int INCLUDE_FRAC_MODE = 1;
int initWindow = 100;
private ChangeNumberGenerator gen = null;
- // The tracer object for the debug logger
+ /** The tracer object for the debug logger */
private static final DebugTracer TRACER = getTracer();
- // Number of seconds before generating an error if some conditions not met
+ /** Number of seconds before generating an error if some conditions not met */
private static final int TIMEOUT = 10000;
- // Uuid of the manipulated entry
+ /** Uuid of the manipulated entry */
private static final String ENTRY_UUID =
"11111111-1111-1111-1111-111111111111";
private static final String ENTRY_UUID2 =
"22222222-2222-2222-2222-222222222222";
private static final String ENTRY_UUID3 =
"33333333-3333-3333-3333-333333333333";
- // Dn of the manipulated entry
+ /** Dn of the manipulated entry */
private static String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING;
- // Optional attribute not part of concerned attributes of the fractional
- // configuration during tests. It should not be impacted by fractional
- // mechanism
+ /**
+ * Optional attribute not part of concerned attributes of the fractional
+ * configuration during tests. It should not be impacted by fractional
+ * mechanism
+ */
private static final String OPTIONAL_ATTR = "description";
- // Optional attribute used as synchronization attribute to know when the modify
- // operation has been processed (used as add new attribute in the modify operation)
- // It may or may not be part of the filtered attributes, depending on the fractional
- // test mode : exclusive or inclusive
+ /**
+ * Optional attribute used as synchronization attribute to know when the
+ * modify operation has been processed (used as add new attribute in the
+ * modify operation) It may or may not be part of the filtered attributes,
+ * depending on the fractional test mode : exclusive or inclusive
+ */
private static final String SYNCHRO_OPTIONAL_ATTR = "seeAlso";
- // Second test backend
+ /** Second test backend */
private static final String TEST2_ROOT_DN_STRING = "dc=example,dc=com";
private static final String TEST2_ORG_DN_STRING = "o=test2," + TEST2_ROOT_DN_STRING;
private static String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING;
@@ -456,7 +448,6 @@
private void createFakeReplicationDomain(boolean firstBackend, long generationId)
{
try{
-
List<String> replicationServers = new ArrayList<String>();
replicationServers.add("localhost:" + replServerPort);
@@ -474,7 +465,7 @@
String rdPortStr = serverStr.substring(index + 1);
try
{
- rdPort = (new Integer(rdPortStr)).intValue();
+ rdPort = Integer.valueOf(rdPortStr);
} catch (Exception e)
{
fail("Enable to get an int from: " + rdPortStr);
@@ -706,11 +697,6 @@
this.exportedEntryCount = exportedEntryCount;
}
- public void initImport(StringBuilder importString)
- {
- this.importString = importString;
- }
-
@Override
public long countEntries() throws DirectoryException
{
@@ -730,7 +716,6 @@
throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
ERR_BACKEND_EXPORT_ENTRY.get("", ""));
}
-
}
@Override
@@ -761,17 +746,12 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg)
+ public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
{
if (queue != null)
queue.add(updateMsg);
return true;
}
-
- public void setGenerationID(long newGenerationID)
- {
- generationID = newGenerationID;
- }
}
private static final String REPLICATION_GENERATION_ID =
@@ -1320,7 +1300,7 @@
"domain status obtained after " + (toWait-nSec) + " second(s).");
return;
}
- sleep(1000);
+ TestCaseUtils.sleep(1000);
nSec--;
}
fail("Did not get expected replication domain status: expected <" + expectedStatus +
@@ -1567,9 +1547,8 @@
createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
/*
- * Perform add operation with fornbidden attribute in RDN
+ * Perform add operation with forbidden attribute in RDN
*/
-
String entryLdif = "dn: displayName=ValueToBeKept," +
TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1620,9 +1599,8 @@
*/
/*
- * Perform add operation with fornbidden attribute in RDN
+ * Perform add operation with forbidden attribute in RDN
*/
-
entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," +
TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1698,9 +1676,8 @@
createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
/*
- * Perform add operation with fornbidden attribute in RDN
+ * Perform add operation with forbidden attribute in RDN
*/
-
String entryLdif = "dn: displayName=ValueToBeKept," +
TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1754,9 +1731,8 @@
*/
/*
- * Perform add operation with fornbidden attribute in RDN
+ * Perform add operation with forbidden attribute in RDN
*/
-
entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," +
TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" +
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1834,9 +1810,8 @@
createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
/*
- * Perform add operation with fornbidden attribute in RDN
+ * Perform add operation with forbidden attribute in RDN
*/
-
String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ;
String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" +
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
@@ -1936,7 +1911,7 @@
@Test
public void testModifyDnWithForbiddenAttrInRDNInclude()
{
- String testcase = "testModifyDnWithForbiddenAttrInRDNInclude";
+ String testcase = "testModifyDnWithForbiddenAttrInRDNInclude";
initTest();
@@ -1953,9 +1928,8 @@
createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING));
/*
- * Perform add operation with fornbidden attribute in RDN
+ * Perform add operation with forbidden attribute in RDN
*/
-
String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ;
String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" +
"objectClass: person\n" + "objectClass: organizationalPerson\n" +
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
index fde1cfd..bd59fbd 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -23,13 +23,16 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
-import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
import java.util.ArrayList;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
@@ -40,24 +43,20 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.DeleteMsg;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.DN;
-import org.opends.server.types.RDN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.ResultCode;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.types.*;
import org.testng.annotations.Test;
-
-import static org.testng.Assert.*;
-
-
/**
* Test the naming conflict resolution code.
*/
+@SuppressWarnings("javadoc")
public class NamingConflictTest extends ReplicationTestCase
{
+
+ private static final AtomicBoolean SHUTDOWN = new AtomicBoolean(false);
+
/**
* Test for issue 3402 : test, that a modrdn that is older than an other
* modrdn but that is applied later is ignored.
@@ -123,10 +122,10 @@
"uid=simultaneous2");
// Put the message in the replay queue
- domain.processUpdate(modDnMsg);
+ domain.processUpdate(modDnMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// This MODIFY DN uses an older DN and should therefore be cancelled
// at replay time.
@@ -137,11 +136,11 @@
"uid=simulatneouswrong");
// Put the message in the replay queue
- domain.processUpdate(modDnMsg);
+ domain.processUpdate(modDnMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
// and resolve conflict
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// Expect the conflict resolution
assertFalse(DirectoryServer.entryExists(entry.getDN()),
@@ -158,8 +157,6 @@
* a delete operation has removed one of the conflicting entries
* the other conflicting entry is correctly renamed to its
* original name.
- *
- * @throws Exception if the test fails.
*/
@Test(enabled=true)
public void conflictCleaningDelete() throws Exception
@@ -215,10 +212,10 @@
null);
// Put the message in the replay queue
- domain.processUpdate(addMsg);
+ domain.processUpdate(addMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// Now delete the first entry that was added at the beginning
TestCaseUtils.deleteEntry(entry.getDN());
@@ -226,7 +223,7 @@
// Expect the conflict resolution : the second entry should now
// have been renamed with the original DN.
Entry resultEntry = DirectoryServer.getEntry(entry.getDN());
- assertTrue(resultEntry != null, "The conflict was not cleared");
+ assertNotNull(resultEntry, "The conflict was not cleared");
assertEquals(getEntryUUID(resultEntry.getDN()),
"c9cb8c3c-615a-4122-865d-50323aaaed48",
"The wrong entry has been renamed");
@@ -300,10 +297,10 @@
null);
// Put the message in the replay queue
- domain.processUpdate(addMsg);
+ domain.processUpdate(addMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// Now delete the first entry that was added at the beginning
InternalClientConnection conn =
@@ -316,7 +313,7 @@
// Expect the conflict resolution : the second entry should now
// have been renamed with the original DN.
Entry resultEntry = DirectoryServer.getEntry(entry.getDN());
- assertTrue(resultEntry != null, "The conflict was not cleared");
+ assertNotNull(resultEntry, "The conflict was not cleared");
assertEquals(getEntryUUID(resultEntry.getDN()),
"c9cb8c3c-615a-4122-865d-50323aaaed48",
"The wrong entry has been renamed");
@@ -409,9 +406,9 @@
delMsg.setSubtreeDelete(true);
// Put the message in the replay queue
- domain.processUpdate(delMsg);
+ domain.processUpdate(delMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// Expect the subtree to be deleted and no conflict entry created
assertFalse(DirectoryServer.entryExists(parentEntry.getDN()),
@@ -488,9 +485,9 @@
// NOT SUBTREE
// Put the message in the replay queue
- domain.processUpdate(delMsg);
+ domain.processUpdate(delMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// Expect the parent entry to be deleted
assertTrue(!DirectoryServer.entryExists(parentEntry.getDN()),
@@ -502,7 +499,6 @@
"+cn=child,o=test");
assertTrue(DirectoryServer.entryExists(childDN),
"Child entry conflict exist with DN="+childDN);
-
}
finally
{
@@ -572,9 +568,9 @@
new ArrayList<Attribute>());
// Put the message in the replay queue
- domain.processUpdate(addMsg);
+ domain.processUpdate(addMsg, SHUTDOWN);
// Make the domain replay the change from the replay queue
- domain.replay(queue.take().getUpdateMessage());
+ domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
// Expect the parent entry to be deleted
assertFalse(DirectoryServer.entryExists(parentEntry.getDN()),
@@ -586,7 +582,6 @@
"+cn=child,o=test");
assertTrue(DirectoryServer.entryExists(childDN),
"Child entry conflict exist with DN="+childDN);
-
}
finally
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 59ef954..60d967b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -23,36 +23,45 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS
*/
package org.opends.server.replication.service;
-import org.opends.server.types.ResultCode;
+import static org.opends.messages.ReplicationMessages.*;
+
import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.config.ConfigException;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.DirectoryException;
-import static org.opends.messages.ReplicationMessages.*;
+import org.opends.server.types.ResultCode;
/**
* This class is the minimum implementation of a Concrete ReplicationDomain
* used to test the Generic Replication Service.
*/
+@SuppressWarnings("javadoc")
public class FakeReplicationDomain extends ReplicationDomain
{
- // A blocking queue that is used to send the UpdateMsg received from
- // the Replication Service.
- BlockingQueue<UpdateMsg> queue = null;
+ /**
+ * A blocking queue that is used to send the UpdateMsg received from the
+ * Replication Service.
+ */
+ private BlockingQueue<UpdateMsg> queue = null;
- // A string that will be exported should exportBackend be called.
- String exportString = null;
+ /** A string that will be exported should exportBackend be called. */
+ private String exportString = null;
- // A StringBuilder that will be used to build a build a new String should the
- // import be called.
- StringBuilder importString = null;
+ /**
+ * A StringBuilder that will be used to build a build a new String should the
+ * import be called.
+ */
+ private StringBuilder importString = null;
private int exportedEntryCount;
@@ -142,7 +151,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg)
+ public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
{
if (queue != null)
queue.add(updateMsg);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index fb56b9c..f41ac84 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -23,36 +23,36 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS
*/
package org.opends.server.replication.service;
-import org.opends.server.types.ResultCode;
+import static org.opends.messages.ReplicationMessages.*;
+
import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import org.opends.server.config.ConfigException;
-import java.util.Collection;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.opends.server.config.ConfigException;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.DirectoryException;
-import static org.opends.messages.ReplicationMessages.*;
+import org.opends.server.types.ResultCode;
/**
* This class is the minimum implementation of a Concrete ReplicationDomain
* used to test the Generic Replication Service.
*/
+@SuppressWarnings("javadoc")
public class FakeStressReplicationDomain extends ReplicationDomain
{
- // A blocking queue that is used to send the UpdateMsg received from
- // the Replication Service.
- BlockingQueue<UpdateMsg> queue = null;
-
- // A string that will be exported should exportBackend be called.
- String exportString = null;
-
- // A StringBuilder that will be used to build a build a new String should the
- // import be called.
- StringBuilder importString = null;
+ /**
+ * A blocking queue that is used to send the UpdateMsg received from the
+ * Replication Service.
+ */
+ private BlockingQueue<UpdateMsg> queue = null;
public FakeStressReplicationDomain(
String serviceID,
@@ -68,23 +68,8 @@
this.queue = queue;
}
- public FakeStressReplicationDomain(
- String serviceID,
- int serverID,
- Collection<String> replicationServers,
- int window,
- long heartbeatInterval,
- String exportString,
- StringBuilder importString) throws ConfigException
- {
- super(serviceID, serverID, 100);
- startPublishService(replicationServers, window, heartbeatInterval, 500);
- startListenService();
- this.exportString = exportString;
- this.importString = importString;
- }
+ private static final int IMPORT_SIZE = 100000000;
- final int IMPORT_SIZE = 100000000;
@Override
public long countEntries() throws DirectoryException
{
@@ -150,7 +135,7 @@
}
@Override
- public boolean processUpdate(UpdateMsg updateMsg)
+ public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
{
if (queue != null)
queue.add(updateMsg);
--
Gitblit v1.10.0