From 856fdd0571358c660afaf379f8e774ab8b24f05c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock 

---
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java |   85 +++++++++++++++---------------------------
 1 files changed, 30 insertions(+), 55 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 5bcc989..4505ed6 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -28,9 +28,8 @@
 package org.opends.server.replication.service;
 
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.replication.common.StatusMachine.*;
 
 import java.io.BufferedOutputStream;
@@ -41,6 +40,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.opends.messages.Category;
@@ -50,32 +50,8 @@
 import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.HeartbeatMsg;
-import org.opends.server.replication.protocol.InitializeRcvAckMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.Session;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
 import org.opends.server.tasks.InitializeTargetTask;
 import org.opends.server.tasks.InitializeTask;
 import org.opends.server.types.Attribute;
@@ -3184,41 +3160,40 @@
   public abstract long countEntries() throws DirectoryException;
 
   /**
-   * This method should handle the processing of {@link UpdateMsg} receive
-   * from remote replication entities.
+   * This method should handle the processing of {@link UpdateMsg} receive from
+   * remote replication entities.
    * <p>
-   * This method will be called by a single thread and should therefore
-   * should not be blocking.
+   * This method will be called by a single thread and should therefore should
+   * not be blocking.
    *
-   * @param updateMsg The {@link UpdateMsg} that was received.
-   *
-   * @return A boolean indicating if the processing is completed at return
-   *                   time.
-   *                   If <code> true </code> is returned, no further
-   *                   processing is necessary.
-   *
-   *                   If <code> false </code> is returned, the subclass should
-   *                   call the method
-   *                   {@link #processUpdateDone(UpdateMsg, String)}
-   *                   and update the ServerState
-   *                   When this processing is complete.
-   *
+   * @param updateMsg
+   *          The {@link UpdateMsg} that was received.
+   * @param shutdown
+   *          whether the server initiated shutdown
+   * @return A boolean indicating if the processing is completed at return time.
+   *         If <code> true </code> is returned, no further processing is
+   *         necessary. If <code> false </code> is returned, the subclass should
+   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
+   *         update the ServerState When this processing is complete.
    */
-  public abstract boolean processUpdate(UpdateMsg updateMsg);
+  public abstract boolean processUpdate(UpdateMsg updateMsg,
+      AtomicBoolean shutdown);
 
   /**
    * This method must be called after each call to
-   * {@link #processUpdate(UpdateMsg)} when the processing of the update is
-   * completed.
+   * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the
+   * update is completed.
    * <p>
    * It is useful for implementation needing to process the update in an
-   * asynchronous way or using several threads, but must be called even
-   * by implementation doing it in a synchronous, single-threaded way.
+   * asynchronous way or using several threads, but must be called even by
+   * implementation doing it in a synchronous, single-threaded way.
    *
-   * @param  msg The UpdateMsg whose processing was completed.
-   * @param replayErrorMsg if not null, this means an error occurred during the
-   * replay of this update, and this is the matching human readable message
-   * describing the problem.
+   * @param msg
+   *          The UpdateMsg whose processing was completed.
+   * @param replayErrorMsg
+   *          if not null, this means an error occurred during the replay of
+   *          this update, and this is the matching human readable message
+   *          describing the problem.
    */
   public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
   {

--
Gitblit v1.10.0