From 8c8415177610baee0fc1c615926f21da621f0836 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock 

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java |   87 ++++++++++++++++++-------------------------
 1 files changed, 36 insertions(+), 51 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index c92c682..aab9397 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@
  */
 package org.opends.server.replication.plugin;
 
-import java.util.ArrayList;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
+import static org.opends.server.util.StaticUtils.*;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 import org.opends.messages.Message;
 import org.opends.server.admin.server.ConfigurationAddListener;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.server.ConfigurationDeleteListener;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.BackupTaskListener;
-import org.opends.server.api.ExportTaskListener;
-import org.opends.server.api.ImportTaskListener;
-import org.opends.server.api.RestoreTaskListener;
-import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.api.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
-import org.opends.server.types.BackupConfig;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.Control;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.Modification;
-import org.opends.server.types.Operation;
-import org.opends.server.types.RestoreConfig;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SynchronizationProviderResult;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
+import org.opends.server.types.*;
+import org.opends.server.types.operation.*;
 
 /**
  * This class is used to load the Replication code inside the JVM
@@ -104,8 +76,8 @@
    * The queue of received update messages, to be treated by the ReplayThread
    * threads.
    */
-  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
-    new LinkedBlockingQueue<UpdateToReplay>();
+  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue =
+      new LinkedBlockingQueue<UpdateToReplay>(10000);
 
   /**
    * The list of ReplayThread threads.
@@ -228,9 +200,9 @@
   }
 
   /**
-   * Creates a new domain from its configEntry, do the
-   * necessary initialization and starts it so that it is
-   * fully operational when this method returns.
+   * Creates a new domain from its configEntry, do the necessary initialization
+   * and starts it so that it is fully operational when this method returns. It
+   * is only used for tests so far.
    *
    * @param configuration The entry with the configuration of this domain.
    * @param queue         The BlockingQueue that this domain will use.
@@ -239,13 +211,13 @@
    *
    * @throws ConfigException When the configuration is not valid.
    */
-  public static LDAPReplicationDomain createNewDomain(
+  static LDAPReplicationDomain createNewDomain(
       ReplicationDomainCfg configuration,
       BlockingQueue<UpdateToReplay> queue)
       throws ConfigException
   {
-    LDAPReplicationDomain domain;
-    domain = new LDAPReplicationDomain(configuration, queue);
+    LDAPReplicationDomain domain =
+        new LDAPReplicationDomain(configuration, queue);
 
     domains.put(domain.getBaseDN(), domain);
     return domain;
@@ -362,6 +334,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public boolean isConfigurationAddAcceptable(
       ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
   {
@@ -372,6 +345,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public ConfigChangeResult applyConfigurationAdd(
      ReplicationDomainCfg configuration)
   {
@@ -652,6 +626,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processBackupBegin(Backend backend, BackupConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -665,6 +640,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processBackupEnd(Backend backend, BackupConfig config,
                                boolean successful)
   {
@@ -679,6 +655,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processRestoreBegin(Backend backend, RestoreConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -692,6 +669,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processRestoreEnd(Backend backend, RestoreConfig config,
                                 boolean successful)
   {
@@ -706,6 +684,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processImportBegin(Backend backend, LDIFImportConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -719,6 +698,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processImportEnd(Backend backend, LDIFImportConfig config,
                                boolean successful)
   {
@@ -733,6 +713,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processExportBegin(Backend backend, LDIFExportConfig config)
   {
     for (DN dn : backend.getBaseDNs())
@@ -746,6 +727,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void processExportEnd(Backend backend, LDIFExportConfig config,
                                boolean successful)
   {
@@ -760,6 +742,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public ConfigChangeResult applyConfigurationDelete(
       ReplicationDomainCfg configuration)
   {
@@ -771,6 +754,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public boolean isConfigurationDeleteAcceptable(
       ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
   {
@@ -804,10 +788,10 @@
   /**
    * {@inheritDoc}
    */
-  public boolean
-    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
-    configuration,
-    List<Message> unacceptableReasons)
+  @Override
+  public boolean isConfigurationChangeAcceptable(
+      ReplicationSynchronizationProviderCfg configuration,
+      List<Message> unacceptableReasons)
   {
     return true;
   }
@@ -815,9 +799,9 @@
   /**
    * {@inheritDoc}
    */
-  public ConfigChangeResult
-    applyConfigurationChange
-    (ReplicationSynchronizationProviderCfg configuration)
+  @Override
+  public ConfigChangeResult applyConfigurationChange(
+      ReplicationSynchronizationProviderCfg configuration)
   {
     int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
 
@@ -838,6 +822,7 @@
   /**
    * {@inheritDoc}
    */
+  @Override
   public void completeSynchronizationProvider()
   {
     isRegistered = true;

--
Gitblit v1.10.0