From 8c8415177610baee0fc1c615926f21da621f0836 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 194 ++++++++++++++++++++++++++---------------------
1 files changed, 107 insertions(+), 87 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 4a7af7e..c86bc38 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@
* Copyright 2006-2010 Sun Microsystems, Inc.
* Portions Copyright 2011-2013 ForgeRock AS
*/
-
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -37,14 +36,12 @@
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
-import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -63,14 +60,7 @@
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
-import org.opends.server.core.AddOperation;
-import org.opends.server.core.DeleteOperation;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.LockFileManager;
-import org.opends.server.core.ModifyDNOperation;
-import org.opends.server.core.ModifyDNOperationBasis;
-import org.opends.server.core.ModifyOperation;
-import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -80,12 +70,7 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -93,16 +78,7 @@
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.*;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.types.operation.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -191,8 +167,10 @@
*/
private static final DebugTracer TRACER = getTracer();
- // The update to replay message queue where the listener thread is going to
- // push incoming update messages.
+ /**
+ * The update to replay message queue where the listener thread is going to
+ * push incoming update messages.
+ */
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
@@ -238,9 +216,11 @@
private volatile boolean disabled = false;
private volatile boolean stateSavingDisabled = false;
- // This list is used to temporary store operations that needs
- // to be replayed at session establishment time.
- private final TreeMap<ChangeNumber, FakeOperation> replayOperations =
+ /**
+ * This map is used to temporarily store operations that need to be replayed
+ * at session establishment time.
+ */
+ private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
new TreeMap<ChangeNumber, FakeOperation>();
/**
@@ -288,7 +268,7 @@
* Fractional replication variables.
*/
- // Holds the fractional configuration for this domain, if any.
+ /** Holds the fractional configuration for this domain, if any. */
private FractionalConfig fractionalConfig = null;
/**
@@ -341,29 +321,39 @@
* fractionalFilterOperation(PreOperationModifyOperation
* modifyOperation, boolean performFiltering) method
*/
- // The operation contains attributes subject to fractional filtering according
- // to the fractional configuration
+ /**
+ * The operation contains attributes subject to fractional filtering according
+ * to the fractional configuration.
+ */
private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
- // The operation contains no attributes subject to fractional filtering
- // according to the fractional configuration
+ /**
+ * The operation contains no attributes subject to fractional filtering
+ * according to the fractional configuration.
+ */
private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
- // The operation should become a no-op
+ /** The operation should become a no-op. */
private static final int FRACTIONAL_BECOME_NO_OP = 3;
- // This configuration boolean indicates if this ReplicationDomain should log
- // ChangeNumbers.
+ /**
+ * This configuration boolean indicates if this ReplicationDomain should log
+ * ChangeNumbers.
+ */
private boolean logChangeNumber = false;
- // This configuration integer indicates the time the domain keeps the
- // historical information necessary to solve conflicts.
- // When a change stored in the historical part of the user entry has a date
- // (from its replication ChangeNumber) older than this delay, it is candidate
- // to be purged.
+ /**
+ * This configuration integer indicates the time the domain keeps the
+ * historical information necessary to solve conflicts.<br>
+ * When a change stored in the historical part of the user entry has a date
+ * (from its replication ChangeNumber) older than this delay, it is a
+ * candidate to be purged.
+ */
private long histPurgeDelayInMilliSec = 0;
- // The last change number purged in this domain. Allows to have a continuous
- // purging process from one purge processing (task run) to the next one.
- // Values 0 when the server starts.
+ /**
+ * The last change number purged in this domain. Allows purging to continue
+ * seamlessly from one purge processing (task run) to the next one.
+ * Its value is 0 when the server starts.
+ */
private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
/**
@@ -752,10 +742,8 @@
if (fractionalConfig.isFractional())
{
// Set new fractional configuration values
- if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
- fractionalConfig.setFractionalExclusive(true);
- else
- fractionalConfig.setFractionalExclusive(false);
+ fractionalConfig.setFractionalExclusive(
+ newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
fractionalConfig.setFractionalSpecificClassesAttributes(
newFractionalConfig.getFractionalSpecificClassesAttributes());
fractionalConfig.setFractionalAllClassesAttributes(
@@ -950,10 +938,8 @@
// Set stored fractional configuration values
if (storedFractionalConfig.isFractional())
{
- if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
- storedFractionalConfig.setFractionalExclusive(true);
- else
- storedFractionalConfig.setFractionalExclusive(false);
+ storedFractionalConfig.setFractionalExclusive(
+ storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
}
storedFractionalConfig.setFractionalSpecificClassesAttributes(
storedFractionalSpecificClassesAttributes);
@@ -1434,12 +1420,10 @@
AttributeValue rdnAttributeValue =
entryRdn.getAttributeValue(attributeType);
List<Attribute> attrList = attributesMap.get(attributeType);
- Iterator<Attribute> attrIt = attrList.iterator();
AttributeValue sameAttrValue = null;
// Locate the attribute value identical to the one in the RDN
- while(attrIt.hasNext())
+ for (Attribute attr : attrList)
{
- Attribute attr = attrIt.next();
if (attr.contains(rdnAttributeValue))
{
for (AttributeValue attrValue : attr) {
@@ -2577,9 +2561,12 @@
/**
* Create and replay a synchronized Operation from an UpdateMsg.
*
- * @param msg The UpdateMsg to be replayed.
+ * @param msg
+ * The UpdateMsg to be replayed.
+ * @param shutdown
+ * whether the server initiated shutdown
*/
- public void replay(LDAPUpdateMsg msg)
+ public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
{
Operation op = null;
boolean replayDone = false;
@@ -2599,6 +2586,11 @@
while ((!dependency) && (!replayDone) && (retryCount-- > 0))
{
+ if (shutdown.get())
+ {
+ // shutdown initiated, let's leave
+ return;
+ }
// Try replay the operation
op.setInternalOperation(true);
op.setSynchronizationOperation(true);
@@ -2622,6 +2614,25 @@
// renamed by a more recent modify DN.
replayDone = true;
}
+ else if (result == ResultCode.BUSY)
+ {
+ /*
+ * We probably could not get a lock (OPENDJ-885). Give the server
+ * another chance to process this operation immediately.
+ */
+ Thread.yield();
+ continue;
+ }
+ else if (result == ResultCode.UNAVAILABLE)
+ {
+ /*
+ * It can happen when a rebuild is performed or the backend is
+ * offline (OPENDJ-49). Give the server another chance to process
+ * this operation after some time.
+ */
+ Thread.sleep(50);
+ continue;
+ }
else if (op instanceof ModifyOperation)
{
ModifyOperation newOp = (ModifyOperation) op;
@@ -2693,22 +2704,13 @@
}
} catch (ASN1Exception e)
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
} catch (LDAPException e)
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
} catch (DataFormatException e)
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
} catch (Exception e)
{
if (changeNumber != null)
@@ -2726,10 +2728,7 @@
updateError(changeNumber);
} else
{
- Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
- String.valueOf(msg) + stackTraceToSingleLineString(e));
- logError(message);
- replayErrorMsg = message.toString();
+ replayErrorMsg = logDecodingOperationError(msg, e);
}
} finally
{
@@ -2753,6 +2752,14 @@
} while (msg != null);
}
+  /**
+   * Builds the ERR_EXCEPTION_DECODING_OPERATION message from the supplied
+   * update message and exception, logs it through logError and returns its
+   * text.
+   *
+   * @param msg
+   *          The update message; its string form (via String.valueOf) starts
+   *          the message argument.
+   * @param e
+   *          The exception; its single-line stack trace is appended to the
+   *          message argument.
+   * @return The string form of the message that was logged.
+   */
+  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
+  {
+    final String details =
+        String.valueOf(msg) + stackTraceToSingleLineString(e);
+    final Message decodingError = ERR_EXCEPTION_DECODING_OPERATION.get(details);
+    logError(decodingError);
+    return decodingError.toString();
+  }
+
/**
* This method is called when an error happens while replaying
* an operation.
@@ -4862,7 +4869,7 @@
* {@inheritDoc}
*/
@Override
- public boolean processUpdate(UpdateMsg updateMsg)
+ public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
{
// Ignore message if fractional configuration is inconsistent and
// we have been passed into bad data set status
@@ -4880,8 +4887,23 @@
// Put update message into the replay queue
// (block until some place in the queue is available)
- UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
- updateToReplayQueue.offer(updateToReplay);
+ final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
+ while (!shutdown.get())
+ {
+ // loop until we can offer to the queue or shutdown was initiated
+ try
+ {
+ if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
+ {
+ // successful offer to the queue, let's exit the loop
+ break;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Thread interrupted: check for shutdown.
+ }
+ }
return false;
}
@@ -5321,10 +5343,8 @@
case EXCLUSIVE_FRACTIONAL:
case INCLUSIVE_FRACTIONAL:
result.setFractional(true);
- if (newFractionalMode == EXCLUSIVE_FRACTIONAL)
- result.setFractionalExclusive(true);
- else
- result.setFractionalExclusive(false);
+ result.setFractionalExclusive(
+ newFractionalMode == EXCLUSIVE_FRACTIONAL);
break;
}
result.setFractionalSpecificClassesAttributes(
--
Gitblit v1.10.0