mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.30.2014 e33ecc4dd66d4c514cd7ad52d348a0be63a7f1eb
Increased encapsulation of ReplicationDomain.IEContext + moved more behaviour to it.
Reduced members' visibilities in LDAPReplicationDomain + removed several methods.


ReplicationDomain.java:
Do not access the IEContext field several times in one method in case it gets updated in the middle, instead accessed it once at the top of a method and passed it down private method calls.
Moved importInProgress(), getTotalEntryCount(), getLeftEntryCount() to IEContext class.
Added getImportExportContext().

LDAPReplicationDomain.java:
Reduced visibility of many methods.
Removed followImport field + setFollowImport() which can be calculated from importErrorMessageId + added and used isFollowImport() instead.
Replaced calls to "retrievesBackend(getBaseDN())" with getBackend() + inlined retrievesBackend().
Called getImportExportContext() to access the IEContext.

FractionalLDIFImportPlugin.java:
Consequence of removing LDAPReplicationDomain.setFollowImport().
In doLDIFImport(), factorized code between branches of an if statement + extracted method isNotEmpty().
In flushFractionalConfigIntoEntry(), used StaticUtils.collectionToString() + extracted method add()

ReplicationMonitor.java:
In getMonitorData(), simplified the code.

ReplicationDomainTest.java:
Extracted methods waitEndExport(), assertExportSucessful(), buildExportedData().

*.java:
Updated copyright years for r10098.
13 files modified
690 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/DSInfo.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java 129 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 126 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 321 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java 25 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 73 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -42,6 +42,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.plugin.LDAPReplicationDomain.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class implements a Directory Server plugin that is used in fractional
@@ -61,7 +62,7 @@
{
  /**
   * Holds the fractional configuration and if available the replication domain
   * matching this import session (they form the importfractional context).
   * matching this import session (they form the import fractional context).
   * Domain is available if the server is online (import-ldif, online full
   * update..) otherwise, this is an import-ldif with server off. The key is the
   * ImportConfig object of the session which acts as a cookie for the whole
@@ -126,9 +127,7 @@
    super();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override()
  public final void initializePlugin(Set<PluginType> pluginTypes,
    FractionalLDIFImportPluginCfg configuration)
@@ -153,9 +152,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override()
  public final void finalizePlugin()
  {
@@ -217,9 +214,7 @@
    return FractionalConfig.toFractionalConfig(matchingReplicatedDomainCfg);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override()
  public final void doLDIFImportEnd(LDIFImportConfig importConfig)
  {
@@ -343,26 +338,20 @@
      }
      // Compare backend and local fractional configuration
      boolean sameConfig =
        isFractionalConfigConsistent(localFractionalConfig, exclIt, inclIt);
      if (isFractionalConfigConsistent(localFractionalConfig, exclIt, inclIt))
      {
        // local and remote non/fractional config are equivalent :
        // follow import, no need to go with filtering as remote backend
        // should be ok
        // let import finish
        return PluginResult.ImportLDIF.continueEntryProcessing();
      }
      if (localFractionalConfig.isFractional())
      {
        // Local domain is fractional
        if (sameConfig)
        {
          // Both local and remote fractional configuration are equivalent :
          // follow import, no need to go with filtering as remote backend
          // should be ok
          return PluginResult.ImportLDIF.continueEntryProcessing();
        }
        // Local domain is fractional, remote domain has not same config
        boolean remoteDomainHasSomeConfig = false;
        if ((exclAttr != null && (exclAttr.size() > 0))
            || (inclAttr != null && (inclAttr.size() > 0)))
        {
          remoteDomainHasSomeConfig = true;
        }
        boolean remoteDomainHasSomeConfig =
            isNotEmpty(exclAttr) || isNotEmpty(inclAttr);
        if (remoteDomainHasSomeConfig)
        {
          LDAPReplicationDomain domain = importFractionalContext.getDomain();
@@ -372,7 +361,6 @@
            // is different : stop import (error will be logged when import is
            // stopped)
            domain.setImportErrorMessageId(IMPORT_ERROR_MESSAGE_BAD_REMOTE);
            domain.setFollowImport(false);
            return PluginResult.ImportLDIF.stopEntryProcessing(null);
          }
@@ -384,16 +372,10 @@
        // Local domain is fractional but remote domain has no config :
        // flush local config into root entry and follow import with filtering
        flushFractionalConfigIntoEntry(localFractionalConfig, entry);
      } else
      }
      else
      {
        // Local domain is not fractional
        if (sameConfig)
        {
          // None of the local or remote domain has fractional config : nothing
          // more to do : let import finish
          return PluginResult.ImportLDIF.continueEntryProcessing();
        }
        LDAPReplicationDomain domain = importFractionalContext.getDomain();
        if (domain != null)
        {
@@ -401,7 +383,6 @@
          //local domain should be configured with the same config as remote one
          domain.setImportErrorMessageId(
              IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL);
          domain.setFollowImport(false);
          return PluginResult.ImportLDIF.stopEntryProcessing(null);
        }
@@ -420,6 +401,11 @@
    return PluginResult.ImportLDIF.continueEntryProcessing();
  }
  private boolean isNotEmpty(Attribute attr)
  {
    return attr != null && attr.size() > 0;
  }
  private Attribute getAttribute(String attributeName, Entry entry)
  {
    AttributeType attrType = DirectoryServer.getAttributeType(attributeName);
@@ -459,51 +445,19 @@
      String fractAttribute = fractionalExclusive ?
          REPLICATION_FRACTIONAL_EXCLUDE : REPLICATION_FRACTIONAL_INCLUDE;
      AttributeBuilder attrBuilder = new AttributeBuilder(fractAttribute);
      boolean somethingToFlush = false;
      // Add attribute values for all classes
      int size = fractionalAllClassesAttributes.size();
      if (size > 0)
      {
        String fracValue = "*:";
        int i = 1;
        for (String attrName : fractionalAllClassesAttributes)
        {
          fracValue += attrName;
          if (i < size)
          {
            fracValue += ",";
          }
          i++;
        }
        somethingToFlush = true;
        attrBuilder.add(fracValue);
      }
      boolean somethingToFlush =
          add(attrBuilder, "*", fractionalAllClassesAttributes);
      // Add attribute values for specific classes
      size = fractionalSpecificClassesAttributes.size();
      if (size > 0)
      if (fractionalSpecificClassesAttributes.size() > 0)
      {
        for (String className : fractionalSpecificClassesAttributes.keySet())
        for (Map.Entry<String, Set<String>> specific
            : fractionalSpecificClassesAttributes.entrySet())
        {
          int valuesSize =
            fractionalSpecificClassesAttributes.get(className).size();
          if (valuesSize > 0)
          if (add(attrBuilder, specific.getKey(), specific.getValue()))
          {
            String fracValue = className + ":";
            int i = 1;
            for (String attrName : fractionalSpecificClassesAttributes.get(
              className))
            {
              fracValue += attrName;
              if (i < valuesSize)
              {
                fracValue += ",";
              }
              i++;
            }
            somethingToFlush = true;
            attrBuilder.add(fracValue);
          }
        }
      }
@@ -517,9 +471,18 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  private static boolean add(AttributeBuilder attrBuilder, String className,
      Set<String> values)
  {
    if (values.size() > 0)
    {
      attrBuilder.add(className + ":" + collectionToString(values, ","));
      return true;
    }
    return false;
  }
  /** {@inheritDoc} */
  @Override()
  public boolean isConfigurationAcceptable(PluginCfg configuration,
    List<Message> unacceptableReasons)
@@ -527,9 +490,7 @@
    return true;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationChangeAcceptable(
    FractionalLDIFImportPluginCfg configuration,
@@ -538,9 +499,7 @@
    return true;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationChange(
    FractionalLDIFImportPluginCfg configuration)
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -41,7 +41,7 @@
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.AlertGenerator;
@@ -71,7 +71,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.*;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -285,13 +285,6 @@
  private boolean forceBadDataSet = false;
  /**
   * This flag is used by the fractional replication ldif import plugin to
   * stop the (online) import process if a fractional configuration
   * inconsistency is detected by it.
   */
  private boolean followImport = true;
  /**
   * The message id to be used when an import is stopped with error by
   * the fractional replication ldif import plugin.
   */
@@ -481,7 +474,7 @@
    storeECLConfiguration(configuration);
    solveConflictFlag = isSolveConflict(configuration);
    Backend backend = retrievesBackend(getBaseDN());
    Backend backend = getBackend();
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
@@ -544,20 +537,22 @@
   * error by the fractional replication ldif import plugin.
   * @param importErrorMessageId The message to use.
   */
  public void setImportErrorMessageId(int importErrorMessageId)
  void setImportErrorMessageId(int importErrorMessageId)
  {
    this.importErrorMessageId = importErrorMessageId;
  }
  /**
   * Sets the boolean telling if the online import currently in progress should
   * continue.
   * @param followImport The boolean telling if the online import currently in
   * progress should continue.
   * This flag is used by the fractional replication ldif import plugin to stop
   * the (online) import process if a fractional configuration inconsistency is
   * detected by it.
   *
   * @return true if the online import currently in progress should continue,
   *         false otherwise.
   */
  public void setFollowImport(boolean followImport)
  private boolean isFollowImport()
  {
    this.followImport = followImport;
    return importErrorMessageId == -1;
  }
  /**
@@ -1067,7 +1062,7 @@
   * @return true if the operation contains some attributes subject to filtering
   * by the fractional configuration
   */
  public boolean fractionalFilterOperation(
  private boolean fractionalFilterOperation(
    PreOperationAddOperation addOperation, boolean performFiltering)
  {
    return fractionalRemoveAttributesFromEntry(fractionalConfig,
@@ -1088,7 +1083,7 @@
   * @return true if the operation is inconsistent with fractional
   * configuration
   */
  public boolean fractionalFilterOperation(
  private boolean fractionalFilterOperation(
    PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
  {
    // Quick exit if not called for analyze and
@@ -1406,7 +1401,7 @@
   * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES,
   * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP
   */
  public int fractionalFilterOperation(PreOperationModifyOperation
  private int fractionalFilterOperation(PreOperationModifyOperation
    modifyOperation, boolean performFiltering)
  {
    /*
@@ -1494,7 +1489,7 @@
  @Override
  protected byte[] receiveEntryBytes()
  {
    if (followImport)
    if (isFollowImport())
    {
      // Ok, next entry is allowed to be received
      return super.receiveEntryBytes();
@@ -1505,19 +1500,20 @@
    // process:
    // This is an error termination during the import
    // The error is stored and the import is ended by returning null
    final IEContext ieCtx = getImportExportContext();
    Message msg = null;
    switch (importErrorMessageId)
    {
    case IMPORT_ERROR_MESSAGE_BAD_REMOTE:
      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(
          getBaseDNString(), Integer.toString(ieContext.getImportSource()));
          getBaseDNString(), Integer.toString(ieCtx.getImportSource()));
      break;
    case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL:
      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(
          getBaseDNString(), Integer.toString(ieContext.getImportSource()));
          getBaseDNString(), Integer.toString(ieCtx.getImportSource()));
      break;
    }
    ieContext.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
    ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
    return null;
  }
@@ -1552,7 +1548,7 @@
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
  SynchronizationProviderResult handleConflictResolution(
         PreOperationDeleteOperation deleteOperation)
  {
    if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -1624,7 +1620,7 @@
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
  SynchronizationProviderResult handleConflictResolution(
      PreOperationAddOperation addOperation)
  {
    if (!addOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -1754,7 +1750,7 @@
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
  SynchronizationProviderResult handleConflictResolution(
      PreOperationModifyDNOperation modifyDNOperation)
  {
    if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -1872,7 +1868,7 @@
   * @param modifyOperation the operation
   * @return code indicating is operation must proceed
   */
  public SynchronizationProviderResult handleConflictResolution(
  SynchronizationProviderResult handleConflictResolution(
         PreOperationModifyOperation modifyOperation)
  {
    if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -2243,7 +2239,7 @@
   *
   * @return The number of updates in the pending list
   */
  public int getPendingUpdatesCount()
  private int getPendingUpdatesCount()
  {
    if (pendingChanges != null)
      return pendingChanges.size();
@@ -2255,7 +2251,7 @@
   *
   * @return The number of updates replayed successfully
   */
  public int getNumReplayedPostOpCalled()
  private int getNumReplayedPostOpCalled()
  {
    return numReplayedPostOpCalled;
  }
@@ -2518,7 +2514,7 @@
   *
   * @param csn the CSN of the operation with error.
   */
  public void updateError(CSN csn)
  private void updateError(CSN csn)
  {
    try
    {
@@ -3217,7 +3213,7 @@
   * Get the number of modify conflicts successfully resolved.
   * @return The number of modify conflicts successfully resolved.
   */
  public int getNumResolvedModifyConflicts()
  private int getNumResolvedModifyConflicts()
  {
    return numResolvedModifyConflicts.get();
  }
@@ -3226,7 +3222,7 @@
   * Get the number of naming conflicts successfully resolved.
   * @return The number of naming conflicts successfully resolved.
   */
  public int getNumResolvedNamingConflicts()
  private int getNumResolvedNamingConflicts()
  {
    return numResolvedNamingConflicts.get();
  }
@@ -3235,7 +3231,7 @@
   * Get the number of unresolved conflicts.
   * @return The number of unresolved conflicts.
   */
  public int getNumUnresolvedNamingConflicts()
  private int getNumUnresolvedNamingConflicts()
  {
    return numUnresolvedNamingConflicts.get();
  }
@@ -3322,7 +3318,7 @@
   * @return The computed generationId.
   * @throws DirectoryException When an error occurs.
   */
  public long computeGenerationId() throws DirectoryException
  private long computeGenerationId() throws DirectoryException
  {
    long genId = exportBackend(null, true);
@@ -3503,7 +3499,7 @@
   * Do whatever is needed when a backup is started.
   * We need to make sure that the serverState is correctly save.
   */
  public void backupStart()
  void backupStart()
  {
    state.save();
  }
@@ -3511,7 +3507,7 @@
  /**
   * Do whatever is needed when a backup is finished.
   */
  public void backupEnd()
  void backupEnd()
  {
    // Nothing is needed at the moment
  }
@@ -3549,7 +3545,7 @@
  private long exportBackend(OutputStream output, boolean checksumOutput)
      throws DirectoryException
  {
    Backend backend = retrievesBackend(getBaseDN());
    Backend backend = getBackend();
    //  Acquire a shared lock for the backend.
    try
@@ -3678,20 +3674,6 @@
  }
  /**
   * Retrieves the backend related to the domain.
   *
   * @return The backend of that domain.
   * @param baseDN The baseDN to retrieve the backend
   */
  protected static Backend retrievesBackend(DN baseDN)
  {
    // Retrieves the backend related to this domain
    return DirectoryServer.getBackend(baseDN);
  }
  /**
   * Process backend before import.
   *
   * @param backend
@@ -3731,13 +3713,14 @@
  {
    LDIFImportConfig importConfig = null;
    Backend backend = retrievesBackend(getBaseDN());
    Backend backend = getBackend();
    IEContext ieCtx = getImportExportContext();
    try
    {
      if (!backend.supportsLDIFImport())
      {
        ieContext.setExceptionIfNoneSet(new DirectoryException(OTHER,
        ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
            ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
      }
      else
@@ -3754,7 +3737,6 @@
        importConfig.setInvokeImportPlugins(true);
        // Reset the follow import flag and message before starting the import
        importErrorMessageId = -1;
        followImport = true;
        // TODO How to deal with rejected entries during the import
        importConfig.writeRejectedEntries(
@@ -3771,7 +3753,7 @@
    }
    catch(Exception e)
    {
      ieContext.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
      ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
          ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e))));
    }
    finally
@@ -3783,12 +3765,12 @@
        {
          importConfig.close();
          closeBackendImport(backend); // Re-enable backend
          backend = retrievesBackend(getBaseDN());
          backend = getBackend();
        }
        loadDataState();
        if (ieContext.getException() != null)
        if (ieCtx.getException() != null)
        {
          // When an error occurred during an import, most of times
          // the generationId coming in the root entry of the imported data,
@@ -3804,14 +3786,14 @@
        // so we don't bother about the new Exception.
        // However if there was no Exception before we want
        // to return this Exception to the task creator.
        ieContext.setExceptionIfNoneSet(new DirectoryException(
        ieCtx.setExceptionIfNoneSet(new DirectoryException(
            ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe))));
      }
    }
    if (ieContext.getException() != null)
      throw ieContext.getException();
    if (ieCtx.getException() != null)
      throw ieCtx.getException();
  }
  /**
@@ -3887,9 +3869,9 @@
   * Returns the backend associated to this domain.
   * @return The associated backend.
   */
  public Backend getBackend()
  private Backend getBackend()
  {
    return retrievesBackend(getBaseDN());
    return DirectoryServer.getBackend(getBaseDN());
  }
  /*
@@ -3946,7 +3928,7 @@
    }
    // Check that the base DN is configured as a base-dn of the directory server
    if (retrievesBackend(dn) == null)
    if (DirectoryServer.getBackend(dn) == null)
    {
      unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn.toString()));
      return false;
@@ -3997,7 +3979,7 @@
         ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
  {
    // Check that a import/export is not in progress
    if (importInProgress() || exportInProgress())
    if (ieRunning())
    {
      unacceptableReasons.add(
          NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get());
@@ -4059,7 +4041,7 @@
   * Remove from this domain configuration, the configuration of the
   * external change log.
   */
  public void removeECLDomainCfg()
  private void removeECLDomainCfg()
  {
    try
    {
@@ -4082,8 +4064,8 @@
   * @param  domCfg       The provided configuration.
   * @throws ConfigException When an error occurred.
   */
  public void storeECLConfiguration(ReplicationDomainCfg domCfg)
  throws ConfigException
  private void storeECLConfiguration(ReplicationDomainCfg domCfg)
      throws ConfigException
  {
    ExternalChangelogDomainCfg eclDomCfg = null;
    // create the ecl config if it does not exist
@@ -4434,7 +4416,7 @@
  @Override
  public long countEntries() throws DirectoryException
  {
    Backend backend = retrievesBackend(getBaseDN());
    Backend backend = getBackend();
    if (!backend.supportsLDIFExport())
    {
      Message msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID());
@@ -5105,7 +5087,7 @@
   * Specifies whether this domain is enabled/disabled regarding the ECL.
   * @return enabled/disabled for the ECL.
   */
  public boolean isECLEnabled()
  boolean isECLEnabled()
  {
    return this.eclDomain.isEnabled();
  }
@@ -5116,7 +5098,7 @@
   *
   * @return the purge delay.
   */
  public long getHistoricalPurgeDelay()
  long getHistoricalPurgeDelay()
  {
    return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
  }
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -146,14 +146,14 @@
   * The context related to an import or export being processed
   * Null when none is being processed.
   */
  protected IEContext ieContext = null;
  volatile IEContext ieContext;
  /**
   * The Thread waiting for incoming update messages for this domain and pushing
   * them to the global incoming update message queue for later processing by
   * replay threads.
   */
  private volatile DirectoryThread listenerThread = null;
  private volatile DirectoryThread listenerThread;
  /**
   * A set of counters used for Monitoring.
@@ -732,7 +732,8 @@
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          if (ieContext != null)
          IEContext ieCtx = ieContext;
          if (ieCtx != null)
          {
            /*
            This is an error termination for the 2 following cases :
@@ -750,10 +751,10 @@
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
            if (errorMsg.getCreationTime() > ieContext.startTime)
            if (errorMsg.getCreationTime() > ieCtx.startTime)
            {
              // consider only ErrorMsg that relate to the current import/export
              processErrorMsg(errorMsg);
              processErrorMsg(errorMsg, ieCtx);
            }
            else
            {
@@ -784,10 +785,11 @@
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          if (ieContext != null)
          IEContext ieCtx = ieContext;
          if (ieCtx != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
            ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
          }
          // Trash this msg When no input/export is running/should never happen
        }
@@ -1044,7 +1046,7 @@
    private long entryLeftCount = 0;
    /** Exception raised during the initialization. */
    private DirectoryException exception = null;
    private DirectoryException exception;
    /** Whether the context is related to an import or an export. */
    private boolean importInProgress;
@@ -1061,7 +1063,7 @@
    /**
     * Request message sent when this server has the initializeFromRemote task.
     */
    private InitializeRequestMsg initReqMsgSent = null;
    private InitializeRequestMsg initReqMsgSent;
    /**
     * Start time of the initialization process. ErrorMsg timestamped before
@@ -1116,12 +1118,47 @@
    }
    /**
     * Returns a boolean indicating if a total update import is currently in
     * Progress.
     *
     * @return A boolean indicating if a total update import is currently in
     *         Progress.
     */
    public boolean importInProgress()
    {
      return importInProgress;
    }
    /**
     * Returns the total number of entries to be processed when a total update
     * is in progress.
     *
     * @return The total number of entries to be processed when a total update
     *         is in progress.
     */
    long getTotalEntryCount()
    {
      return entryCount;
    }
    /**
     * Returns the number of entries still to be processed when a total update
     * is in progress.
     *
     * @return The number of entries still to be processed when a total update
     *         is in progress.
     */
    long getLeftEntryCount()
    {
      return entryLeftCount;
    }
    /**
     * Initializes the import/export counters with the provider value.
     * @param total Total number of entries to be processed.
     * @throws DirectoryException if an error occurred.
     */
    private void initializeCounters(long total)
      throws DirectoryException
    private void initializeCounters(long total) throws DirectoryException
    {
      entryCount = total;
      entryLeftCount = total;
@@ -1150,8 +1187,7 @@
     *
     * @throws DirectoryException if an error occurred.
     */
    public void updateCounters(int entriesDone)
      throws DirectoryException
    public void updateCounters(int entriesDone) throws DirectoryException
    {
      entryLeftCount -= entriesDone;
@@ -1358,6 +1394,7 @@
    - to update the task with the server(s) where this test failed
    */
    IEContext ieCtx = ieContext;
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
@@ -1365,7 +1402,7 @@
      for (DSInfo dsi : getReplicasList())
      {
        ieContext.startList.add(dsi.getDsId());
        ieCtx.startList.add(dsi.getDsId());
      }
      // We manage the list of servers with which a flow control can be enabled
@@ -1373,7 +1410,7 @@
      {
        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieContext.setAckVal(dsi.getDsId(), 0);
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
@@ -1382,7 +1419,7 @@
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
          getBaseDNString(), getServerId(), serverToInitialize));
      ieContext.startList.add(serverToInitialize);
      ieCtx.startList.add(serverToInitialize);
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicasList())
@@ -1390,7 +1427,7 @@
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieContext.setAckVal(dsi.getDsId(), 0);
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
@@ -1402,34 +1439,34 @@
    {
      try
      {
        ieContext.exportTarget = serverToInitialize;
        ieCtx.exportTarget = serverToInitialize;
        if (initTask != null)
        {
          ieContext.initializeTask = initTask;
          ieCtx.initializeTask = initTask;
        }
        ieContext.initializeCounters(this.countEntries());
        ieContext.msgCnt = 0;
        ieContext.initNumLostConnections = broker.getNumLostConnections();
        ieContext.initWindow = initWindow;
        ieCtx.initializeCounters(this.countEntries());
        ieCtx.msgCnt = 0;
        ieCtx.initNumLostConnections = broker.getNumLostConnections();
        ieCtx.initWindow = initWindow;
        // Send start message to the peer
        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
            getBaseDN(), getServerId(), serverToInitialize,
            serverRunningTheTask, ieContext.entryCount, initWindow);
            serverRunningTheTask, ieCtx.entryCount, initWindow);
        broker.publish(initTargetMsg);
        // Wait for all servers to be ok
        waitForRemoteStartOfInit();
        waitForRemoteStartOfInit(ieCtx);
        // Servers that left in the list are those for which we could not test
        // that they have been successfully initialized.
        if (!ieContext.failureList.isEmpty())
        if (!ieCtx.failureList.isEmpty())
        {
          throw new DirectoryException(
              ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
                  ieContext.failureList.toString()));
                  ieCtx.failureList.toString()));
        }
        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
@@ -1441,7 +1478,7 @@
      catch(DirectoryException exportException)
      {
        // Give priority to the first exception raised - stored in the context
        final DirectoryException ieEx = ieContext.exception;
        final DirectoryException ieEx = ieCtx.exception;
        exportRootException = ieEx != null ? ieEx : exportException;
      }
@@ -1500,10 +1537,8 @@
            continue;
          }
          ErrorMsg errorMsg =
              new ErrorMsg(serverToInitialize,
                  exportRootException.getMessageObject());
          broker.publish(errorMsg);
          broker.publish(new ErrorMsg(
              serverToInitialize, exportRootException.getMessageObject()));
        }
        catch(Exception e)
        {
@@ -1518,16 +1553,16 @@
    } // attempt loop
    // Wait for all servers to be ok, and build the failure list
    waitForRemoteEndOfInit();
    waitForRemoteEndOfInit(ieCtx);
    // Servers that left in the list are those for which we could not test
    // that they have been successfully initialized.
    if (!ieContext.failureList.isEmpty() && exportRootException == null)
    if (!ieCtx.failureList.isEmpty() && exportRootException == null)
    {
      exportRootException = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
                  Long.toString(getGenerationID()),
                  ieContext.failureList.toString()));
                  ieCtx.failureList.toString()));
    }
    // Don't forget to release IEcontext acquired at beginning.
@@ -1558,10 +1593,10 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteStartOfInit()
  private void waitForRemoteStartOfInit(IEContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieContext.startList);
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1580,7 +1615,7 @@
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + getGenerationID());
        if (ieContext.startList.contains(dsi.getDsId()))
        if (ieCtx.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
          {
@@ -1604,11 +1639,11 @@
    }
    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for start ends with " + ieContext.failureList);
        "[IE] wait for start ends with " + ieCtx.failureList);
  }
  /**
@@ -1616,10 +1651,10 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteEndOfInit()
  private void waitForRemoteEndOfInit(IEContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieContext.startList);
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
@@ -1645,7 +1680,7 @@
      while (it.hasNext())
      {
        int serverId = it.next();
        if (ieContext.failureList.contains(serverId))
        if (ieCtx.failureList.contains(serverId))
        {
          /*
          this server has already been in error during initialization
@@ -1701,11 +1736,11 @@
    }
    while (!done && !broker.shuttingDown()); // infinite wait
    ieContext.failureList.addAll(replicasWeAreWaitingFor);
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
        "[IE] wait for end ends with " + ieContext.failureList);
        "[IE] wait for end ends with " + ieCtx.failureList);
  }
  /**
@@ -1743,13 +1778,13 @@
   *
   * @param errorMsg The error message received.
   */
  private void processErrorMsg(ErrorMsg errorMsg)
  private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
  {
    //Exporting must not be stopped on the first error, if we run initialize-all
    if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
    {
      // The ErrorMsg is received while we have started an initialization
      ieContext.setExceptionIfNoneSet(new DirectoryException(
      ieCtx.setExceptionIfNoneSet(new DirectoryException(
          ResultCode.OTHER, errorMsg.getDetails()));
      /*
@@ -1759,11 +1794,11 @@
       *   even after the nextInitAttemptDelay
       * During the import, the ErrorMsg will be received by receiveEntryBytes
       */
      if (ieContext.initializeTask instanceof InitializeTask)
      if (ieCtx.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask) ieContext.initializeTask)
            .updateTaskCompletionState(ieContext.getException());
        ((InitializeTask) ieCtx.initializeTask)
            .updateTaskCompletionState(ieCtx.getException());
        releaseIEContext();
      }
@@ -1781,6 +1816,7 @@
    ReplicationMsg msg;
    while (true)
    {
      IEContext ieCtx = ieContext;
      try
      {
        // In the context of the total update, we don't want any automatic
@@ -1807,7 +1843,7 @@
          else
          {
            // Handle connection issues
            ieContext.setExceptionIfNoneSet(new DirectoryException(
            ieCtx.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
                    .get(broker.getReplicationServer())));
            return null;
@@ -1819,26 +1855,26 @@
        {
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters(countEntryLimits(entryBytes));
          ieCtx.updateCounters(countEntryLimits(entryBytes));
          if (ieContext.exporterProtocolVersion >=
          if (ieCtx.exporterProtocolVersion >=
            ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            // check the msgCnt of the msg received to check ordering
            if (++ieContext.msgCnt != entryMsg.getMsgId())
            if (++ieCtx.msgCnt != entryMsg.getMsgId())
            {
              ieContext.setExceptionIfNoneSet(new DirectoryException(
              ieCtx.setExceptionIfNoneSet(new DirectoryException(
                  ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
                      String.valueOf(ieContext.msgCnt),
                      String.valueOf(ieCtx.msgCnt),
                      String.valueOf(entryMsg.getMsgId()))));
              return null;
            }
            // send the ack of flow control mgmt
            if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
            if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
            {
              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
              {
@@ -1864,12 +1900,12 @@
          This is an error termination during the import
          The error is stored and the import is ended by returning null
          */
          if (ieContext.getException() == null)
          if (ieCtx.getException() == null)
          {
            ErrorMsg errMsg = (ErrorMsg)msg;
            if (errMsg.getCreationTime() > ieContext.startTime)
            if (errMsg.getCreationTime() > ieCtx.startTime)
            {
              ieContext.setException(
              ieCtx.setException(
                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
              return null;
            }
@@ -1880,15 +1916,15 @@
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if (msg instanceof TopologyMsg
              && isRemoteDSConnected(ieContext.importSource) == null)
              && isRemoteDSConnected(ieCtx.importSource) == null)
          {
            Message errMsg =
              Message.raw(Category.SYNC, Severity.NOTICE,
                  ERR_INIT_EXPORTER_DISCONNECTION.get(
                      getBaseDNString(),
                      Integer.toString(getServerId()),
                      Integer.toString(ieContext.importSource)));
            ieContext.setExceptionIfNoneSet(new DirectoryException(
                      Integer.toString(ieCtx.importSource)));
            ieCtx.setExceptionIfNoneSet(new DirectoryException(
                ResultCode.OTHER, errMsg));
            return null;
          }
@@ -1896,7 +1932,7 @@
      }
      catch(Exception e)
      {
        ieContext.setExceptionIfNoneSet(new DirectoryException(
        ieCtx.setExceptionIfNoneSet(new DirectoryException(
            ResultCode.OTHER,
            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
      }
@@ -1957,9 +1993,10 @@
          Arrays.toString(lDIFEntry));
    // build the message
    IEContext ieCtx = ieContext;
    EntryMsg entryMessage = new EntryMsg(
        getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
        ++ieContext.msgCnt);
        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
        ++ieCtx.msgCnt);
    // Waiting the slowest loop
    while (!broker.shuttingDown())
@@ -1969,30 +2006,30 @@
      server that have been stored by the listener thread in the ieContext,
      we just abandon the export by throwing an exception.
      */
      if (ieContext.getException() != null)
      if (ieCtx.getException() != null)
      {
        throw new IOException(ieContext.getException().getMessage());
        throw new IOException(ieCtx.getException().getMessage());
      }
      int slowestServerId = ieContext.getSlowestServer();
      int slowestServerId = ieCtx.getSlowestServer();
      if (isRemoteDSConnected(slowestServerId)==null)
      {
        ieContext.setException(new DirectoryException(ResultCode.OTHER,
        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
                Integer.toString(ieContext.getSlowestServer()))));
                Integer.toString(ieCtx.getSlowestServer()))));
        throw new IOException("IOException with nested DirectoryException",
            ieContext.getException());
            ieCtx.getException());
      }
      int ourLastExportedCnt = ieContext.msgCnt;
      int slowestCnt = ieContext.ackVals.get(slowestServerId);
      int ourLastExportedCnt = ieCtx.msgCnt;
      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
      if (debugEnabled())
        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
      if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
      if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow)
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
@@ -2003,13 +2040,13 @@
        // process any connection error
        if (broker.hasConnectionError()
          || broker.getNumLostConnections() != ieContext.initNumLostConnections)
          || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
        {
          // publish failed - store the error in the ieContext ...
          DirectoryException de = new DirectoryException(ResultCode.OTHER,
              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
                  Integer.toString(broker.getRsServerId())));
          ieContext.setExceptionIfNoneSet(de);
          ieCtx.setExceptionIfNoneSet(de);
          // .. and abandon the export by throwing an exception.
          throw new IOException(de.getMessage());
        }
@@ -2031,13 +2068,13 @@
    // process any publish error
    if (!sent
        || broker.hasConnectionError()
        || broker.getNumLostConnections() != ieContext.initNumLostConnections)
        || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
    {
      // publish failed - store the error in the ieContext ...
      DirectoryException de = new DirectoryException(ResultCode.OTHER,
          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
              Integer.toString(broker.getRsServerId())));
      ieContext.setExceptionIfNoneSet(de);
      ieCtx.setExceptionIfNoneSet(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
@@ -2045,11 +2082,11 @@
    // publish succeeded
    try
    {
      ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
      ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
    }
    catch (DirectoryException de)
    {
      ieContext.setExceptionIfNoneSet(de);
      ieCtx.setExceptionIfNoneSet(de);
      // .. and abandon the export by throwing an exception.
      throw new IOException(de.getMessage());
    }
@@ -2157,13 +2194,12 @@
      */
      acquireIEContext(true);  //test and set if no import already in progress
      ieContext.initializeTask = initTask;
      ieContext.attemptCnt = 0;
      ieContext.initReqMsgSent = new InitializeRequestMsg(
      IEContext ieCtx = ieContext;
      ieCtx.initializeTask = initTask;
      ieCtx.attemptCnt = 0;
      ieCtx.initReqMsgSent = new InitializeRequestMsg(
          getBaseDN(), getServerId(), source, getInitWindow());
      // Publish Init request msg
      broker.publish(ieContext.initReqMsgSent);
      broker.publish(ieCtx.initReqMsgSent);
      /*
      The normal success processing is now to receive InitTargetMsg then
@@ -2219,6 +2255,7 @@
    int source = initTargetMsgReceived.getSenderID();
    IEContext ieCtx = ieContext;
    try
    {
      // Log starting
@@ -2240,12 +2277,12 @@
      }
      // Initialize stuff
      ieContext.importSource = source;
      ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieContext.initWindow = initTargetMsgReceived.getInitWindow();
      ieCtx.importSource = source;
      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
      // Protocol version is -1 when not known.
      ieContext.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask)ieContext.initializeTask;
      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask) ieCtx.initializeTask;
      // Launch the import
      importBackend(new ReplInputStream(this));
@@ -2257,14 +2294,14 @@
      Store the exception raised. It will be considered if no other exception
      has been previously stored in  the context
      */
      ieContext.setExceptionIfNoneSet(e);
      ieCtx.setExceptionIfNoneSet(e);
    }
    finally
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends import with exception=" + ieContext.getException()
          + " ends import with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected());
      }
@@ -2278,10 +2315,10 @@
      */
      broker.reStart(false);
      if (ieContext.getException() != null
      if (ieCtx.getException() != null
          && broker.isConnected()
          && initFromTask != null
          && ++ieContext.attemptCnt < 2)
          && ++ieCtx.attemptCnt < 2)
      {
          /*
          Worth a new attempt
@@ -2300,13 +2337,13 @@
            the request
            */
            logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
                ieContext.getException().getLocalizedMessage()));
                ieCtx.getException().getLocalizedMessage()));
            broker.publish(ieContext.initReqMsgSent);
            broker.publish(ieCtx.initReqMsgSent);
            ieContext.initializeCounters(0);
            ieContext.exception = null;
            ieContext.msgCnt = 0;
            ieCtx.initializeCounters(0);
            ieCtx.exception = null;
            ieCtx.msgCnt = 0;
            // Processing of the received initTargetMsgReceived is done
            // let's wait for the next one
@@ -2320,7 +2357,7 @@
            */
            logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
              e.getLocalizedMessage(),
              ieContext.getException().getLocalizedMessage()));
              ieCtx.getException().getLocalizedMessage()));
          }
      }
@@ -2330,19 +2367,19 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
          + " ends initialization with exception=" + ieContext.getException()
          + " ends initialization with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromTask
          + " attempt=" + ieContext.attemptCnt);
          + " attempt=" + ieCtx.attemptCnt);
      }
      try
      {
        if (broker.isConnected() && ieContext.getException() != null)
        if (broker.isConnected() && ieCtx.getException() != null)
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
              ieContext.getException().getMessageObject());
              ieCtx.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        /*
@@ -2353,7 +2390,7 @@
        */
        if (initFromTask != null)
        {
          initFromTask.updateTaskCompletionState(ieContext.getException());
          initFromTask.updateTaskCompletionState(ieCtx.getException());
        }
      }
      finally
@@ -2361,8 +2398,8 @@
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            getBaseDNString(), initTargetMsgReceived.getSenderID(),
            getServerId(),
            (ieContext.getException() == null ? ""
                : ieContext.getException().getLocalizedMessage()));
            (ieCtx.getException() == null ? ""
                : ieCtx.getException().getLocalizedMessage()));
        logError(msg);
        releaseIEContext();
      } // finally
@@ -3449,43 +3486,13 @@
  }
  /**
   * Returns a boolean indicating if a total update import is currently
   * in Progress.
   * Returns the Import/Export context associated to this ReplicationDomain.
   *
   * @return A boolean indicating if a total update import is currently
   *         in Progress.
   * @return the Import/Export context associated to this ReplicationDomain
   */
  public boolean importInProgress()
  protected IEContext getImportExportContext()
  {
    return ieContext != null && ieContext.importInProgress;
  }
  /**
   * Returns a boolean indicating if a total update export is currently
   * in Progress.
   *
   * @return A boolean indicating if a total update export is currently
   *         in Progress.
   */
  public boolean exportInProgress()
  {
    return ieContext != null && !ieContext.importInProgress;
  }
  /**
   * Returns the number of entries still to be processed when a total update
   * is in progress.
   *
   * @return The number of entries still to be processed when a total update
   *         is in progress.
   */
  long getLeftEntryCount()
  {
    if (ieContext != null)
    {
      return ieContext.entryLeftCount;
    }
    return 0;
    return ieContext;
  }
  /**
@@ -3501,24 +3508,6 @@
  }
  /**
   * Returns the total number of entries to be processed when a total update
   * is in progress.
   *
   * @return The total number of entries to be processed when a total update
   *         is in progress.
   */
  long getTotalEntryCount()
  {
    if (ieContext != null)
    {
      return ieContext.entryCount;
    }
    return 0;
  }
  /**
   * Set the attributes configured on a server to be included in the ECL.
   *
   * @param serverId
@@ -3617,7 +3606,7 @@
   *          The serverId for which we want the include attributes.
   * @return The attributes.
   */
  public Set<String> getEclIncludes(int serverId)
  Set<String> getEclIncludes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
@@ -3635,7 +3624,7 @@
   *          The serverId for which we want the include attributes.
   * @return The attributes.
   */
  public Set<String> getEclIncludesForDeletes(int serverId)
  Set<String> getEclIncludesForDeletes(int serverId)
  {
    synchronized (eclIncludesLock)
    {
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.service;
@@ -34,6 +34,7 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.service.ReplicationDomain.IEContext;
import org.opends.server.types.*;
/**
@@ -130,21 +131,15 @@
        String.valueOf(domain.getGenerationID())));
    // Add import/export monitoring attributes
    if (domain.importInProgress())
    final IEContext ieContext = domain.getImportExportContext();
    if (ieContext != null)
    {
      addMonitorData(attributes, "total-update", "import");
      addMonitorData(
          attributes, "total-update-entry-count", domain.getTotalEntryCount());
      addMonitorData(
          attributes, "total-update-entry-left", domain.getLeftEntryCount());
    }
    if (domain.exportInProgress())
    {
      addMonitorData(attributes, "total-update", "export");
      addMonitorData(
          attributes, "total-update-entry-count", domain.getTotalEntryCount());
      addMonitorData(
          attributes, "total-update-entry-left", domain.getLeftEntryCount());
      addMonitorData(attributes, "total-update",
          ieContext.importInProgress() ? "import" : "export");
      addMonitorData(attributes, "total-update-entry-count",
          ieContext.getTotalEntryCount());
      addMonitorData(attributes, "total-update-entry-left",
          ieContext.getLeftEntryCount());
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -20,7 +20,7 @@
 *
 * CDDL HEADER END
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -317,12 +317,7 @@
          "exportAndImportData", 100);
      SortedSet<String> servers = newSortedSet("localhost:" + replServerPort);
      StringBuilder exportedDataBuilder = new StringBuilder();
      for (int i =0; i<ENTRYCOUNT; i++)
      {
        exportedDataBuilder.append("key : value"+i+"\n\n");
      }
      String exportedData=exportedDataBuilder.toString();
      String exportedData = buildExportedData(ENTRYCOUNT);
      domain1 = new FakeReplicationDomain(
          testService, serverId1, servers, 0, exportedData, null, ENTRYCOUNT);
@@ -343,18 +338,8 @@
        }
      }
      int count = 0;
      while ((importedData.length() < exportedData.length()) && (count < 500))
      {
        count ++;
        Thread.sleep(100);
      }
      assertEquals(domain2.getLeftEntryCount(), 0,
          "LeftEntryCount for export is " + domain2.getLeftEntryCount());
      assertEquals(domain1.getLeftEntryCount(), 0,
          "LeftEntryCount for import is " + domain1.getLeftEntryCount());
      assertEquals(importedData.length(), exportedData.length());
      assertEquals(importedData.toString(), exportedData);
      waitEndExport(exportedData, importedData);
      assertExportSucessful(domain1, domain2, exportedData, importedData);
    }
    finally
    {
@@ -393,12 +378,7 @@
      SortedSet<String> servers1 = newSortedSet("localhost:" + replServerPort1);
      SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2);
      StringBuilder exportedDataBuilder = new StringBuilder();
      for (int i =0; i<ENTRYCOUNT; i++)
      {
        exportedDataBuilder.append("key : value"+i+"\n\n");
      }
      String exportedData=exportedDataBuilder.toString();
      String exportedData = buildExportedData(ENTRYCOUNT);
      domain1 = new FakeReplicationDomain(
          testService, 1, servers1, 0, exportedData, null, ENTRYCOUNT);
@@ -408,18 +388,8 @@
      domain2.initializeFromRemote(1);
      int count = 0;
      while ((importedData.length() < exportedData.length()) && (count < 500))
      {
        count ++;
        Thread.sleep(100);
      }
      assertEquals(domain2.getLeftEntryCount(), 0,
          "LeftEntryCount for export is " + domain2.getLeftEntryCount());
      assertEquals(domain1.getLeftEntryCount(), 0,
          "LeftEntryCount for import is " + domain1.getLeftEntryCount());
      assertEquals(importedData.length(), exportedData.length());
      assertEquals(importedData.toString(), exportedData);
      waitEndExport(exportedData, importedData);
      assertExportSucessful(domain1, domain2, exportedData, importedData);
    }
    finally
    {
@@ -428,6 +398,35 @@
    }
  }
  private String buildExportedData(final int ENTRYCOUNT)
  {
    final StringBuilder sb = new StringBuilder();
    for (int i = 0; i < ENTRYCOUNT; i++)
    {
      sb.append("key : value" + i + "\n\n");
    }
    return sb.toString();
  }
  private void waitEndExport(String exportedData, StringBuilder importedData) throws Exception
  {
    int count = 0;
    while (importedData.length() < exportedData.length() && count < 500)
    {
      count ++;
      Thread.sleep(100);
    }
  }
  private void assertExportSucessful(ReplicationDomain domain1,
      ReplicationDomain domain2, String exportedData, StringBuilder importedData)
  {
    assertEquals(domain2.ieContext.getLeftEntryCount(), 0, "Wrong LeftEntryCount for export");
    assertEquals(domain1.ieContext.getLeftEntryCount(), 0, "Wrong LeftEntryCount for import");
    assertEquals(importedData.length(), exportedData.length());
    assertEquals(importedData.toString(), exportedData);
  }
  /**
   * Sender side of the Total Update Perf test.
   * The goal of this test is to measure the performance