mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.54.2014 80054b8dde903071ee455c05f08e9cc0c02a56f4
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -70,6 +70,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
@@ -185,6 +186,7 @@
  public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * The update to replay message queue where the listener thread is going to
   * push incoming update messages.
@@ -455,14 +457,17 @@
   *
   * @param configuration    The configuration of this ReplicationDomain.
   * @param updateToReplayQueue The queue for update messages to replay.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException In case of invalid configuration.
   */
  LDAPReplicationDomain(ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
      BlockingQueue<UpdateToReplay> updateToReplayQueue,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    super(configuration, -1);
    this.updateToReplayQueue = updateToReplayQueue;
    this.dsrsShutdownSync = dsrsShutdownSync;
    // Get assured configuration
    readAssuredConfig(configuration, false);
@@ -2011,6 +2016,7 @@
  public void publishReplicaOfflineMsg()
  {
    pendingChanges.putReplicaOfflineMsg();
    dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
  }
  /**
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -43,14 +43,14 @@
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.*;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -73,9 +73,10 @@
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private ReplicationServerListener replicationServerListener = null;
  private ReplicationServerListener replicationServerListener;
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
      new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
  /**
   * The queue of received update messages, to be treated by the ReplayThread
@@ -119,8 +120,7 @@
   *                   Can be null is the request has no associated operation.
   * @return           The domain for this DN.
   */
  public static LDAPReplicationDomain findDomain(
      DN dn, PluginOperation pluginOp)
  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
  {
    /*
     * Don't run the special replication code on Operation that are
@@ -130,7 +130,9 @@
    {
        final Operation op = (Operation) pluginOp;
        if (op.dontSynchronize())
        {
          return null;
        }
        /*
         * Check if the provided operation is a repair operation and set the
@@ -187,8 +189,8 @@
  {
    try
    {
      LDAPReplicationDomain domain =
          new LDAPReplicationDomain(configuration, updateToReplayQueue);
      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
          configuration, updateToReplayQueue, dsrsShutdownSync);
      if (domains.size() == 0)
      {
        // Create the threads that will process incoming update messages
@@ -223,9 +225,8 @@
      BlockingQueue<UpdateToReplay> queue)
      throws ConfigException
  {
    LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue);
    final LDAPReplicationDomain domain =
        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
    domains.put(domain.getBaseDN(), domain);
    return domain;
  }
@@ -251,35 +252,30 @@
  /** {@inheritDoc} */
  @Override
  public void initializeSynchronizationProvider(
      ReplicationSynchronizationProviderCfg configuration)
  throws org.forgerock.opendj.config.server.ConfigException
      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
  {
    domains.clear();
    replicationServerListener = new ReplicationServerListener(configuration);
    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);
    cfg.addReplicationDomainAddListener(this);
    cfg.addReplicationDomainDeleteListener(this);
    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);
    cfg.addReplicationChangeListener(this);
    replayThreadNumber = configuration.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
        Integer.MAX_VALUE);
    replayThreadNumber = cfg.getNumUpdateReplayThreads();
    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    for (String name : cfg.listReplicationDomains())
    {
      createNewDomain(configuration.getReplicationDomain(name));
      createNewDomain(cfg.getReplicationDomain(name));
    }
    /*
     * If any schema changes were made with the server offline, then handle them
     * now.
     */
    // If any schema changes were made with the server offline, then handle them now.
    List<Modification> offlineSchemaChanges =
         DirectoryServer.getOfflineSchemaChanges();
    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -407,12 +403,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyOperation modifyOperation)
  {
    LDAPReplicationDomain domain =
      findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyOperation);
    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -420,12 +416,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationAddOperation addOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(addOperation);
    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(addOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -433,12 +429,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationDeleteOperation deleteOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(deleteOperation);
    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(deleteOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -446,12 +442,12 @@
  public SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    LDAPReplicationDomain domain =
      findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain == null)
      return new SynchronizationProviderResult.ContinueProcessing();
    return domain.handleConflictResolution(modifyDNOperation);
    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
    if (domain != null)
    {
      return domain.handleConflictResolution(modifyDNOperation);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
  /** {@inheritDoc} */
@@ -510,7 +506,9 @@
    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
    if (domain == null || !domain.solveConflict())
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // The historical object is retrieved from the attachment created
    // in the HandleConflictResolution phase.
@@ -542,11 +540,15 @@
    LDAPReplicationDomain domain =
      findDomain(addOperation.getEntryDN(), addOperation);
    if (domain == null)
    {
      return new SynchronizationProviderResult.ContinueProcessing();
    }
    // For LOCAL op only, generate CSN and attach Context
    if (!addOperation.isSynchronizationOperation())
    {
      domain.doPreOperation(addOperation);
    }
    // Add to the operation the historical attribute : "dn:changeNumber:add"
    EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -569,7 +571,9 @@
    stopReplayThreads();
    if (replicationServerListener != null)
    {
      replicationServerListener.shutdown();
    }
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
@@ -590,10 +594,11 @@
  @Override
  public void processSchemaChange(List<Modification> modifications)
  {
    LDAPReplicationDomain domain =
      findDomain(DirectoryServer.getSchemaDN(), null);
    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
    if (domain != null)
    {
      domain.synchronizeModifications(modifications);
    }
  }
  /** {@inheritDoc} */
@@ -604,7 +609,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -617,7 +624,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }
@@ -629,7 +638,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -642,7 +653,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -654,7 +667,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.disable();
      }
    }
  }
@@ -667,7 +682,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.enable();
      }
    }
  }
@@ -679,7 +696,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupStart();
      }
    }
  }
@@ -692,7 +711,9 @@
    {
      LDAPReplicationDomain domain = findDomain(dn, null);
      if (domain != null)
      {
        domain.backupEnd();
      }
    }
  }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -126,7 +126,10 @@
    while (firstChange != null && firstChange.isCommitted())
    {
      state.update(firstCSN);
      if (firstChange.getMsg().contributesToDomainState())
      {
        state.update(firstCSN);
      }
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -26,30 +26,30 @@
 */
package org.opends.server.replication.plugin;
import org.forgerock.i18n.LocalizableMessage;
import java.util.List;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.ConfigChangeResult;
import org.forgerock.opendj.ldap.ResultCode;
/**
 * This class is used to create and object that can
 * register in the admin framework as a listener for changes, add and delete
 * on the ReplicationServer configuration objects.
 *
 */
public class ReplicationServerListener
       implements ConfigurationAddListener<ReplicationServerCfg>,
       ConfigurationDeleteListener<ReplicationServerCfg>
{
  private final DSRSShutdownSync dsrsShutdownSync;
  private ReplicationServer replicationServer;
  /**
@@ -58,36 +58,36 @@
   *
   * @param configuration The configuration that will be used to listen
   *                      for replicationServer configuration changes.
   *
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException if the ReplicationServerListener can't register for
   *                         listening to changes on the provided configuration
   *                         object.
   */
  public ReplicationServerListener(
      ReplicationSynchronizationProviderCfg configuration)
      throws ConfigException
      ReplicationSynchronizationProviderCfg configuration,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    configuration.addReplicationServerAddListener(this);
    configuration.addReplicationServerDeleteListener(this);
    this.dsrsShutdownSync = dsrsShutdownSync;
    if (configuration.hasReplicationServer())
    {
      ReplicationServerCfg server = configuration.getReplicationServer();
      replicationServer = new ReplicationServer(server);
      final ReplicationServerCfg cfg = configuration.getReplicationServer();
      replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
    }
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationAdd(
      ReplicationServerCfg configuration)
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationAdd(ReplicationServerCfg cfg)
  {
    try
    {
      replicationServer = new ReplicationServer(configuration);
      replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
    }
    catch (ConfigException e)
    {
      // we should never get to this point because the configEntry has
      // already been validated in configAddisAcceptable
@@ -95,14 +95,12 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationAddAcceptable(
      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
      ReplicationServerCfg cfg, List<LocalizableMessage> unacceptableReasons)
  {
    return ReplicationServer.isConfigurationAcceptable(
      configuration, unacceptableReasons);
    return ReplicationServer.isConfigurationAcceptable(cfg, unacceptableReasons);
  }
  /**
@@ -111,14 +109,14 @@
  public void shutdown()
  {
    if (replicationServer != null)
    {
      replicationServer.shutdown();
    }
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult applyConfigurationDelete(
      ReplicationServerCfg configuration)
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationDelete(ReplicationServerCfg cfg)
  {
    // There can be only one replicationServer, just shutdown the
    // replicationServer currently configured.
@@ -129,11 +127,10 @@
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationDeleteAcceptable(
      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
      ReplicationServerCfg cfg, List<LocalizableMessage> unacceptableReasons)
  {
    return true;
  }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -106,6 +106,13 @@
  /** {@inheritDoc} */
  @Override
  public boolean contributesToDomainState()
  {
    return false; // replica offline msg MUST NOT update the ds-sync-state
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " offlineCSN=" + csn;
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -294,4 +294,14 @@
  {
    return payload;
  }
  /**
   * Whether the current message can update the "ds-sync-state" attribute.
   *
   * @return true if current message can update the "ds-sync-state" attribute, false otherwise.
   */
  public boolean contributesToDomainState()
  {
    return true;
  }
}
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,6 +35,7 @@
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.ECLUpdateMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
@@ -47,7 +48,7 @@
 * This class defines a server writer, which is used to send changes to a
 * directory server.
 */
public class ECLServerWriter extends ServerWriter
class ECLServerWriter extends ServerWriter
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -56,7 +57,7 @@
  private final ReplicationServerDomain replicationServerDomain;
  private boolean suspended;
  private volatile boolean shutdown;
  private PersistentSearch mypsearch;
  private final PersistentSearch mypsearch;
  /**
   * Create a ServerWriter.
@@ -66,10 +67,10 @@
   * @param replicationServerDomain the ReplicationServerDomain of this
   *                    ServerWriter.
   */
  public ECLServerWriter(Session session, ECLServerHandler handler,
  ECLServerWriter(Session session, ECLServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    super(session, handler, replicationServerDomain);
    super(session, handler, replicationServerDomain, new DSRSShutdownSync());
    setName("Replication ECL Writer Thread for operation " +
        handler.getOperationId());
@@ -79,21 +80,26 @@
    this.replicationServerDomain = replicationServerDomain;
    this.suspended = false;
    this.shutdown = false;
    this.mypsearch = findPersistentSearch(handler);
  }
    // Look for the psearch object related to this operation, the one that
    // will be notified with new entries to be returned.
    ECLWorkflowElement wfe =
        (ECLWorkflowElement) DirectoryServer
            .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
  /**
   * Look for the persistent search object related to this operation, the one
   * that will be notified with new entries to be returned.
   */
  private PersistentSearch findPersistentSearch(ECLServerHandler handler)
  {
    ECLWorkflowElement wfe = (ECLWorkflowElement)
        DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
    for (PersistentSearch psearch : wfe.getPersistentSearches())
    {
      if (psearch.getSearchOperation().toString().equals(
          handler.getOperationId()))
      {
        mypsearch = psearch;
        break;
        return psearch;
      }
    }
    return null;
  }
  /**
@@ -101,7 +107,7 @@
   * waiting for the startCLSessionMsg. Then it may be
   * suspended between 2 jobs, each job being a separate search.
   */
  public synchronized void suspendWriter()
  private synchronized void suspendWriter()
  {
    suspended = true;
  }
@@ -109,7 +115,7 @@
  /**
   * Resume the writer.
   */
  public synchronized void resumeWriter()
  synchronized void resumeWriter()
  {
    suspended = false;
    notify();
@@ -180,7 +186,7 @@
   * @throws IOException when raised (connection closure)
   * @throws InterruptedException when raised
   */
  public void doIt() throws IOException, InterruptedException
  private void doIt() throws IOException, InterruptedException
  {
    while (true)
    {
@@ -230,7 +236,7 @@
  /**
   * Shutdown the writer.
   */
  public synchronized void shutdownWriter()
  synchronized void shutdownWriter()
  {
    shutdown = true;
    notify();
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -44,6 +44,7 @@
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.types.Attributes.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -228,13 +229,10 @@
  public List<Attribute> getMonitorData()
  {
    List<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(Attributes.create("handler", getMonitorInstanceName()));
    attributes.add(
        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(
        Attributes.create(
            "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(Attributes.create("following", String.valueOf(following)));
    attributes.add(create("handler", getMonitorInstanceName()));
    attributes.add(create("queue-size", String.valueOf(msgQueue.count())));
    attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
    attributes.add(create("following", String.valueOf(following)));
    return attributes;
  }
@@ -419,21 +417,20 @@
   */
  public CSN getOlderUpdateCSN()
  {
    CSN result = null;
    synchronized (msgQueue)
    {
      if (following)
      {
        if (!msgQueue.isEmpty())
        {
          result = msgQueue.first().getCSN();
          return msgQueue.first().getCSN();
        }
      }
      else
      {
        if (!lateQueue.isEmpty())
        {
          result = lateQueue.first().getCSN();
          return lateQueue.first().getCSN();
        }
        else
        {
@@ -444,11 +441,11 @@
          the lateQueue when it will send the next update but we are not yet
          there. So let's take the last change not sent directly from the db.
          */
          result = findOldestCSNFromReplicaDBs();
          return findOldestCSNFromReplicaDBs();
        }
      }
    }
    return result;
    return null;
  }
  private CSN findOldestCSNFromReplicaDBs()
@@ -457,10 +454,13 @@
    try
    {
      cursor = replicationServerDomain.getCursorFrom(serverState);
      cursor.next();
      if (cursor.getRecord() != null)
      while (cursor.next())
      {
        return cursor.getRecord().getCSN();
        final UpdateMsg record = cursor.getRecord();
        if (record.contributesToDomainState())
        {
          return record.getCSN();
        }
      }
      return null;
    }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -54,6 +54,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -79,6 +80,7 @@
  /** The current configuration of this replication server. */
  private ReplicationServerCfg config;
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * This table is used to store the list of dn for which we are currently
@@ -122,18 +124,31 @@
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
   * @param cfg The configuration of this replication server.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg configuration)
    throws ConfigException
  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
  {
    this.config = configuration;
    this.changelogDB = new JEChangelogDB(this, configuration);
    this(cfg, new DSRSShutdownSync());
  }
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg cfg,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    this.config = cfg;
    this.changelogDB = new JEChangelogDB(this, cfg);
    this.dsrsShutdownSync = dsrsShutdownSync;
    replSessionSecurity = new ReplSessionSecurity();
    initialize();
    configuration.addChangeListener(this);
    cfg.addChangeListener(this);
    localPorts.add(getReplicationPort());
@@ -1183,6 +1198,16 @@
    return this.changelogDB;
  }
  /**
   * Returns the synchronization object for shutdown of combined DS/RS instances.
   *
   * @return the synchronization object for shutdown of combined DS/RS instances.
   */
  DSRSShutdownSync getDSRSShutdownSync()
  {
    return dsrsShutdownSync;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -72,7 +72,7 @@
  /**
   * The session opened with the remote server.
   */
  protected Session session;
  protected final Session session;
  /**
   * The serverURL of the remote server.
@@ -81,40 +81,39 @@
  /**
   * Number of updates received from the server in assured safe read mode.
   */
  protected int assuredSrReceivedUpdates = 0;
  private int assuredSrReceivedUpdates = 0;
  /**
   * Number of updates received from the server in assured safe read mode that
   * timed out.
   */
  protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe read mode.
   */
  protected int assuredSrSentUpdates = 0;
  private int assuredSrSentUpdates = 0;
  /**
   * Number of updates sent to the server in assured safe read mode that timed
   * out.
   */
  protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates received from the server in assured safe data mode.
   */
  protected int assuredSdReceivedUpdates = 0;
  private int assuredSdReceivedUpdates = 0;
  /**
   * Number of updates received from the server in assured safe data mode that
   * timed out.
   */
  protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
  /**
   * Number of updates sent to the server in assured safe data mode.
   */
  protected int assuredSdSentUpdates = 0;
  private int assuredSdSentUpdates = 0;
  /**
   * Number of updates sent to the server in assured safe data mode that timed
   * out.
   * Number of updates sent to the server in assured safe data mode that timed out.
   */
  protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
  private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
  /**
   * The associated ServerWriter that sends messages to the remote server.
@@ -305,7 +304,8 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, this, replicationServerDomain);
      writer = new ServerWriter(session, this, replicationServerDomain,
          replicationServer.getDSRSShutdownSync());
      reader = new ServerReader(session, this);
      session.setName("Replication server RS(" + getReplicationServerId()
@@ -630,7 +630,7 @@
   * Increment the number of updates sent to the server in assured safe data
   * mode.
   */
  public void incrementAssuredSdSentUpdates()
  private void incrementAssuredSdSentUpdates()
  {
    assuredSdSentUpdates++;
  }
@@ -666,7 +666,7 @@
   * Increment the number of updates sent to the server in assured safe read
   * mode.
   */
  public void incrementAssuredSrSentUpdates()
  private void incrementAssuredSrSentUpdates()
  {
    assuredSrSentUpdates++;
  }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -32,8 +32,10 @@
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.DSRSShutdownSync;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.common.ServerStatus.*;
@@ -50,8 +52,7 @@
  private final Session session;
  private final ServerHandler handler;
  private final ReplicationServerDomain replicationServerDomain;
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
@@ -63,9 +64,11 @@
   *          handler for which the ServerWriter is created.
   * @param replicationServerDomain
   *          The ReplicationServerDomain of this ServerWriter.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   */
  public ServerWriter(Session session, ServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
      ReplicationServerDomain replicationServerDomain,
      DSRSShutdownSync dsrsShutdownSync)
  {
    // Session may be null for ECLServerWriter.
    super("Replication server RS(" + handler.getReplicationServerId()
@@ -75,6 +78,7 @@
    this.session = session;
    this.handler = handler;
    this.replicationServerDomain = replicationServerDomain;
    this.dsrsShutdownSync = dsrsShutdownSync;
  }
  /**
@@ -93,7 +97,9 @@
    LocalizableMessage errMessage = null;
    try
    {
      while (true)
      boolean shutdown = false;
      while (!shutdown
          || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
      {
        final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
        if (updateMsg == null)
@@ -101,12 +107,16 @@
          // this connection is closing
          errMessage = LocalizableMessage.raw(
           "Connection closure: null update returned by domain.");
          return;
          shutdown = true;
        }
        else if (!isUpdateMsgFiltered(updateMsg))
        {
          // Publish the update to the remote server using a protocol version it supports
          session.publish(updateMsg);
          if (updateMsg instanceof ReplicaOfflineMsg)
          {
            dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
          }
        }
      }
    }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
New file
@@ -0,0 +1,85 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.service;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.types.DN;
/**
 * Class useful for the case where DS/RS instances are collocated inside the
 * same JVM. It synchronizes the shutdown of the DS and RS sides.
 * <p>
 * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is
 * relayed/forwarded by the collocated RS to the other RSs in the topology
 * before the whole process shuts down.
 *
 * @since OPENDJ-1453
 */
public class DSRSShutdownSync
{
  private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs =
      new ConcurrentSkipListSet<DN>();
  private static AtomicLong stopInstanceTimestamp = new AtomicLong();
  /**
   * Message has been sent.
   *
   * @param baseDN
   *          the domain for which the message has been sent
   */
  public void replicaOfflineMsgSent(DN baseDN)
  {
    stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis());
    replicaOfflineMsgs.add(baseDN);
  }
  /**
   * Message has been forwarded.
   *
   * @param baseDN
   *          the domain for which the message has been sent
   */
  public void replicaOfflineMsgForwarded(DN baseDN)
  {
    replicaOfflineMsgs.remove(baseDN);
  }
  /**
   * Whether a ReplicationServer ServerReader or ServerWriter can proceed with
   * shutdown.
   *
   * @param baseDN
   *          the baseDN of the ServerReader or ServerWriter .
   * @return true if the caller can shutdown, false otherwise
   */
  public boolean canShutdown(DN baseDN)
  {
    return !replicaOfflineMsgs.contains(baseDN)
        || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000;
  }
}
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2974,7 +2974,8 @@
              // The server is shutting down.
              listenerThread.initiateShutdown();
            }
            else if (processUpdate(updateMsg))
            else if (processUpdate(updateMsg)
                && updateMsg.contributesToDomainState())
            {
              /*
               * Warning: in synchronous mode, no way to tell the replay of an
@@ -3393,9 +3394,11 @@
   */
  public void publish(UpdateMsg msg)
  {
    // Publish the update
    broker.publish(msg);
    state.update(msg.getCSN());
    if (msg.contributesToDomainState())
    {
      state.update(msg.getCSN());
    }
    numSentUpdates.incrementAndGet();
  }