From aea0892feca2fd3d56c9c810debed6d22389454e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Jul 2014 09:54:43 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates

---
 opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java         |   85 ++++++++++
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java     |    8 
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java |   65 +++----
 opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java        |    9 
 opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java         |   37 +++
 opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java           |   36 ++-
 opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java            |   30 +-
 opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java              |   20 +
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java    |  125 +++++++++------
 opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java      |    5 
 opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java             |   28 +-
 opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java       |    7 
 opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java               |   10 +
 13 files changed, 319 insertions(+), 146 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index d99d8c1..cc581d0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -70,6 +70,7 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachineEvent;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.tasks.PurgeConflictsHistoricalTask;
@@ -185,6 +186,7 @@
   public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
+  private final DSRSShutdownSync dsrsShutdownSync;
   /**
    * The update to replay message queue where the listener thread is going to
    * push incoming update messages.
@@ -455,14 +457,17 @@
    *
    * @param configuration    The configuration of this ReplicationDomain.
    * @param updateToReplayQueue The queue for update messages to replay.
+   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
    * @throws ConfigException In case of invalid configuration.
    */
   LDAPReplicationDomain(ReplicationDomainCfg configuration,
-      BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
+      BlockingQueue<UpdateToReplay> updateToReplayQueue,
+      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
   {
     super(configuration, -1);
 
     this.updateToReplayQueue = updateToReplayQueue;
+    this.dsrsShutdownSync = dsrsShutdownSync;
 
     // Get assured configuration
     readAssuredConfig(configuration, false);
@@ -2011,6 +2016,7 @@
   public void publishReplicaOfflineMsg()
   {
     pendingChanges.putReplicaOfflineMsg();
+    dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
   }
 
   /**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 0eec310..d3eb610 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -43,14 +43,14 @@
 import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
 import org.opends.server.api.*;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.*;
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.opends.server.types.operation.*;
 
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
@@ -73,9 +73,10 @@
 
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-  private ReplicationServerListener replicationServerListener = null;
+  private ReplicationServerListener replicationServerListener;
   private static final Map<DN, LDAPReplicationDomain> domains =
-    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
+      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
+  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
 
   /**
    * The queue of received update messages, to be treated by the ReplayThread
@@ -119,8 +120,7 @@
    *                   Can be null is the request has no associated operation.
    * @return           The domain for this DN.
    */
-  public static LDAPReplicationDomain findDomain(
-      DN dn, PluginOperation pluginOp)
+  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
   {
     /*
      * Don't run the special replication code on Operation that are
@@ -130,7 +130,9 @@
     {
         final Operation op = (Operation) pluginOp;
         if (op.dontSynchronize())
+        {
           return null;
+        }
 
         /*
          * Check if the provided operation is a repair operation and set the
@@ -187,8 +189,8 @@
   {
     try
     {
-      LDAPReplicationDomain domain =
-          new LDAPReplicationDomain(configuration, updateToReplayQueue);
+      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
+          configuration, updateToReplayQueue, dsrsShutdownSync);
       if (domains.size() == 0)
       {
         // Create the threads that will process incoming update messages
@@ -223,9 +225,8 @@
       BlockingQueue<UpdateToReplay> queue)
       throws ConfigException
   {
-    LDAPReplicationDomain domain =
-        new LDAPReplicationDomain(configuration, queue);
-
+    final LDAPReplicationDomain domain =
+        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
     domains.put(domain.getBaseDN(), domain);
     return domain;
   }
@@ -251,35 +252,30 @@
   /** {@inheritDoc} */
   @Override
   public void initializeSynchronizationProvider(
-      ReplicationSynchronizationProviderCfg configuration)
-  throws org.forgerock.opendj.config.server.ConfigException
+      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
   {
     domains.clear();
-    replicationServerListener = new ReplicationServerListener(configuration);
+    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
 
     // Register as an add and delete listener with the root configuration so we
     // can be notified if Multimaster domain entries are added or removed.
-    configuration.addReplicationDomainAddListener(this);
-    configuration.addReplicationDomainDeleteListener(this);
+    cfg.addReplicationDomainAddListener(this);
+    cfg.addReplicationDomainDeleteListener(this);
 
     // Register as a root configuration listener so that we can be notified if
     // number of replay threads is changed and apply changes.
-    configuration.addReplicationChangeListener(this);
+    cfg.addReplicationChangeListener(this);
 
-    replayThreadNumber = configuration.getNumUpdateReplayThreads();
-    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
-        Integer.MAX_VALUE);
+    replayThreadNumber = cfg.getNumUpdateReplayThreads();
+    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
 
     //  Create the list of domains that are already defined.
-    for (String name : configuration.listReplicationDomains())
+    for (String name : cfg.listReplicationDomains())
     {
-      createNewDomain(configuration.getReplicationDomain(name));
+      createNewDomain(cfg.getReplicationDomain(name));
     }
 
-    /*
-     * If any schema changes were made with the server offline, then handle them
-     * now.
-     */
+    // If any schema changes were made with the server offline, then handle them now.
     List<Modification> offlineSchemaChanges =
          DirectoryServer.getOfflineSchemaChanges();
     if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -407,12 +403,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationModifyOperation modifyOperation)
   {
-    LDAPReplicationDomain domain =
-      findDomain(modifyOperation.getEntryDN(), modifyOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(modifyOperation);
+    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(modifyOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -420,12 +416,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationAddOperation addOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(addOperation.getEntryDN(), addOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(addOperation);
+    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(addOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -433,12 +429,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationDeleteOperation deleteOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(deleteOperation.getEntryDN(), deleteOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(deleteOperation);
+    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(deleteOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -446,12 +442,12 @@
   public SynchronizationProviderResult handleConflictResolution(
       PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
   {
-    LDAPReplicationDomain domain =
-      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
-    if (domain == null)
-      return new SynchronizationProviderResult.ContinueProcessing();
-
-    return domain.handleConflictResolution(modifyDNOperation);
+    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
+    if (domain != null)
+    {
+      return domain.handleConflictResolution(modifyDNOperation);
+    }
+    return new SynchronizationProviderResult.ContinueProcessing();
   }
 
   /** {@inheritDoc} */
@@ -510,7 +506,9 @@
     LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
 
     if (domain == null || !domain.solveConflict())
+    {
       return new SynchronizationProviderResult.ContinueProcessing();
+    }
 
     // The historical object is retrieved from the attachment created
     // in the HandleConflictResolution phase.
@@ -542,11 +540,15 @@
     LDAPReplicationDomain domain =
       findDomain(addOperation.getEntryDN(), addOperation);
     if (domain == null)
+    {
       return new SynchronizationProviderResult.ContinueProcessing();
+    }
 
     // For LOCAL op only, generate CSN and attach Context
     if (!addOperation.isSynchronizationOperation())
+    {
       domain.doPreOperation(addOperation);
+    }
 
     // Add to the operation the historical attribute : "dn:changeNumber:add"
     EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -569,7 +571,9 @@
     stopReplayThreads();
 
     if (replicationServerListener != null)
+    {
       replicationServerListener.shutdown();
+    }
 
     DirectoryServer.deregisterBackupTaskListener(this);
     DirectoryServer.deregisterRestoreTaskListener(this);
@@ -590,10 +594,11 @@
   @Override
   public void processSchemaChange(List<Modification> modifications)
   {
-    LDAPReplicationDomain domain =
-      findDomain(DirectoryServer.getSchemaDN(), null);
+    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
     if (domain != null)
+    {
       domain.synchronizeModifications(modifications);
+    }
   }
 
   /** {@inheritDoc} */
@@ -604,7 +609,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupStart();
+      }
     }
   }
 
@@ -617,7 +624,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupEnd();
+      }
     }
   }
 
@@ -629,7 +638,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.disable();
+      }
     }
   }
 
@@ -642,7 +653,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.enable();
+      }
     }
   }
 
@@ -654,7 +667,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.disable();
+      }
     }
   }
 
@@ -667,7 +682,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.enable();
+      }
     }
   }
 
@@ -679,7 +696,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupStart();
+      }
     }
   }
 
@@ -692,7 +711,9 @@
     {
       LDAPReplicationDomain domain = findDomain(dn, null);
       if (domain != null)
+      {
         domain.backupEnd();
+      }
     }
   }
 
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index beb6d19..8252018 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -126,7 +126,10 @@
 
     while (firstChange != null && firstChange.isCommitted())
     {
-      state.update(firstCSN);
+      if (firstChange.getMsg().contributesToDomainState())
+      {
+        state.update(firstCSN);
+      }
       pendingChanges.remove(firstCSN);
 
       if (pendingChanges.isEmpty())
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
index 96de92f..939bfa0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -26,30 +26,30 @@
  */
 package org.opends.server.replication.plugin;
 
-import org.forgerock.i18n.LocalizableMessage;
-
 import java.util.List;
 
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ResultCode;
+
 import org.opends.server.admin.server.ConfigurationAddListener;
 import org.opends.server.admin.server.ConfigurationDeleteListener;
-import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
-import org.forgerock.opendj.config.server.ConfigException;
+import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
 import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.ConfigChangeResult;
-import org.forgerock.opendj.ldap.ResultCode;
-
 
 /**
  * This class is used to create and object that can
  * register in the admin framework as a listener for changes, add and delete
  * on the ReplicationServer configuration objects.
- *
  */
 public class ReplicationServerListener
        implements ConfigurationAddListener<ReplicationServerCfg>,
        ConfigurationDeleteListener<ReplicationServerCfg>
 {
+  private final DSRSShutdownSync dsrsShutdownSync;
   private ReplicationServer replicationServer;
 
   /**
@@ -58,36 +58,36 @@
    *
    * @param configuration The configuration that will be used to listen
    *                      for replicationServer configuration changes.
-   *
+   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
    * @throws ConfigException if the ReplicationServerListener can't register for
    *                         listening to changes on the provided configuration
    *                         object.
    */
   public ReplicationServerListener(
-      ReplicationSynchronizationProviderCfg configuration)
-      throws ConfigException
+      ReplicationSynchronizationProviderCfg configuration,
+      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
   {
     configuration.addReplicationServerAddListener(this);
     configuration.addReplicationServerDeleteListener(this);
 
+    this.dsrsShutdownSync = dsrsShutdownSync;
     if (configuration.hasReplicationServer())
     {
-      ReplicationServerCfg server = configuration.getReplicationServer();
-      replicationServer = new ReplicationServer(server);
+      final ReplicationServerCfg cfg = configuration.getReplicationServer();
+      replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public ConfigChangeResult applyConfigurationAdd(
-      ReplicationServerCfg configuration)
+  /** {@inheritDoc} */
+  @Override
+  public ConfigChangeResult applyConfigurationAdd(ReplicationServerCfg cfg)
   {
     try
     {
-      replicationServer = new ReplicationServer(configuration);
+      replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
       return new ConfigChangeResult(ResultCode.SUCCESS, false);
-    } catch (ConfigException e)
+    }
+    catch (ConfigException e)
     {
       // we should never get to this point because the configEntry has
       // already been validated in configAddisAcceptable
@@ -95,14 +95,12 @@
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
+  @Override
   public boolean isConfigurationAddAcceptable(
-      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
+      ReplicationServerCfg cfg, List<LocalizableMessage> unacceptableReasons)
   {
-    return ReplicationServer.isConfigurationAcceptable(
-      configuration, unacceptableReasons);
+    return ReplicationServer.isConfigurationAcceptable(cfg, unacceptableReasons);
   }
 
   /**
@@ -111,14 +109,14 @@
   public void shutdown()
   {
     if (replicationServer != null)
+    {
       replicationServer.shutdown();
+    }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public ConfigChangeResult applyConfigurationDelete(
-      ReplicationServerCfg configuration)
+  /** {@inheritDoc} */
+  @Override
+  public ConfigChangeResult applyConfigurationDelete(ReplicationServerCfg cfg)
   {
     // There can be only one replicationServer, just shutdown the
     // replicationServer currently configured.
@@ -129,11 +127,10 @@
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
+  @Override
   public boolean isConfigurationDeleteAcceptable(
-      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
+      ReplicationServerCfg cfg, List<LocalizableMessage> unacceptableReasons)
   {
     return true;
   }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
index a9ac7c7..a1409cd 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -106,6 +106,13 @@
 
   /** {@inheritDoc} */
   @Override
+  public boolean contributesToDomainState()
+  {
+    return false; // replica offline msg MUST NOT update the ds-sync-state
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public String toString()
   {
     return getClass().getSimpleName() + " offlineCSN=" + csn;
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index 3cf7e78..f27e597 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -294,4 +294,14 @@
   {
     return payload;
   }
+
+  /**
+   * Whether the current message can update the "ds-sync-state" attribute.
+   *
+   * @return true if current message can update the "ds-sync-state" attribute, false otherwise.
+   */
+  public boolean contributesToDomainState()
+  {
+    return true;
+  }
 }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 880308c..0005e50 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,6 +35,7 @@
 import org.opends.server.replication.protocol.DoneMsg;
 import org.opends.server.replication.protocol.ECLUpdateMsg;
 import org.opends.server.replication.protocol.Session;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
@@ -47,7 +48,7 @@
  * This class defines a server writer, which is used to send changes to a
  * directory server.
  */
-public class ECLServerWriter extends ServerWriter
+class ECLServerWriter extends ServerWriter
 {
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
@@ -56,7 +57,7 @@
   private final ReplicationServerDomain replicationServerDomain;
   private boolean suspended;
   private volatile boolean shutdown;
-  private PersistentSearch mypsearch;
+  private final PersistentSearch mypsearch;
 
   /**
    * Create a ServerWriter.
@@ -66,10 +67,10 @@
    * @param replicationServerDomain the ReplicationServerDomain of this
    *                    ServerWriter.
    */
-  public ECLServerWriter(Session session, ECLServerHandler handler,
+  ECLServerWriter(Session session, ECLServerHandler handler,
       ReplicationServerDomain replicationServerDomain)
   {
-    super(session, handler, replicationServerDomain);
+    super(session, handler, replicationServerDomain, new DSRSShutdownSync());
 
     setName("Replication ECL Writer Thread for operation " +
         handler.getOperationId());
@@ -79,21 +80,26 @@
     this.replicationServerDomain = replicationServerDomain;
     this.suspended = false;
     this.shutdown = false;
+    this.mypsearch = findPersistentSearch(handler);
+  }
 
-    // Look for the psearch object related to this operation, the one that
-    // will be notified with new entries to be returned.
-    ECLWorkflowElement wfe =
-        (ECLWorkflowElement) DirectoryServer
-            .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
+  /**
+   * Look for the persistent search object related to this operation, the one
+   * that will be notified with new entries to be returned.
+   */
+  private PersistentSearch findPersistentSearch(ECLServerHandler handler)
+  {
+    ECLWorkflowElement wfe = (ECLWorkflowElement)
+        DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
     for (PersistentSearch psearch : wfe.getPersistentSearches())
     {
       if (psearch.getSearchOperation().toString().equals(
           handler.getOperationId()))
       {
-        mypsearch = psearch;
-        break;
+        return psearch;
       }
     }
+    return null;
   }
 
   /**
@@ -101,7 +107,7 @@
    * waiting for the startCLSessionMsg. Then it may be
    * suspended between 2 jobs, each job being a separate search.
    */
-  public synchronized void suspendWriter()
+  private synchronized void suspendWriter()
   {
     suspended = true;
   }
@@ -109,7 +115,7 @@
   /**
    * Resume the writer.
    */
-  public synchronized void resumeWriter()
+  synchronized void resumeWriter()
   {
     suspended = false;
     notify();
@@ -180,7 +186,7 @@
    * @throws IOException when raised (connection closure)
    * @throws InterruptedException when raised
    */
-  public void doIt() throws IOException, InterruptedException
+  private void doIt() throws IOException, InterruptedException
   {
     while (true)
     {
@@ -230,7 +236,7 @@
   /**
    * Shutdown the writer.
    */
-  public synchronized void shutdownWriter()
+  synchronized void shutdownWriter()
   {
     shutdown = true;
     notify();
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
index 9171191..e3684be 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -44,6 +44,7 @@
 import org.opends.server.types.*;
 import org.forgerock.opendj.ldap.ResultCode;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.types.Attributes.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
@@ -228,13 +229,10 @@
   public List<Attribute> getMonitorData()
   {
     List<Attribute> attributes = new ArrayList<Attribute>();
-    attributes.add(Attributes.create("handler", getMonitorInstanceName()));
-    attributes.add(
-        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
-    attributes.add(
-        Attributes.create(
-            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
-    attributes.add(Attributes.create("following", String.valueOf(following)));
+    attributes.add(create("handler", getMonitorInstanceName()));
+    attributes.add(create("queue-size", String.valueOf(msgQueue.count())));
+    attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+    attributes.add(create("following", String.valueOf(following)));
     return attributes;
   }
 
@@ -419,21 +417,20 @@
    */
   public CSN getOlderUpdateCSN()
   {
-    CSN result = null;
     synchronized (msgQueue)
     {
       if (following)
       {
         if (!msgQueue.isEmpty())
         {
-          result = msgQueue.first().getCSN();
+          return msgQueue.first().getCSN();
         }
       }
       else
       {
         if (!lateQueue.isEmpty())
         {
-          result = lateQueue.first().getCSN();
+          return lateQueue.first().getCSN();
         }
         else
         {
@@ -444,11 +441,11 @@
           the lateQueue when it will send the next update but we are not yet
           there. So let's take the last change not sent directly from the db.
           */
-          result = findOldestCSNFromReplicaDBs();
+          return findOldestCSNFromReplicaDBs();
         }
       }
     }
-    return result;
+    return null;
   }
 
   private CSN findOldestCSNFromReplicaDBs()
@@ -457,10 +454,13 @@
     try
     {
       cursor = replicationServerDomain.getCursorFrom(serverState);
-      cursor.next();
-      if (cursor.getRecord() != null)
+      while (cursor.next())
       {
-        return cursor.getRecord().getCSN();
+        final UpdateMsg record = cursor.getRecord();
+        if (record.contributesToDomainState())
+        {
+          return record.getCSN();
+        }
       }
       return null;
     }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2c6d9a8..61a993c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -54,6 +54,7 @@
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.je.JEChangelogDB;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.*;
 import org.opends.server.util.ServerConstants;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -79,6 +80,7 @@
 
   /** The current configuration of this replication server. */
   private ReplicationServerCfg config;
+  private final DSRSShutdownSync dsrsShutdownSync;
 
   /**
    * This table is used to store the list of dn for which we are currently
@@ -122,18 +124,31 @@
   /**
    * Creates a new Replication server using the provided configuration entry.
    *
-   * @param configuration The configuration of this replication server.
+   * @param cfg The configuration of this replication server.
    * @throws ConfigException When Configuration is invalid.
    */
-  public ReplicationServer(ReplicationServerCfg configuration)
-    throws ConfigException
+  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
   {
-    this.config = configuration;
-    this.changelogDB = new JEChangelogDB(this, configuration);
+    this(cfg, new DSRSShutdownSync());
+  }
+
+  /**
+   * Creates a new Replication server using the provided configuration entry.
+   *
+   * @param cfg The configuration of this replication server.
+   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+   * @throws ConfigException When Configuration is invalid.
+   */
+  public ReplicationServer(ReplicationServerCfg cfg,
+      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+  {
+    this.config = cfg;
+    this.changelogDB = new JEChangelogDB(this, cfg);
+    this.dsrsShutdownSync = dsrsShutdownSync;
 
     replSessionSecurity = new ReplSessionSecurity();
     initialize();
-    configuration.addChangeListener(this);
+    cfg.addChangeListener(this);
 
     localPorts.add(getReplicationPort());
 
@@ -1183,6 +1198,16 @@
     return this.changelogDB;
   }
 
+  /**
+   * Returns the synchronization object for shutdown of combined DS/RS instances.
+   *
+   * @return the synchronization object for shutdown of combined DS/RS instances.
+   */
+  DSRSShutdownSync getDSRSShutdownSync()
+  {
+    return dsrsShutdownSync;
+  }
+
   /** {@inheritDoc} */
   @Override
   public String toString()
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
index d83ba85..3c664be 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -72,7 +72,7 @@
   /**
    * The session opened with the remote server.
    */
-  protected Session session;
+  protected final Session session;
 
   /**
    * The serverURL of the remote server.
@@ -81,40 +81,39 @@
   /**
    * Number of updates received from the server in assured safe read mode.
    */
-  protected int assuredSrReceivedUpdates = 0;
+  private int assuredSrReceivedUpdates = 0;
   /**
    * Number of updates received from the server in assured safe read mode that
    * timed out.
    */
-  protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
+  private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
   /**
    * Number of updates sent to the server in assured safe read mode.
    */
-  protected int assuredSrSentUpdates = 0;
+  private int assuredSrSentUpdates = 0;
   /**
    * Number of updates sent to the server in assured safe read mode that timed
    * out.
    */
-  protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
+  private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
   /**
    * Number of updates received from the server in assured safe data mode.
    */
-  protected int assuredSdReceivedUpdates = 0;
+  private int assuredSdReceivedUpdates = 0;
   /**
    * Number of updates received from the server in assured safe data mode that
    * timed out.
    */
-  protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
+  private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
   /**
    * Number of updates sent to the server in assured safe data mode.
    */
-  protected int assuredSdSentUpdates = 0;
+  private int assuredSdSentUpdates = 0;
 
   /**
-   * Number of updates sent to the server in assured safe data mode that timed
-   * out.
+   * Number of updates sent to the server in assured safe data mode that timed out.
    */
-  protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
+  private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
 
   /**
    * The associated ServerWriter that sends messages to the remote server.
@@ -305,7 +304,8 @@
       // sendWindow MUST be created before starting the writer
       sendWindow = new Semaphore(sendWindowSize);
 
-      writer = new ServerWriter(session, this, replicationServerDomain);
+      writer = new ServerWriter(session, this, replicationServerDomain,
+          replicationServer.getDSRSShutdownSync());
       reader = new ServerReader(session, this);
 
       session.setName("Replication server RS(" + getReplicationServerId()
@@ -630,7 +630,7 @@
    * Increment the number of updates sent to the server in assured safe data
    * mode.
    */
-  public void incrementAssuredSdSentUpdates()
+  private void incrementAssuredSdSentUpdates()
   {
     assuredSdSentUpdates++;
   }
@@ -666,7 +666,7 @@
    * Increment the number of updates sent to the server in assured safe read
    * mode.
    */
-  public void incrementAssuredSrSentUpdates()
+  private void incrementAssuredSrSentUpdates()
   {
     assuredSrSentUpdates++;
   }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
index b071f99..343e2d5 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -32,8 +32,10 @@
 import org.opends.server.api.DirectoryThread;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
 import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.DSRSShutdownSync;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.replication.common.ServerStatus.*;
@@ -50,8 +52,7 @@
   private final Session session;
   private final ServerHandler handler;
   private final ReplicationServerDomain replicationServerDomain;
-
-
+  private final DSRSShutdownSync dsrsShutdownSync;
 
   /**
    * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
@@ -63,9 +64,11 @@
    *          handler for which the ServerWriter is created.
    * @param replicationServerDomain
    *          The ReplicationServerDomain of this ServerWriter.
+   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
    */
   public ServerWriter(Session session, ServerHandler handler,
-      ReplicationServerDomain replicationServerDomain)
+      ReplicationServerDomain replicationServerDomain,
+      DSRSShutdownSync dsrsShutdownSync)
   {
     // Session may be null for ECLServerWriter.
     super("Replication server RS(" + handler.getReplicationServerId()
@@ -75,6 +78,7 @@
     this.session = session;
     this.handler = handler;
     this.replicationServerDomain = replicationServerDomain;
+    this.dsrsShutdownSync = dsrsShutdownSync;
   }
 
   /**
@@ -93,7 +97,9 @@
     LocalizableMessage errMessage = null;
     try
     {
-      while (true)
+      boolean shutdown = false;
+      while (!shutdown
+          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
       {
         final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
         if (updateMsg == null)
@@ -101,12 +107,16 @@
           // this connection is closing
           errMessage = LocalizableMessage.raw(
            "Connection closure: null update returned by domain.");
-          return;
+          shutdown = true;
         }
         else if (!isUpdateMsgFiltered(updateMsg))
         {
           // Publish the update to the remote server using a protocol version it supports
           session.publish(updateMsg);
+          if (updateMsg instanceof ReplicaOfflineMsg)
+          {
+            dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
+          }
         }
       }
     }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
new file mode 100644
index 0000000..5ef595e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
@@ -0,0 +1,85 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.service;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opends.server.types.DN;
+
+/**
+ * Class useful for the case where DS/RS instances are collocated inside the
+ * same JVM. It synchronizes the shutdown of the DS and RS sides.
+ * <p>
+ * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is
+ * relayed/forwarded by the collocated RS to the other RSs in the topology
+ * before the whole process shuts down.
+ *
+ * @since OPENDJ-1453
+ */
+public class DSRSShutdownSync
+{
+
+  private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs =
+      new ConcurrentSkipListSet<DN>();
+  private static AtomicLong stopInstanceTimestamp = new AtomicLong();
+
+  /**
+   * Message has been sent.
+   *
+   * @param baseDN
+   *          the domain for which the message has been sent
+   */
+  public void replicaOfflineMsgSent(DN baseDN)
+  {
+    stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis());
+    replicaOfflineMsgs.add(baseDN);
+  }
+
+  /**
+   * Message has been forwarded.
+   *
+   * @param baseDN
+   *          the domain for which the message has been sent
+   */
+  public void replicaOfflineMsgForwarded(DN baseDN)
+  {
+    replicaOfflineMsgs.remove(baseDN);
+  }
+
+  /**
+   * Whether a ReplicationServer ServerReader or ServerWriter can proceed with
+   * shutdown.
+   *
+   * @param baseDN
+   *          the baseDN of the ServerReader or ServerWriter .
+   * @return true if the caller can shutdown, false otherwise
+   */
+  public boolean canShutdown(DN baseDN)
+  {
+    return !replicaOfflineMsgs.contains(baseDN)
+        || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000;
+  }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 667cc2a..2faa52d 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2974,7 +2974,8 @@
               // The server is shutting down.
               listenerThread.initiateShutdown();
             }
-            else if (processUpdate(updateMsg))
+            else if (processUpdate(updateMsg)
+                && updateMsg.contributesToDomainState())
             {
               /*
                * Warning: in synchronous mode, no way to tell the replay of an
@@ -3393,9 +3394,11 @@
    */
   public void publish(UpdateMsg msg)
   {
-    // Publish the update
     broker.publish(msg);
-    state.update(msg.getCSN());
+    if (msg.contributesToDomainState())
+    {
+      state.update(msg.getCSN());
+    }
     numSentUpdates.incrementAndGet();
   }
 

--
Gitblit v1.10.0