From 856fdd0571358c660afaf379f8e774ab8b24f05c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 85 +++++++++++++++---------------------------
1 files changed, 30 insertions(+), 55 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 5bcc989..4505ed6 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -28,9 +28,8 @@
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import java.io.BufferedOutputStream;
@@ -41,6 +40,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opends.messages.Category;
@@ -50,32 +50,8 @@
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.HeartbeatMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.Session;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
@@ -3184,41 +3160,40 @@
public abstract long countEntries() throws DirectoryException;
/**
- * This method should handle the processing of {@link UpdateMsg} receive
- * from remote replication entities.
+ * This method should handle the processing of {@link UpdateMsg} receive from
+ * remote replication entities.
* <p>
- * This method will be called by a single thread and should therefore
- * should not be blocking.
+ * This method will be called by a single thread and should therefore should
+ * not be blocking.
*
- * @param updateMsg The {@link UpdateMsg} that was received.
- *
- * @return A boolean indicating if the processing is completed at return
- * time.
- * If <code> true </code> is returned, no further
- * processing is necessary.
- *
- * If <code> false </code> is returned, the subclass should
- * call the method
- * {@link #processUpdateDone(UpdateMsg, String)}
- * and update the ServerState
- * When this processing is complete.
- *
+ * @param updateMsg
+ * The {@link UpdateMsg} that was received.
+ * @param shutdown
+ * whether the server initiated shutdown
+ * @return A boolean indicating if the processing is completed at return time.
+ * If <code> true </code> is returned, no further processing is
+ * necessary. If <code> false </code> is returned, the subclass should
+ * call the method {@link #processUpdateDone(UpdateMsg, String)} and
+ * update the ServerState When this processing is complete.
*/
- public abstract boolean processUpdate(UpdateMsg updateMsg);
+ public abstract boolean processUpdate(UpdateMsg updateMsg,
+ AtomicBoolean shutdown);
/**
* This method must be called after each call to
- * {@link #processUpdate(UpdateMsg)} when the processing of the update is
- * completed.
+ * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+ * update is completed.
* <p>
* It is useful for implementation needing to process the update in an
- * asynchronous way or using several threads, but must be called even
- * by implementation doing it in a synchronous, single-threaded way.
+ * asynchronous way or using several threads, but must be called even by
+ * implementation doing it in a synchronous, single-threaded way.
*
- * @param msg The UpdateMsg whose processing was completed.
- * @param replayErrorMsg if not null, this means an error occurred during the
- * replay of this update, and this is the matching human readable message
- * describing the problem.
+ * @param msg
+ * The UpdateMsg whose processing was completed.
+ * @param replayErrorMsg
+ * if not null, this means an error occurred during the replay of
+ * this update, and this is the matching human readable message
+ * describing the problem.
*/
public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
{
--
Gitblit v1.10.0