From 8c8415177610baee0fc1c615926f21da621f0836 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock 

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java |  194 ++++++++++++++++++++++++++---------------------
 1 files changed, 107 insertions(+), 87 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 4a7af7e..c86bc38 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@
  *      Copyright 2006-2010 Sun Microsystems, Inc.
  *      Portions Copyright 2011-2013 ForgeRock AS
  */
-
 package org.opends.server.replication.plugin;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -37,14 +36,12 @@
 import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.*;
 
-import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringReader;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
 
@@ -63,14 +60,7 @@
 import org.opends.server.backends.jeb.BackendImpl;
 import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
-import org.opends.server.core.AddOperation;
-import org.opends.server.core.DeleteOperation;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.LockFileManager;
-import org.opends.server.core.ModifyDNOperation;
-import org.opends.server.core.ModifyDNOperationBasis;
-import org.opends.server.core.ModifyOperation;
-import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.core.*;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.protocols.asn1.ASN1Exception;
 import org.opends.server.protocols.internal.InternalClientConnection;
@@ -80,12 +70,7 @@
 import org.opends.server.protocols.ldap.LDAPControl;
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.protocols.ldap.LDAPModification;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
@@ -93,16 +78,7 @@
 import org.opends.server.tasks.PurgeConflictsHistoricalTask;
 import org.opends.server.tasks.TaskUtils;
 import org.opends.server.types.*;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.types.operation.*;
 import org.opends.server.util.LDIFReader;
 import org.opends.server.util.TimeThread;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -191,8 +167,10 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  // The update to replay message queue where the listener thread is going to
-  // push incoming update messages.
+  /**
+   * The update to replay message queue where the listener thread is going to
+   * push incoming update messages.
+   */
   private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
   private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
   private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
@@ -238,9 +216,11 @@
   private volatile boolean disabled = false;
   private volatile boolean stateSavingDisabled = false;
 
-  // This list is used to temporary store operations that needs
-  // to be replayed at session establishment time.
-  private final TreeMap<ChangeNumber, FakeOperation> replayOperations  =
+  /**
+   * This map is used to temporarily store operations that need to be replayed
+   * at session establishment time.
+   */
+  private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
     new TreeMap<ChangeNumber, FakeOperation>();
 
   /**
@@ -288,7 +268,7 @@
    * Fractional replication variables.
    */
 
-  // Holds the fractional configuration for this domain, if any.
+  /** Holds the fractional configuration for this domain, if any. */
   private FractionalConfig fractionalConfig = null;
 
   /**
@@ -341,29 +321,39 @@
    * fractionalFilterOperation(PreOperationModifyOperation
    *  modifyOperation, boolean performFiltering) method
    */
-  // The operation contains attributes subject to fractional filtering according
-  // to the fractional configuration
+  /**
+   * The operation contains attributes subject to fractional filtering according
+   * to the fractional configuration.
+   */
   private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
-  // The operation contains no attributes subject to fractional filtering
-  // according to the fractional configuration
+  /**
+   * The operation contains no attributes subject to fractional filtering
+   * according to the fractional configuration.
+   */
   private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
-  // The operation should become a no-op
+  /** The operation should become a no-op. */
   private static final int FRACTIONAL_BECOME_NO_OP = 3;
 
-  // This configuration boolean indicates if this ReplicationDomain should log
-  // ChangeNumbers.
+  /**
+   * This configuration boolean indicates if this ReplicationDomain should log
+   * ChangeNumbers.
+   */
   private boolean logChangeNumber = false;
 
-  // This configuration integer indicates the time the domain keeps the
-  // historical information necessary to solve conflicts.
-  // When a change stored in the historical part of the user entry has a date
-  // (from its replication ChangeNumber) older than this delay, it is candidate
-  // to be purged.
+  /**
+   * This configuration integer indicates the time the domain keeps the
+   * historical information necessary to solve conflicts.<br>
+   * When a change stored in the historical part of the user entry has a date
+   * (from its replication ChangeNumber) older than this delay, it is a
+   * candidate to be purged.
+   */
   private long histPurgeDelayInMilliSec = 0;
 
-  // The last change number purged in this domain. Allows to have a continuous
-  // purging process from one purge processing (task run) to the next one.
-  // Values 0 when the server starts.
+  /**
+   * The last change number purged in this domain. Allows a continuous purging
+   * process from one purge processing (task run) to the next one.
+   * Its value is 0 when the server starts.
+   */
   private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
 
   /**
@@ -752,10 +742,8 @@
     if (fractionalConfig.isFractional())
     {
       // Set new fractional configuration values
-      if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
-        fractionalConfig.setFractionalExclusive(true);
-      else
-        fractionalConfig.setFractionalExclusive(false);
+      fractionalConfig.setFractionalExclusive(
+          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
       fractionalConfig.setFractionalSpecificClassesAttributes(
         newFractionalConfig.getFractionalSpecificClassesAttributes());
       fractionalConfig.setFractionalAllClassesAttributes(
@@ -950,10 +938,8 @@
     // Set stored fractional configuration values
     if (storedFractionalConfig.isFractional())
     {
-      if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
-        storedFractionalConfig.setFractionalExclusive(true);
-      else
-        storedFractionalConfig.setFractionalExclusive(false);
+      storedFractionalConfig.setFractionalExclusive(
+          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
     }
     storedFractionalConfig.setFractionalSpecificClassesAttributes(
       storedFractionalSpecificClassesAttributes);
@@ -1434,12 +1420,10 @@
             AttributeValue rdnAttributeValue =
               entryRdn.getAttributeValue(attributeType);
             List<Attribute> attrList = attributesMap.get(attributeType);
-            Iterator<Attribute> attrIt = attrList.iterator();
             AttributeValue sameAttrValue = null;
             //    Locate the attribute value identical to the one in the RDN
-            while(attrIt.hasNext())
+            for (Attribute attr : attrList)
             {
-              Attribute attr = attrIt.next();
               if (attr.contains(rdnAttributeValue))
               {
                 for (AttributeValue attrValue : attr) {
@@ -2577,9 +2561,12 @@
   /**
    * Create and replay a synchronized Operation from an UpdateMsg.
    *
-   * @param msg The UpdateMsg to be replayed.
+   * @param msg
+   *          The UpdateMsg to be replayed.
+   * @param shutdown
+   *          whether the server initiated shutdown
    */
-  public void replay(LDAPUpdateMsg msg)
+  public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
   {
     Operation op = null;
     boolean replayDone = false;
@@ -2599,6 +2586,11 @@
 
         while ((!dependency) && (!replayDone) && (retryCount-- > 0))
         {
+          if (shutdown.get())
+          {
+            // shutdown initiated, let's leave
+            return;
+          }
           // Try replay the operation
           op.setInternalOperation(true);
           op.setSynchronizationOperation(true);
@@ -2622,6 +2614,25 @@
               // renamed by a more recent modify DN.
               replayDone = true;
             }
+            else if (result == ResultCode.BUSY)
+            {
+              /*
+               * We probably could not get a lock (OPENDJ-885). Give the server
+               * another chance to process this operation immediately.
+               */
+              Thread.yield();
+              continue;
+            }
+            else if (result == ResultCode.UNAVAILABLE)
+            {
+              /*
+               * It can happen when a rebuild is performed or the backend is
+               * offline (OPENDJ-49). Give the server another chance to process
+               * this operation after some time.
+               */
+              Thread.sleep(50);
+              continue;
+            }
             else if (op instanceof ModifyOperation)
             {
               ModifyOperation newOp = (ModifyOperation) op;
@@ -2693,22 +2704,13 @@
         }
       } catch (ASN1Exception e)
       {
-        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-          String.valueOf(msg) + stackTraceToSingleLineString(e));
-        logError(message);
-        replayErrorMsg = message.toString();
+        replayErrorMsg = logDecodingOperationError(msg, e);
       } catch (LDAPException e)
       {
-        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-          String.valueOf(msg) + stackTraceToSingleLineString(e));
-        logError(message);
-        replayErrorMsg = message.toString();
+        replayErrorMsg = logDecodingOperationError(msg, e);
       } catch (DataFormatException e)
       {
-        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-          String.valueOf(msg) + stackTraceToSingleLineString(e));
-        logError(message);
-        replayErrorMsg = message.toString();
+        replayErrorMsg = logDecodingOperationError(msg, e);
       } catch (Exception e)
       {
         if (changeNumber != null)
@@ -2726,10 +2728,7 @@
           updateError(changeNumber);
         } else
         {
-          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
-            String.valueOf(msg) + stackTraceToSingleLineString(e));
-          logError(message);
-          replayErrorMsg = message.toString();
+          replayErrorMsg = logDecodingOperationError(msg, e);
         }
       } finally
       {
@@ -2753,6 +2752,14 @@
     } while (msg != null);
   }
 
+  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
+  {
+    Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
+      String.valueOf(msg) + stackTraceToSingleLineString(e));
+    logError(message);
+    return message.toString();
+  }
+
   /**
    * This method is called when an error happens while replaying
    * an operation.
@@ -4862,7 +4869,7 @@
    * {@inheritDoc}
    */
   @Override
-  public boolean processUpdate(UpdateMsg updateMsg)
+  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
   {
     // Ignore message if fractional configuration is inconsistent and
     // we have been passed into bad data set status
@@ -4880,8 +4887,23 @@
 
       // Put update message into the replay queue
       // (block until some place in the queue is available)
-      UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
-      updateToReplayQueue.offer(updateToReplay);
+      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
+      while (!shutdown.get())
+      {
+        // loop until we can offer to the queue or shutdown was initiated
+        try
+        {
+          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
+          {
+            // successful offer to the queue, let's exit the loop
+            break;
+          }
+        }
+        catch (InterruptedException e)
+        {
+          // Thread interrupted: check for shutdown.
+        }
+      }
 
       return false;
     }
@@ -5321,10 +5343,8 @@
         case EXCLUSIVE_FRACTIONAL:
         case INCLUSIVE_FRACTIONAL:
           result.setFractional(true);
-          if (newFractionalMode == EXCLUSIVE_FRACTIONAL)
-            result.setFractionalExclusive(true);
-          else
-            result.setFractionalExclusive(false);
+          result.setFractionalExclusive(
+              newFractionalMode == EXCLUSIVE_FRACTIONAL);
           break;
       }
       result.setFractionalSpecificClassesAttributes(

--
Gitblit v1.10.0