mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.19.2014 f948474a8031c24160da4b31f0b97354456b40ad
OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates

This changed managed to solve the combined DS/RS case. Mostly, because robot test is sometimes failing yet.
This is fixed by introducing a DSRSShutdownSync class which ensures that when the DS sends a ReplicaOfflineMsg, the RS will relay it to the rest of the topology.
Second main change is to ensure ReplicaOfflineMsgs do not update the domain's ServerState otherwise the functional tests detect an inconsistency in the backend ds-sync-state data between the 2 replica: the one that is stopped and the one that is never stopped. For this I added UpdateMsg.contributesToDomainState() and used it throughout.
The rest of the change is passing the DSRSShutdownSync down method calls.



DSRSShutdownSync.java: ADDED

UpdateMsg.java, ReplicaOfflineMsg.java:
Added contributesToDomainState()



MultimasterReplication.java:
Added dsrsShutdownSync field + created it here + passed it down creation of LDAPReplicationDomain and ReplicationServerListener.
Code cleanup.

LDAPReplicationDomain.java:
Added dsrsShutdownSync field + used it in publishReplicaOfflineMsg().

ReplicationServerListener.java
Added dsrsShutdownSync field + used it when creating ReplicationServer.
Code cleanup.

ReplicationServer.java:
Added dsrsShutdownSync field + added getDSRSShutdownSync() getter.

ServerHandler.java
Called ReplicationServer.getDSRSShutdownSync() when creating ServerWriter.
Code cleanup.

ServerWriter.java:
Added dsrsShutdownSync field + used it in run().



ECLServerWriter.java:
Consequence of the change to ServerWriter.
Code cleanup.



RemotePendingChanges.java, ReplicationDomain.java, MessageHandler.java:
Used UpdateMsg.contributesToDomainState().
1 files added
12 files modified
492 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 125 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java 5 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java 63 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 36 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 30 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 66 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 28 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java 85 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -67,6 +67,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
@@ -182,6 +183,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * The update to replay message queue where the listener thread is going to
   * push incoming update messages.
@@ -452,14 +454,17 @@
   *
   * @param configuration    The configuration of this ReplicationDomain.
   * @param updateToReplayQueue The queue for update messages to replay.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException In case of invalid configuration.
   */
  LDAPReplicationDomain(ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
      BlockingQueue<UpdateToReplay> updateToReplayQueue,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    super(configuration, -1);
    this.updateToReplayQueue = updateToReplayQueue;
    this.dsrsShutdownSync = dsrsShutdownSync;
    // Get assured configuration
    readAssuredConfig(configuration, false);
@@ -2017,6 +2022,7 @@
  public void publishReplicaOfflineMsg()
  {
    pendingChanges.putReplicaOfflineMsg();
    dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
  }
  /**
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -41,13 +41,13 @@
import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -67,9 +67,10 @@
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
  private ReplicationServerListener replicationServerListener = null;
  private ReplicationServerListener replicationServerListener;
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
  /**
   * The queue of received update messages, to be treated by the ReplayThread
@@ -113,8 +114,7 @@
   *                   Can be null is the request has no associated operation.
   * @return           The domain for this DN.
   */
  public static LDAPReplicationDomain findDomain(
      DN dn, PluginOperation pluginOp)
  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
  {
    /*
     * Don't run the special replication code on Operation that are
@@ -124,7 +124,9 @@
    {
        final Operation op = (Operation) pluginOp;
        if (op.dontSynchronize())
        {
          return null;
        }
        /*
         * Check if the provided operation is a repair operation and set the
@@ -181,8 +183,8 @@
  {
    try
    {
      LDAPReplicationDomain domain =
          new LDAPReplicationDomain(configuration, updateToReplayQueue);
      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
          configuration, updateToReplayQueue, dsrsShutdownSync);
      if (domains.size() == 0)
      {
        // Create the threads that will process incoming update messages
@@ -218,9 +220,8 @@
      BlockingQueue<UpdateToReplay> queue)
      throws ConfigException
  {
    LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue);
    final LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
    domains.put(domain.getBaseDN(), domain);
    return domain;
  }
@@ -246,35 +247,30 @@
  /** {@inheritDoc} */
  @Override
  public void initializeSynchronizationProvider(
      ReplicationSynchronizationProviderCfg configuration)
  throws ConfigException
      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
  {
    domains.clear();
    replicationServerListener = new ReplicationServerListener(configuration);
    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);
    cfg.addReplicationDomainAddListener(this);
    cfg.addReplicationDomainDeleteListener(this);
    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);
    cfg.addReplicationChangeListener(this);
    replayThreadNumber = configuration.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
        Integer.MAX_VALUE);
    replayThreadNumber = cfg.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    for (String name : cfg.listReplicationDomains())
    {
      createNewDomain(configuration.getReplicationDomain(name));
      createNewDomain(cfg.getReplicationDomain(name));
    }
    /*
     * If any schema changes were made with the server offline, then handle them
     * now.
     */
    // If any schema changes were made with the server offline, then handle them now.
    List<Modification> offlineSchemaChanges =
         DirectoryServer.getOfflineSchemaChanges();
    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -402,12 +398,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyOperation modifyOperation)
  {
    LDAPReplicationDomain domain =
      findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyOperation);
    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -415,12 +411,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationAddOperation addOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(addOperation);
    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(addOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -428,12 +424,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(deleteOperation);
    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(deleteOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -441,12 +437,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyDNOperation);
    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyDNOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -505,7 +501,9 @@
    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
    if (domain == null || !domain.solveConflict())
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // The historical object is retrieved from the attachment created
    // in the HandleConflictResolution phase.
@@ -537,11 +535,15 @@
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // For LOCAL op only, generate CSN and attach Context
    if (!addOperation.isSynchronizationOperation())
    {
      domain.doPreOperation(addOperation);
    }
    // Add to the operation the historical attribute : "dn:changeNumber:add"
    EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -564,7 +566,9 @@
    stopReplayThreads();
    if (replicationServerListener != null)
    {
      replicationServerListener.shutdown();
    }
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
@@ -585,10 +589,11 @@
  @Override
  public void processSchemaChange(List<Modification> modifications)
  {
    LDAPReplicationDomain domain =
      findDomain(DirectoryServer.getSchemaDN(), null);
    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
    if (domain != null)
    {
      domain.synchronizeModifications(modifications);
    }
  }
  /** {@inheritDoc} */
@@ -599,7 +604,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -612,7 +619,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }
@@ -624,7 +633,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -637,7 +648,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -649,7 +662,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -662,7 +677,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -674,7 +691,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -687,7 +706,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -126,7 +126,10 @@
    while (firstChange != null && firstChange.isCommitted())
    {
      state.update(firstCSN);
      if (firstChange.getMsg().contributesToDomainState())
      {
        state.update(firstCSN);
      }
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -22,34 +22,34 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import java.util.List;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.ResultCode;
/**
 * This class is used to create and object that can
 * register in the admin framework as a listener for changes, add and delete
 * on the ReplicationServer configuration objects.
 *
 */
public class ReplicationServerListener
       implements ConfigurationAddListener<ReplicationServerCfg>,
       ConfigurationDeleteListener<ReplicationServerCfg>
{
  ReplicationServer replicationServer = null;
  private final DSRSShutdownSync dsrsShutdownSync;
  private ReplicationServer replicationServer;
  /**
   * Build a ReplicationServer Listener from the given Multimaster
@@ -57,36 +57,36 @@
   *
   * @param configuration The configuration that will be used to listen
   *                      for replicationServer configuration changes.
   *
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException if the ReplicationServerListener can't register for
   *                         listening to changes on the provided configuration
   *                         object.
   */
  public ReplicationServerListener(
      ReplicationSynchronizationProviderCfg configuration)
      throws ConfigException
      ReplicationSynchronizationProviderCfg configuration,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    configuration.addReplicationServerAddListener(this);
    configuration.addReplicationServerDeleteListener(this);
    this.dsrsShutdownSync = dsrsShutdownSync;
    if (configuration.hasReplicationServer())
    {
      ReplicationServerCfg server = configuration.getReplicationServer();
      replicationServer = new ReplicationServer(server);
      final ReplicationServerCfg cfg = configuration.getReplicationServer();
      replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
    }
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationAdd(
      ReplicationServerCfg configuration)
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationAdd(ReplicationServerCfg cfg)
  {
    try
    {
      replicationServer = new ReplicationServer(configuration);
      replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
    }
    catch (ConfigException e)
    {
      // we should never get to this point because the configEntry has
      // already been validated in configAddisAcceptable
@@ -94,14 +94,12 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationAddAcceptable(
      ReplicationServerCfg configuration, List<Message> unacceptableReasons)
      ReplicationServerCfg cfg, List<Message> unacceptableReasons)
  {
    return ReplicationServer.isConfigurationAcceptable(
      configuration, unacceptableReasons);
    return ReplicationServer.isConfigurationAcceptable(cfg, unacceptableReasons);
  }
  /**
@@ -110,14 +108,14 @@
  public void shutdown()
  {
    if (replicationServer != null)
    {
      replicationServer.shutdown();
    }
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationDelete(
      ReplicationServerCfg configuration)
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationDelete(ReplicationServerCfg cfg)
  {
    // There can be only one replicationServer, just shutdown the
    // replicationServer currently configured.
@@ -128,11 +126,10 @@
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationDeleteAcceptable(
      ReplicationServerCfg configuration, List<Message> unacceptableReasons)
      ReplicationServerCfg cfg, List<Message> unacceptableReasons)
  {
    return true;
  }
opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -106,6 +106,13 @@
  /** {@inheritDoc} */
  @Override
  public boolean contributesToDomainState()
  {
    return false; // replica offline msg MUST NOT update the ds-sync-state
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " offlineCSN=" + csn;
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -294,4 +294,14 @@
  {
    return payload;
  }
  /**
   * Whether the current message can update the "ds-sync-state" attribute.
   *
   * @return true if current message can update the "ds-sync-state" attribute, false otherwise.
   */
  public boolean contributesToDomainState()
  {
    return true;
  }
}
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,6 +35,7 @@
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.ECLUpdateMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
@@ -50,7 +51,7 @@
 * This class defines a server writer, which is used to send changes to a
 * directory server.
 */
public class ECLServerWriter extends ServerWriter
class ECLServerWriter extends ServerWriter
{
  /**
   * The tracer object for the debug logger.
@@ -62,7 +63,7 @@
  private final ReplicationServerDomain replicationServerDomain;
  private boolean suspended;
  private volatile boolean shutdown;
  private PersistentSearch mypsearch;
  private final PersistentSearch mypsearch;
  /**
   * Create a ServerWriter.
@@ -72,10 +73,10 @@
   * @param replicationServerDomain the ReplicationServerDomain of this
   *                    ServerWriter.
   */
  public ECLServerWriter(Session session, ECLServerHandler handler,
  ECLServerWriter(Session session, ECLServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    super(session, handler, replicationServerDomain);
    super(session, handler, replicationServerDomain, new DSRSShutdownSync());
    setName("Replication ECL Writer Thread for operation " +
        handler.getOperationId());
@@ -85,21 +86,26 @@
    this.replicationServerDomain = replicationServerDomain;
    this.suspended = false;
    this.shutdown = false;
    this.mypsearch = findPersistentSearch(handler);
  }
    // Look for the psearch object related to this operation, the one that
    // will be notified with new entries to be returned.
    ECLWorkflowElement wfe =
        (ECLWorkflowElement) DirectoryServer
            .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
  /**
   * Look for the persistent search object related to this operation, the one
   * that will be notified with new entries to be returned.
   */
  private PersistentSearch findPersistentSearch(ECLServerHandler handler)
  {
    ECLWorkflowElement wfe = (ECLWorkflowElement)
        DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    for (PersistentSearch psearch : wfe.getPersistentSearches())
    {
      if (psearch.getSearchOperation().toString().equals(
          handler.getOperationId()))
      {
        mypsearch = psearch;
        break;
        return psearch;
      }
    }
    return null;
  }
  /**
@@ -107,7 +113,7 @@
   * waiting for the startCLSessionMsg. Then it may be
   * suspended between 2 jobs, each job being a separate search.
   */
  public synchronized void suspendWriter()
  private synchronized void suspendWriter()
  {
    suspended = true;
  }
@@ -115,7 +121,7 @@
  /**
   * Resume the writer.
   */
  public synchronized void resumeWriter()
  synchronized void resumeWriter()
  {
    suspended = false;
    notify();
@@ -187,7 +193,7 @@
   * @throws IOException when raised (connection closure)
   * @throws InterruptedException when raised
   */
  public void doIt() throws IOException, InterruptedException
  private void doIt() throws IOException, InterruptedException
  {
    while (true)
    {
@@ -237,7 +243,7 @@
  /**
   * Shutdown the writer.
   */
  public synchronized void shutdownWriter()
  synchronized void shutdownWriter()
  {
    shutdown = true;
    notify();
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -45,6 +45,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.types.Attributes.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -231,13 +232,10 @@
  public List<Attribute> getMonitorData()
  {
    List<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(Attributes.create("handler", getMonitorInstanceName()));
    attributes.add(
        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        Attributes.create(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(Attributes.create("following", String.valueOf(following)));
    attributes.add(create("handler", getMonitorInstanceName()));
    attributes.add(create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(create("following", String.valueOf(following)));
    return attributes;
  }
@@ -422,21 +420,20 @@
   */
  public CSN getOlderUpdateCSN()
  {
    CSN result = null;
    synchronized (msgQueue)
    {
      if (following)
      {
        if (!msgQueue.isEmpty())
        {
          result = msgQueue.first().getCSN();
          return msgQueue.first().getCSN();
        }
      }
      else
      {
        if (!lateQueue.isEmpty())
        {
          result = lateQueue.first().getCSN();
          return lateQueue.first().getCSN();
        }
        else
        {
@@ -447,11 +444,11 @@
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from the db.
          */
          result = findOldestCSNFromReplicaDBs();
          return findOldestCSNFromReplicaDBs();
        }
      }
    }
    return result;
    return null;
  }
  private CSN findOldestCSNFromReplicaDBs()
@@ -460,10 +457,13 @@
    try
    {
      cursor = replicationServerDomain.getCursorFrom(serverState);
      cursor.next();
      if (cursor.getRecord() != null)
      while (cursor.next())
      {
        return cursor.getRecord().getCSN();
        final UpdateMsg record = cursor.getRecord();
        if (record.contributesToDomainState())
        {
          return record.getCSN();
        }
      }
      return null;
    }
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -37,8 +37,8 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*;
import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.ConflictBehavior;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
@@ -51,9 +51,13 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.StaticUtils;
@@ -82,6 +86,7 @@
  /** The current configuration of this replication server. */
  private ReplicationServerCfg config;
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * This table is used to store the list of dn for which we are currently
@@ -126,34 +131,39 @@
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
   * @param cfg The configuration of this replication server.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg configuration)
    throws ConfigException
  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
  {
    this.config = configuration;
    ReplicationDBImplementation dbImpl = configuration.getReplicationDBImplementation();
    if (dbImpl == ReplicationDBImplementation.JE)
    this(cfg, new DSRSShutdownSync());
  }
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg cfg,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    this.config = cfg;
    this.dsrsShutdownSync = dsrsShutdownSync;
    ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
    if (DebugLogger.debugEnabled())
    {
      if (DebugLogger.debugEnabled())
      {
        TRACER.debugMessage(DebugLogLevel.INFO, "Using JE as DB implementation for changelog DB");
      }
      this.changelogDB = new JEChangelogDB(this, configuration);
      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
          + " as DB implementation for changelog DB");
    }
    else
    {
      if (DebugLogger.debugEnabled())
      {
        TRACER.debugMessage(DebugLogLevel.INFO, "Using LOG FILE as DB implementation for changelog DB");
      }
      this.changelogDB = new FileChangelogDB(this, configuration);
    }
    this.changelogDB = dbImpl == ReplicationDBImplementation.JE
        ? new JEChangelogDB(this, cfg)
        : new FileChangelogDB(this, cfg);
    replSessionSecurity = new ReplSessionSecurity();
    initialize();
    configuration.addChangeListener(this);
    cfg.addChangeListener(this);
    localPorts.add(getReplicationPort());
@@ -1227,6 +1237,16 @@
    return this.changelogDB;
  }
  /**
   * Returns the synchronization object for shutdown of combined DS/RS instances.
   *
   * @return the synchronization object for shutdown of combined DS/RS instances.
   */
  DSRSShutdownSync getDSRSShutdownSync()
  {
    return dsrsShutdownSync;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -68,7 +68,7 @@
  /**
   * The session opened with the remote server.
   */
  protected Session session;
  protected final Session session;
  /**
   * The serverURL of the remote server.
@@ -77,40 +77,39 @@
  /**
   * Number of updates received from the server in assured safe read mode.
   */
  protected int assuredSrReceivedUpdates = 0;
  private int assuredSrReceivedUpdates = 0;
  /**
   * Number of updates received from the server in assured safe read mode that
   * timed out.
   */
  protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe read mode.
   */
  protected int assuredSrSentUpdates = 0;
  private int assuredSrSentUpdates = 0;
  /**
   * Number of updates sent to the server in assured safe read mode that timed
   * out.
   */
  protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates received from the server in assured safe data mode.
   */
  protected int assuredSdReceivedUpdates = 0;
  private int assuredSdReceivedUpdates = 0;
  /**
   * Number of updates received from the server in assured safe data mode that
   * timed out.
   */
  protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe data mode.
   */
  protected int assuredSdSentUpdates = 0;
  private int assuredSdSentUpdates = 0;
  /**
   * Number of updates sent to the server in assured safe data mode that timed
   * out.
   * Number of updates sent to the server in assured safe data mode that timed out.
   */
  protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
  /**
   * The associated ServerWriter that sends messages to the remote server.
@@ -301,7 +300,8 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, this, replicationServerDomain);
      writer = new ServerWriter(session, this, replicationServerDomain,
          replicationServer.getDSRSShutdownSync());
      reader = new ServerReader(session, this);
      session.setName("Replication server RS(" + getReplicationServerId()
@@ -626,7 +626,7 @@
   * Increment the number of updates sent to the server in assured safe data
   * mode.
   */
  public void incrementAssuredSdSentUpdates()
  private void incrementAssuredSdSentUpdates()
  {
    assuredSdSentUpdates++;
  }
@@ -662,7 +662,7 @@
   * Increment the number of updates sent to the server in assured safe read
   * mode.
   */
  public void incrementAssuredSrSentUpdates()
  private void incrementAssuredSrSentUpdates()
  {
    assuredSrSentUpdates++;
  }
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -32,8 +32,10 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.DSRSShutdownSync;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -55,8 +57,7 @@
  private final Session session;
  private final ServerHandler handler;
  private final ReplicationServerDomain replicationServerDomain;
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
@@ -68,9 +69,11 @@
   *          handler for which the ServerWriter is created.
   * @param replicationServerDomain
   *          The ReplicationServerDomain of this ServerWriter.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   */
  public ServerWriter(Session session, ServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
      ReplicationServerDomain replicationServerDomain,
      DSRSShutdownSync dsrsShutdownSync)
  {
    // Session may be null for ECLServerWriter.
    super("Replication server RS(" + handler.getReplicationServerId()
@@ -80,6 +83,7 @@
    this.session = session;
    this.handler = handler;
    this.replicationServerDomain = replicationServerDomain;
    this.dsrsShutdownSync = dsrsShutdownSync;
  }
  /**
@@ -98,7 +102,9 @@
    Message errMessage = null;
    try
    {
      while (true)
      boolean shutdown = false;
      while (!shutdown
          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
      {
        final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
        if (updateMsg == null)
@@ -106,12 +112,16 @@
          // this connection is closing
          errMessage = Message.raw(
           "Connection closure: null update returned by domain.");
          return;
          shutdown = true;
        }
        else if (!isUpdateMsgFiltered(updateMsg))
        {
          // Publish the update to the remote server using a protocol version it supports
          session.publish(updateMsg);
          if (updateMsg instanceof ReplicaOfflineMsg)
          {
            dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
          }
        }
      }
    }
opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
New file
@@ -0,0 +1,85 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.service;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.types.DN;
/**
 * Class useful for the case where DS/RS instances are collocated inside the
 * same JVM. It synchronizes the shutdown of the DS and RS sides.
 * <p>
 * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is
 * relayed/forwarded by the collocated RS to the other RSs in the topology
 * before the whole process shuts down.
 *
 * @since OPENDJ-1453
 */
public class DSRSShutdownSync
{
  private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs =
      new ConcurrentSkipListSet<DN>();
  private static AtomicLong stopInstanceTimestamp = new AtomicLong();
  /**
   * Message has been sent.
   *
   * @param baseDN
   *          the domain for which the message has been sent
   */
  public void replicaOfflineMsgSent(DN baseDN)
  {
    stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis());
    replicaOfflineMsgs.add(baseDN);
  }
  /**
   * Message has been forwarded.
   *
   * @param baseDN
   *          the domain for which the message has been sent
   */
  public void replicaOfflineMsgForwarded(DN baseDN)
  {
    replicaOfflineMsgs.remove(baseDN);
  }
  /**
   * Whether a ReplicationServer ServerReader or ServerWriter can proceed with
   * shutdown.
   *
   * @param baseDN
   *          the baseDN of the ServerReader or ServerWriter .
   * @return true if the caller can shutdown, false otherwise
   */
  public boolean canShutdown(DN baseDN)
  {
    return !replicaOfflineMsgs.contains(baseDN)
        || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3005,7 +3005,8 @@
              // The server is shutting down.
              listenerThread.initiateShutdown();
            }
            else if (processUpdate(updateMsg))
            else if (processUpdate(updateMsg)
                && updateMsg.contributesToDomainState())
            {
              /*
               * Warning: in synchronous mode, no way to tell the replay of an
@@ -3426,9 +3427,11 @@
   */
  public void publish(UpdateMsg msg)
  {
    // Publish the update
    broker.publish(msg);
    state.update(msg.getCSN());
    if (msg.contributesToDomainState())
    {
      state.update(msg.getCSN());
    }
    numSentUpdates.incrementAndGet();
  }