From f948474a8031c24160da4b31f0b97354456b40ad Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Jul 2014 15:19:12 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates

---
 opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java |  125 ++++++++++++++++++++++++-----------------
 1 files changed, 73 insertions(+), 52 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index f78bf74..e22aa5b 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -41,13 +41,13 @@
 import org.opends.server.api.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.*;
 import org.opends.server.types.operation.*;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
@@ -67,9 +67,10 @@
                   BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                   ExportTaskListener
 {
-  private ReplicationServerListener replicationServerListener = null;
+  private ReplicationServerListener replicationServerListener;
   private static final Map<DN, LDAPReplicationDomain> domains =
-    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
+      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
+  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
 
   /**
    * The queue of received update messages, to be treated by the ReplayThread
@@ -113,8 +114,7 @@
    *                   Can be null is the request has no associated operation.
    * @return           The domain for this DN.
    */
-  public static LDAPReplicationDomain findDomain(
-      DN dn, PluginOperation pluginOp)
+  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
   {
     /*
      * Don't run the special replication code on Operation that are
@@ -124,7 +124,9 @@
     {
         final Operation op = (Operation) pluginOp;
         if (op.dontSynchronize())
+        {
           return null;
+        }
 
         /*
          * Check if the provided operation is a repair operation and set the
@@ -181,8 +183,8 @@
   {
     try
     {
-      LDAPReplicationDomain domain =
-          new LDAPReplicationDomain(configuration, updateToReplayQueue);
+      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
+          configuration, updateToReplayQueue, dsrsShutdownSync);
       if (domains.size() == 0)
       {
         // Create the threads that will process incoming update messages
@@ -218,9 +220,8 @@
       BlockingQueue<UpdateToReplay> queue)
       throws ConfigException
   {
-    LDAPReplicationDomain domain =
-        new LDAPReplicationDomain(configuration, queue);
-
+    final LDAPReplicationDomain domain =
+        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
     domains.put(domain.getBaseDN(), domain);
     return domain;
   }
@@ -246,35 +247,30 @@
   /** {@inheritDoc} */
   @Override
   public void initializeSynchronizationProvider(
-      ReplicationSynchronizationProviderCfg configuration)
-  throws ConfigException
+      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
   {
     domains.clear();
-    replicationServerListener = new ReplicationServerListener(configuration);
+    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
 
     // Register as an add and delete listener with the root configuration so we
     // can be notified if Multimaster domain entries are added or removed.
-    configuration.addReplicationDomainAddListener(this);
-    configuration.addReplicationDomainDeleteListener(this);
+    cfg.addReplicationDomainAddListener(this);
+    cfg.addReplicationDomainDeleteListener(this);
 
     // Register as a root configuration listener so that we can be notified if
     // number of replay threads is changed and apply changes.
-    configuration.addReplicationChangeListener(this);
+    cfg.addReplicationChangeListener(this);
 
-    replayThreadNumber = configuration.getNumUpdateReplayThreads();
-    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
-        Integer.MAX_VALUE);
+    replayThreadNumber = cfg.getNumUpdateReplayThreads();
+    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
 
     //  Create the list of domains that are already defined.
-    for (String name : configuration.listReplicationDomains())
+    for (String name : cfg.listReplicationDomains())
     {
-      createNewDomain(configuration.getReplicationDomain(name));
+      createNewDomain(cfg.getReplicationDomain(name));
     }
 
-    /*
-     * If any schema changes were made with the server offline, then handle them
-     * now.
-     */
+    // If any schema changes were made with the server offline, then handle them now.
     List<Modification> offlineSchemaChanges =
          DirectoryServer.getOfflineSchemaChanges();
     if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -402,12 +398,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationModifyOperation modifyOperation)
   {
-    LDAPReplicationDomain domain =
-      findDomain(modifyOperation.getEntryDN(), modifyOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(modifyOperation);
+    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(modifyOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -415,12 +411,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationAddOperation addOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(addOperation.getEntryDN(), addOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(addOperation);
+    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(addOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -428,12 +424,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationDeleteOperation deleteOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(deleteOperation.getEntryDN(), deleteOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(deleteOperation);
+    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(deleteOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -441,12 +437,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(modifyDNOperation);
+    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(modifyDNOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -505,7 +501,9 @@
     LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
 
     if (domain == null || !domain.solveConflict())
+    {
       return new SynchronizationProviderResult.ContinueProcessing();
+    }
 
     // The historical object is retrieved from the attachment created
     // in the HandleConflictResolution phase.
@@ -537,11 +535,15 @@
     LDAPReplicationDomain domain =
       findDomain(addOperation.getEntryDN(), addOperation);
     if (domain == null)
+    {
       return new SynchronizationProviderResult.ContinueProcessing();
+    }
 
     // For LOCAL op only, generate CSN and attach Context
     if (!addOperation.isSynchronizationOperation())
+    {
       domain.doPreOperation(addOperation);
+    }
 
     // Add to the operation the historical attribute : "dn:changeNumber:add"
     EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -564,7 +566,9 @@
     stopReplayThreads();
 
     if (replicationServerListener != null)
+    {
       replicationServerListener.shutdown();
+    }
 
     DirectoryServer.deregisterBackupTaskListener(this);
     DirectoryServer.deregisterRestoreTaskListener(this);
@@ -585,10 +589,11 @@
   @Override
   public void processSchemaChange(List<Modification> modifications)
   {
-    LDAPReplicationDomain domain =
-      findDomain(DirectoryServer.getSchemaDN(), null);
+    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
     if (domain != null)
+    {
       domain.synchronizeModifications(modifications);
+    }
   }
 
   /** {@inheritDoc} */
@@ -599,7 +604,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupStart();
+      }
     }
   }
 
@@ -612,7 +619,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupEnd();
+      }
     }
   }
 
@@ -624,7 +633,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.disable();
+      }
     }
   }
 
@@ -637,7 +648,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.enable();
+      }
     }
   }
 
@@ -649,7 +662,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.disable();
+      }
     }
   }
 
@@ -662,7 +677,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.enable();
+      }
     }
   }
 
@@ -674,7 +691,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupStart();
+      }
     }
   }
 
@@ -687,7 +706,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupEnd();
+      }
     }
   }
 

--
Gitblit v1.10.0