From aea0892feca2fd3d56c9c810debed6d22389454e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Jul 2014 09:54:43 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates

---
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java |  125 ++++++++++++++++++++++++-----------------
 1 files changed, 73 insertions(+), 52 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 0eec310..d3eb610 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -43,14 +43,14 @@
 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
 import org.opends.server.api.*;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.*;
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.opends.server.types.operation.*;
 
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
@@ -73,9 +73,10 @@
 
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-  private ReplicationServerListener replicationServerListener = null;
+  private ReplicationServerListener replicationServerListener;
   private static final Map<DN, LDAPReplicationDomain> domains =
-    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
+      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
+  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
 
   /**
    * The queue of received update messages, to be treated by the ReplayThread
@@ -119,8 +120,7 @@
    *                   Can be null is the request has no associated operation.
    * @return           The domain for this DN.
    */
-  public static LDAPReplicationDomain findDomain(
-      DN dn, PluginOperation pluginOp)
+  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
   {
     /*
      * Don't run the special replication code on Operation that are
@@ -130,7 +130,9 @@
     {
         final Operation op = (Operation) pluginOp;
         if (op.dontSynchronize())
+        {
           return null;
+        }
 
         /*
          * Check if the provided operation is a repair operation and set the
@@ -187,8 +189,8 @@
   {
     try
     {
-      LDAPReplicationDomain domain =
-          new LDAPReplicationDomain(configuration, updateToReplayQueue);
+      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
+          configuration, updateToReplayQueue, dsrsShutdownSync);
       if (domains.size() == 0)
       {
         // Create the threads that will process incoming update messages
@@ -223,9 +225,8 @@
       BlockingQueue<UpdateToReplay> queue)
       throws ConfigException
   {
-    LDAPReplicationDomain domain =
-        new LDAPReplicationDomain(configuration, queue);
-
+    final LDAPReplicationDomain domain =
+        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
     domains.put(domain.getBaseDN(), domain);
     return domain;
   }
@@ -251,35 +252,30 @@
   /** {@inheritDoc} */
   @Override
   public void initializeSynchronizationProvider(
-      ReplicationSynchronizationProviderCfg configuration)
-  throws org.forgerock.opendj.config.server.ConfigException
+      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
   {
     domains.clear();
-    replicationServerListener = new ReplicationServerListener(configuration);
+    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
 
     // Register as an add and delete listener with the root configuration so we
     // can be notified if Multimaster domain entries are added or removed.
-    configuration.addReplicationDomainAddListener(this);
-    configuration.addReplicationDomainDeleteListener(this);
+    cfg.addReplicationDomainAddListener(this);
+    cfg.addReplicationDomainDeleteListener(this);
 
     // Register as a root configuration listener so that we can be notified if
     // number of replay threads is changed and apply changes.
-    configuration.addReplicationChangeListener(this);
+    cfg.addReplicationChangeListener(this);
 
-    replayThreadNumber = configuration.getNumUpdateReplayThreads();
-    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
-        Integer.MAX_VALUE);
+    replayThreadNumber = cfg.getNumUpdateReplayThreads();
+    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
 
     //  Create the list of domains that are already defined.
-    for (String name : configuration.listReplicationDomains())
+    for (String name : cfg.listReplicationDomains())
     {
-      createNewDomain(configuration.getReplicationDomain(name));
+      createNewDomain(cfg.getReplicationDomain(name));
     }
 
-    /*
-     * If any schema changes were made with the server offline, then handle them
-     * now.
-     */
+    // If any schema changes were made with the server offline, then handle them now.
     List<Modification> offlineSchemaChanges =
          DirectoryServer.getOfflineSchemaChanges();
     if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -407,12 +403,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationModifyOperation modifyOperation)
   {
-    LDAPReplicationDomain domain =
-      findDomain(modifyOperation.getEntryDN(), modifyOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(modifyOperation);
+    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(modifyOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -420,12 +416,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationAddOperation addOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(addOperation.getEntryDN(), addOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(addOperation);
+    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(addOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -433,12 +429,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationDeleteOperation deleteOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(deleteOperation.getEntryDN(), deleteOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(deleteOperation);
+    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(deleteOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -446,12 +442,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(modifyDNOperation);
+    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(modifyDNOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -510,7 +506,9 @@
     LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
 
     if (domain == null || !domain.solveConflict())
+    {
       return new SynchronizationProviderResult.ContinueProcessing();
+    }
 
     // The historical object is retrieved from the attachment created
     // in the HandleConflictResolution phase.
@@ -542,11 +540,15 @@
     LDAPReplicationDomain domain =
       findDomain(addOperation.getEntryDN(), addOperation);
     if (domain == null)
+    {
       return new SynchronizationProviderResult.ContinueProcessing();
+    }
 
     // For LOCAL op only, generate CSN and attach Context
     if (!addOperation.isSynchronizationOperation())
+    {
       domain.doPreOperation(addOperation);
+    }
 
     // Add to the operation the historical attribute : "dn:changeNumber:add"
     EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -569,7 +571,9 @@
     stopReplayThreads();
 
     if (replicationServerListener != null)
+    {
       replicationServerListener.shutdown();
+    }
 
     DirectoryServer.deregisterBackupTaskListener(this);
     DirectoryServer.deregisterRestoreTaskListener(this);
@@ -590,10 +594,11 @@
   @Override
   public void processSchemaChange(List<Modification> modifications)
   {
-    LDAPReplicationDomain domain =
-      findDomain(DirectoryServer.getSchemaDN(), null);
+    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
     if (domain != null)
+    {
       domain.synchronizeModifications(modifications);
+    }
   }
 
   /** {@inheritDoc} */
@@ -604,7 +609,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupStart();
+      }
     }
   }
 
@@ -617,7 +624,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupEnd();
+      }
     }
   }
 
@@ -629,7 +638,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.disable();
+      }
     }
   }
 
@@ -642,7 +653,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.enable();
+      }
     }
   }
 
@@ -654,7 +667,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.disable();
+      }
     }
   }
 
@@ -667,7 +682,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.enable();
+      }
     }
   }
 
@@ -679,7 +696,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupStart();
+      }
     }
   }
 
@@ -692,7 +711,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupEnd();
+      }
     }
   }
 

--
Gitblit v1.10.0