mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
07.57.2007 6c09411df244a409bcd0a440a1e974ef91bd7035

The following changes in the replication server provide an implementation of the backend API for the replication server changes database.
As a first step, the implemented features are backup and restore.
Export and search will follow, in order to provide LDAP access to the content of the replication server DB.

I also cleaned up and shared some code in the replication unit tests.
Also fixed a potential bug in the replication plugin's total update: the ReplicationDomain no longer holds a reference to its associated
backend object, because a configuration change on the backend may replace that object with another instance, so keeping a reference to the backend instance is
buggy. The instance must be retrieved each time it is needed.

1 files added
11 files modified
1449 ■■■■ changed files
opends/src/messages/messages/replication.properties 9 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 493 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 242 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 43 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 188 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 162 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 193 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 64 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -80,7 +80,7 @@
 changes that this server has already processed on suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
 server should be configured
NOTICE_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
 communication on domain %s with replication server %s : %s
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
 database for base DN %s
@@ -224,4 +224,9 @@
SEVERE_ERR_ERROR_CLEARING_DB_87=While clearing the database %s , the following \
 error happened: %s
 NOTICE_ERR_ROUTING_TO_SERVER_88=Protocol error : a replication server is not expected \
 to be the destination of a message of type %s
 to be the destination of a message of type %s
 SEVERE_ERR_CHECK_CREATE_REPL_BACKEND_FAILED_89=An unexpected error occured when \
 testing existence or creating the replication backend : %s
 SEVERE_ERR_DELETE_REPL_BACKEND_FAILED_90=An unexpected error occured when \
 deleting the replication backend : %s
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -26,10 +26,14 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
@@ -42,7 +46,6 @@
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
@@ -66,10 +69,6 @@
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import org.opends.messages.Message;
import static org.opends.server.replication.plugin.
              ReplicationRepairRequestControl.*;
/**
 * This class is used to load the Replication code inside the JVM
@@ -86,7 +85,7 @@
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
  private ReplicationServerListener replicationServer = null;
  private ReplicationServerListener replicationServerListener = null;
  private static Map<DN, ReplicationDomain> domains =
    new HashMap<DN, ReplicationDomain>() ;
@@ -193,7 +192,7 @@
      MultimasterSynchronizationProviderCfg configuration)
  throws ConfigException
  {
    replicationServer = new ReplicationServerListener(configuration);
    replicationServerListener = new ReplicationServerListener(configuration);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
@@ -438,8 +437,8 @@
    }
    // shutdown the ReplicationServer Service if necessary
    if (replicationServer != null)
      replicationServer.shutdown();
    if (replicationServerListener != null)
      replicationServerListener.shutdown();
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -392,7 +392,7 @@
          }
          catch (Exception e)
          {
            Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            logError(message);
@@ -733,7 +733,7 @@
      if (debugEnabled())
      {
        debugInfo("ReplicationBroker is stopping. and will" +
          "close the connection");
          " close the connection");
      }
      if (session != null)
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -212,9 +212,6 @@
  // Null when none is being processed.
  private IEContext ieContext = null;
  // The backend information necessary to make an import or export.
  private Backend backend;
  private int listenerThreadNumber = 10;
  private Collection<String> replicationServers;
@@ -383,7 +380,7 @@
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    backend = retrievesBackend(baseDN);
    Backend backend = retrievesBackend(baseDN);
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
@@ -855,8 +852,6 @@
                                   de.getMessageObject());
                MessageBuilder mb = new MessageBuilder();
                mb.append(de.getMessageObject());
                mb.append("Backend ID: ");
                mb.append(backend.getBackendID());
                TRACER.debugInfo(Message.toString(mb.toMessage()));
                broker.publish(errorMsg);
              }
@@ -2224,10 +2219,8 @@
   */
  public long computeGenerationId() throws DirectoryException
  {
    Backend backend = this.retrievesBackend(baseDN);
    long bec = backend.getEntryCount();
    if (bec<0)
      backend = this.retrievesBackend(baseDN);
    bec = backend.getEntryCount();
    this.acquireIEContext();
    ieContext.checksumOutput = true;
    ieContext.entryCount = (bec<1000?bec:1000);
@@ -2598,9 +2591,7 @@
  protected void exportBackend()
  throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    backend = retrievesBackend(this.baseDN);
    Backend backend = retrievesBackend(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
@@ -2938,9 +2929,7 @@
  {
    try
    {
      // FIXME Temporary workaround - will probably be fixed when implementing
      // dynamic config
      backend = retrievesBackend(this.baseDN);
      Backend backend = retrievesBackend(this.baseDN);
      if (!backend.supportsLDIFExport())
      {
@@ -3027,6 +3016,8 @@
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    Backend backend = this.retrievesBackend(baseDN);
    if (!backend.supportsLDIFImport())
    {
      Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
@@ -3051,7 +3042,7 @@
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initImportExportCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend);
      preBackendImport(backend);
      ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
      importConfig =
@@ -3066,7 +3057,7 @@
      // ExistingFileBehavior.OVERWRITE);
      // Process import
      this.backend.importLDIF(importConfig);
      backend.importLDIF(importConfig);
      TRACER.debugInfo("The import has ended successfully.");
      stateSavingDisabled = false;
@@ -3083,7 +3074,7 @@
      importConfig.close();
      // Re-enable backend
      closeBackendImport(this.backend);
      closeBackendImport(backend);
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
@@ -3202,7 +3193,7 @@
   */
  public Backend getBackend()
  {
    return backend;
    return retrievesBackend(baseDN);
  }
  /**
opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -25,6 +25,7 @@
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import java.util.List;
@@ -105,7 +106,7 @@
  }
  /**
   * Shutdown the Replication servers.
   * Shutdown the replication server.
   */
  public void shutdown()
  {
@@ -123,7 +124,7 @@
    // replicationServer currently configured.
    if (replicationServer != null)
    {
      replicationServer.shutdown();
      replicationServer.remove();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
New file
@@ -0,0 +1,493 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.getExceptionMessage;
import java.util.HashMap;
import java.util.HashSet;
import org.opends.messages.Message;
import org.opends.server.admin.Configuration;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.admin.std.server.JEBackendCfg;
import org.opends.server.api.Backend;
import org.opends.server.backends.jeb.BackupManager;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.Validator;
/**
 * This class defines a backend that stores its information in an
 * associated replication server object.
 * This is primarily intended to take advantage of the backup/restore/
 * import/export of the backend API, and to provide an LDAP access
 * to the replication server database.
 * <BR><BR>
 * Entries stored in this backend are held in the DB associated with
 * the replication server.
 * <BR><BR>
 * Currently are only implemented the create and restore backup features.
 *
 */
public class ReplicationBackend
       extends Backend
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  // The base DNs for this backend.
  private DN[] baseDNs;
  // The mapping between parent DNs and their immediate children.
  private HashMap<DN,HashSet<DN>> childDNs;
  // The base DNs for this backend, in a hash set.
  private HashSet<DN> baseDNSet;
  // The set of supported controls for this backend.
  private HashSet<String> supportedControls;
  // The set of supported features for this backend.
  private HashSet<String> supportedFeatures;
  // The directory associated with this backend.
  private BackupDirectory backendDirectory;
  ReplicationServer server;
  /**
   * The configuration of this backend.
   */
  private JEBackendCfg cfg;
  /**
   * Creates a new backend with the provided information.  All backend
   * implementations must implement a default constructor that use
   * <CODE>super()</CODE> to invoke this constructor.
   */
  public ReplicationBackend()
  {
    super();
    // Perform all initialization in initializeBackend.
  }
  /**
   * Set the base DNs for this backend.  This is used by the unit tests
   * to set the base DNs without having to provide a configuration
   * object when initializing the backend.
   * @param baseDNs The set of base DNs to be served by this memory backend.
   */
  public void setBaseDNs(DN[] baseDNs)
  {
    this.baseDNs = baseDNs;
  }
  /**
   * {@inheritDoc}
   */
  public void configureBackend(Configuration config) throws ConfigException
  {
    if (config != null)
    {
      Validator.ensureTrue(config instanceof BackendCfg);
      cfg = (JEBackendCfg)config;
      DN[] baseDNs = new DN[cfg.getBackendBaseDN().size()];
      cfg.getBackendBaseDN().toArray(baseDNs);
      setBaseDNs(baseDNs);
      backendDirectory = new BackupDirectory(
          cfg.getBackendDirectory(), null);
    }
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void initializeBackend()
       throws ConfigException, InitializationException
  {
    if ((baseDNs == null) || (baseDNs.length != 1))
    {
      Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get();
      throw new ConfigException(message);
    }
    baseDNSet = new HashSet<DN>();
    for (DN dn : baseDNs)
    {
      baseDNSet.add(dn);
    }
    childDNs = new HashMap<DN,HashSet<DN>>();
    supportedControls = new HashSet<String>();
    supportedFeatures = new HashSet<String>();
    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.registerBaseDN(dn, this, false, false);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(
            dn.toString(), getExceptionMessage(e));
        throw new InitializationException(message, e);
      }
    }
  }
  /**
   * Removes any data that may have been stored in this backend.
   */
  public synchronized void clearMemoryBackend()
  {
    childDNs.clear();
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void finalizeBackend()
  {
    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.deregisterBaseDN(dn, false);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }
  /**
   * {@inheritDoc}
   */
  public DN[] getBaseDNs()
  {
    return baseDNs;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized long getEntryCount()
  {
    return -1;
  }
  /**
   * {@inheritDoc}
   */
  public boolean isLocal()
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized Entry getEntry(DN entryDN)
  {
    return null;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized boolean entryExists(DN entryDN)
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void addEntry(Entry entry, AddOperation addOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_ADD_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void deleteEntry(DN entryDN,
                                       DeleteOperation deleteOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_DELETE_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void replaceEntry(Entry entry,
                                        ModifyOperation modifyOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void renameEntry(DN currentDN, Entry entry,
                                       ModifyDNOperation modifyDNOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void search(SearchOperation searchOperation)
         throws DirectoryException
  {
    DN matchedDN = baseDNs[0];
    DN baseDN = searchOperation.getBaseDN();
    // FIXME Remove this error message or replace when implementing
    //       the search.
    Message message =
      ERR_MEMORYBACKEND_ENTRY_DOESNT_EXIST.get(String.valueOf(baseDN));
    throw new DirectoryException(
          ResultCode.NO_SUCH_OBJECT, message, matchedDN, null);
  }
  /**
   * {@inheritDoc}
   */
  public HashSet<String> getSupportedControls()
  {
    return supportedControls;
  }
  /**
   * {@inheritDoc}
   */
  public HashSet<String> getSupportedFeatures()
  {
    return supportedFeatures;
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsLDIFExport()
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void exportLDIF(LDIFExportConfig exportConfig)
         throws DirectoryException
  {
    // TODO
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsLDIFImport()
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized LDIFImportResult importLDIF(LDIFImportConfig importConfig)
         throws DirectoryException
  {
      return new LDIFImportResult(0, 0, 0);
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsBackup()
  {
    // This backend does not provide a backup/restore mechanism.
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsBackup(BackupConfig backupConfig,
                                StringBuilder unsupportedReason)
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public void createBackup(BackupConfig backupConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.createBackup(cfg, backupConfig);
  }
  /**
   * {@inheritDoc}
   */
  public void removeBackup(BackupDirectory backupDirectory,
                           String backupID)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.removeBackup(this.backendDirectory, backupID);
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsRestore()
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public void restoreBackup(RestoreConfig restoreConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.restoreBackup(cfg, restoreConfig);
  }
  /**
   * Retrieves the number of subordinates for the requested entry.
   *
   * @param entryDN The distinguished name of the entry.
   *
   * @return The number of subordinate entries for the requested entry
   *         or -1 if it can not be determined.
   *
   * @throws DirectoryException  If a problem occurs while trying to
   *                              retrieve the entry.
   */
  public long numSubordinates(DN entryDN)
      throws DirectoryException
  {
    Message message = WARN_ROOTDSE_GET_ENTRY_NONROOT.
    get(entryDN.toNormalizedString());
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * Indicates whether the requested entry has any subordinates.
   *
   * @param entryDN The distinguished name of the entry.
   *
   * @return {@code ConditionResult.TRUE} if the entry has one or more
   *         subordinates or {@code ConditionResult.FALSE} otherwise
   *         or {@code ConditionResult.UNDEFINED} if it can not be
   *         determined.
   *
   * @throws DirectoryException  If a problem occurs while trying to
   *                              retrieve the entry.
   */
  public ConditionResult hasSubordinates(DN entryDN)
        throws DirectoryException
  {
    Message message = WARN_ROOTDSE_GET_ENTRY_NONROOT.
      get(entryDN.toNormalizedString());
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * Set the replication server associated with this backend.
   * @param server The replication server.
   */
  public void setServer(ReplicationServer server)
  {
    this.server = server;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,16 +25,16 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.ServerConstants.EOL;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -47,22 +47,35 @@
import java.util.List;
import java.util.Set;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.BackupTaskListener;
import org.opends.server.api.ExportTaskListener;
import org.opends.server.api.ImportTaskListener;
import org.opends.server.api.MonitorProvider;
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.loggers.LogLevel;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.LDIFReader;
import com.sleepycat.je.DatabaseException;
@@ -77,7 +90,9 @@
 * It is responsible for creating the replication server cache and managing it
 */
public class ReplicationServer extends MonitorProvider<MonitorProviderCfg>
  implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>
  implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>,
             BackupTaskListener, RestoreTaskListener, ImportTaskListener,
             ExportTaskListener
{
  private short serverId;
  private String serverURL;
@@ -108,6 +123,12 @@
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  // For the backend associated to this replication server,
  // DN of the config entry of the backend
  private DN backendConfigEntryDN;
  // ID of the backend
  private static final String backendId = "replicationChanges";
  /**
   * The tracer object for the debug logger.
   */
@@ -120,11 +141,10 @@
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg configuration)
         throws ConfigException
    throws ConfigException
  {
    super("Replication Server" + configuration.getReplicationPort());
    shutdown = false;
    replicationPort = configuration.getReplicationPort();
    replicationServerId = (short) configuration.getReplicationServerId();
    replicationServers = configuration.getReplicationServer();
@@ -162,6 +182,21 @@
    initialize(replicationServerId, replicationPort);
    configuration.addChangeListener(this);
    DirectoryServer.registerMonitorProvider(this);
    try
    {
      backendConfigEntryDN = DN.decode(
      "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
    } catch (Exception e) {}
    // Creates the backend associated to this ReplicationServer
    // if it does not exist.
    createBackend();
    DirectoryServer.registerBackupTaskListener(this);
    DirectoryServer.registerRestoreTaskListener(this);
    DirectoryServer.registerExportTaskListener(this);
    DirectoryServer.registerImportTaskListener(this);
  }
@@ -315,6 +350,8 @@
   */
  private void initialize(short changelogId, int changelogPort)
  {
    shutdown = false;
    try
    {
      /*
@@ -458,7 +495,7 @@
      dbEnv.shutdown();
    }
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
  }
}
  /**
@@ -492,10 +529,7 @@
    }
    catch(Exception e)
    {
      TRACER.debugInfo(
          "In RS <" + getMonitorInstanceName() +
          " Exception in clearGenerationId" +
          stackTraceToSingleLineString(e) + e.getLocalizedMessage());
      TRACER.debugCaught(LogLevel.ALL, e);
    }
  }
@@ -730,4 +764,180 @@
  {
    return serverId;
  }
  /**
   * Creates the backend associated to this replication server.
   * @throws ConfigException
   */
  private void createBackend()
  throws ConfigException
  {
    try
    {
      String ldif = makeLdif(
          "dn: ds-cfg-backend-id="+backendId+",cn=Backends,cn=config",
          "objectClass: top",
          "objectClass: ds-cfg-backend",
          "objectClass: ds-cfg-je-backend",
          "ds-cfg-backend-base-dn: dc="+backendId,
          "ds-cfg-backend-enabled: true",
          "ds-cfg-backend-writability-mode: enabled",
          "ds-cfg-backend-class: " +
            "org.opends.server.replication.server.ReplicationBackend",
          "ds-cfg-backend-id: " + backendId,
          "ds-cfg-backend-import-temp-directory: importTmp",
          "ds-cfg-backend-directory: " + getFileForPath(dbDirname));
      LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
          new StringReader(ldif));
      LDIFReader reader = new LDIFReader(ldifImportConfig);
      Entry backendConfigEntry = reader.readEntry();
      if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Add the replication backend
        DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
      }
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      Message msg = ERR_CHECK_CREATE_REPL_BACKEND_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
  }
  private static String makeLdif(String... lines)
  {
    StringBuilder buffer = new StringBuilder();
    for (String line : lines) {
      buffer.append(line).append(EOL);
    }
    // Append an extra line so we can append LDIF Strings.
    buffer.append(EOL);
    return buffer.toString();
  }
  /**
   * Do what needed when the config object related to this replication server
   * is deleted from the server configuration.
   */
  public void remove()
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " starts removing");
    shutdown();
    removeBackend();
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
    DirectoryServer.deregisterExportTaskListener(this);
    DirectoryServer.deregisterImportTaskListener(this);
  }
  /**
   * Removes the backend associated to this Replication Server that has been
   * created when this replication server was created.
   */
  protected void removeBackend()
  {
    try
    {
      if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Delete the replication backend
        DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
            null);
      }
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      Message msg = ERR_DELETE_REPL_BACKEND_FAILED.get(mb.toString());
      logError(msg);
    }
  }
  /**
   * {@inheritDoc}
   * <p>
   * This implementation is intentionally a no-op: nothing is done when a
   * backup begins.
   */
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    // Nothing is needed at the moment
  }
  /**
   * {@inheritDoc}
   * <p>
   * This implementation is intentionally a no-op: nothing is done when a
   * backup ends, whether it succeeded or not.
   */
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
    // Nothing is needed at the moment
  }
  /**
   * {@inheritDoc}
   */
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    if (backend.getBackendID().equals(backendId))
      shutdown();
  }
  /**
   * {@inheritDoc}
   */
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
    if (backend.getBackendID().equals(backendId))
      initialize(this.replicationServerId, this.replicationPort);
  }
  /**
   * {@inheritDoc}
   * <p>
   * This implementation is intentionally a no-op: nothing is done when an
   * import begins.
   */
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    // Nothing is needed at the moment
  }
  /**
   * {@inheritDoc}
   * <p>
   * This implementation is intentionally a no-op: nothing is done when an
   * import ends, whether it succeeded or not.
   */
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
    // Nothing is needed at the moment
  }
  /**
   * {@inheritDoc}
   */
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " Export starts");
    if (backend.getBackendID().equals(backendId))
    {
      // Retrieves the backend related to this domain
      // backend =
      ReplicationBackend b =
      (ReplicationBackend)DirectoryServer.getBackend(backendId);
      b.setServer(this);
    }
  }
  /**
   * {@inheritDoc}
   * <p>
   * This implementation is intentionally a no-op: nothing is done when an
   * export ends, whether it succeeded or not.
   */
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
    // Nothing is needed at the moment
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -100,7 +100,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
@@ -117,22 +117,11 @@
        if (debugEnabled())
        {
          if (handler.isReplicationServer())
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from RS server with serverId=" + serverId +
                " receives " + msg);
          }
          else
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from LDAP server with serverId=" + serverId +
                " receives " + msg);
          }
          TRACER.debugInfo(
              "In RS " + replicationCache.getReplicationServer().
              getMonitorInstanceName() +
              (handler.isReplicationServer()?" From RS ":" From LS")+
              " with serverId=" + serverId + " receives " + msg);
        }
        if (msg instanceof AckMessage)
        {
@@ -271,11 +260,10 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
          e.getCause());
          " reader IO EXCEPTION for serverID=" + serverId
          + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
      logError(message);
    } catch (ClassNotFoundException e)
@@ -316,10 +304,10 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader CLOSE serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
          " server reader for serverID=" + serverId +
          " is closing the session");
      try
      {
        session.close();
@@ -331,10 +319,9 @@
    }
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?"RS":"LDAP") +
          " server reader stopped for serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
          (handler.isReplicationServer()?" RS":" LDAP") +
          " server reader stopped for serverID=" + serverId);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -300,186 +300,6 @@
    return found;
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * result code is not SUCCESS
   */
  private void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      debugInfo("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      debugInfo("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
      }
      while ((taskState != expectedTaskState) &&
          (taskState != TaskState.STOPPED_BY_ERROR));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          if (expectedMessage != null)
          {
            debugInfo(logMessages.get(0));
            debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  private void addTestEntriesToDB(String[] ldifEntries)
  {
    try
    {
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      for (String ldifEntry : ldifEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        AddOperationBasis addOp = new AddOperationBasis(
            connection,
            InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            null,
            entry.getDN(),
            entry.getObjectClasses(),
            entry.getUserAttributes(),
            entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          debugInfo("addEntry: Failed" + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /*
   * Creates entries necessary to the test.
   */
@@ -1373,11 +1193,11 @@
    broker3 = null;
    if (replServer1 != null)
      replServer1.shutdown();
      replServer1.remove();
    if (replServer2 != null)
      replServer2.shutdown();
    if (replServer2 != null)
      replServer2.shutdown();
      replServer2.remove();
    if (replServer3 != null)
      replServer3.remove();
    replServer1 = null;
    replServer2 = null;
    replServer3 = null;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -278,67 +278,6 @@
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessage The expected error messageID when the expected
   * result code is not SUCCESS
   */
  private void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      log("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        log("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      log("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  /**
   * Wait a task to be completed and check the expected state and expected
   * stats.
   * @param taskEntry The task to process.
@@ -454,107 +393,6 @@
    }
  }
  private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible?  Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        try
        {
          // Check that the left counter.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
          resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          // Check that the total counter.
          taskStateType =
           DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
          resultEntry.getAttributeValue(taskStateType,
               DirectoryStringSyntax.DECODER);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(2000);
      }
      while ((taskState != expectedTaskState) &&
          (taskState != TaskState.STOPPED_BY_ERROR));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          if (expectedMessage != null)
          {
            log(logMessages.get(0));
            log(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>=0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,9 +26,10 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -51,8 +52,10 @@
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
@@ -90,6 +93,9 @@
public abstract class ReplicationTestCase extends DirectoryServerTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  /**
  * The internal connection used for operation
  */
@@ -726,4 +732,187 @@
    return new ReplSessionSecurity(null, null, null, true);
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * result code is not SUCCESS
   */
  protected void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      TRACER.debugInfo("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        TRACER.debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      TRACER.debugInfo("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    int cpt=10;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
        cpt--;
      }
      while ((taskState != expectedTaskState) &&
             (taskState != TaskState.STOPPED_BY_ERROR) &&
             (taskState != TaskState.COMPLETED_SUCCESSFULLY) &&
             (cpt > 0));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          TRACER.debugInfo(logMessages.get(0));
          if (expectedMessage != null)
          {
            TRACER.debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  protected void addTestEntriesToDB(String[] ldifEntries)
  {
    try
    {
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      for (String ldifEntry : ldifEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        AddOperationBasis addOp = new AddOperationBasis(
            connection,
            InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            null,
            entry.getDN(),
            entry.getObjectClasses(),
            entry.getUserAttributes(),
            entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          TRACER.debugInfo("Failed to add entry " + entry.getDN() +
              "Result code = : " + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -34,6 +34,7 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.*;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -42,11 +43,10 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
@@ -62,6 +62,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.RDN;
import org.opends.server.types.DirectoryConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.AfterClass;
@@ -413,7 +414,7 @@
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void stopChangelog() throws Exception
  {
    replicationServer.shutdown();
    replicationServer.remove();
    configure();
    newClient();
    newClientWithFirstChanges();
@@ -628,7 +629,7 @@
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
                                         changelogIds[i], 0, 100, servers);
        replicationServer = new ReplicationServer(conf);
        changelogs[i] = new ReplicationServer(conf);
      }
      ReplicationBroker broker1 = null;
@@ -763,9 +764,9 @@
      finally
      {
        if (changelogs[0] != null)
          changelogs[0].shutdown();
          changelogs[0].remove();
        if (changelogs[1] != null)
          changelogs[1].shutdown();
          changelogs[1].remove();
        if (broker1 != null)
          broker1.stop();
        if (broker2 != null)
@@ -972,4 +973,53 @@
      }
    }
  }
  /*
   * Test backup and restore of the Replication server backend
   */
   @Test(enabled=true)
   public void backupRestore() throws Exception
   {
     debugInfo("Starting backupRestore");
     Entry backupTask = createBackupTask();
     Entry restoreTask = createRestoreTask();
     addTask(backupTask, ResultCode.SUCCESS, null);
     waitTaskState(backupTask, TaskState.COMPLETED_SUCCESSFULLY, null);
     addTask(restoreTask, ResultCode.SUCCESS, null);
     waitTaskState(restoreTask, TaskState.COMPLETED_SUCCESSFULLY, null);
     debugInfo("Ending   backupRestore");
   }
   private Entry createBackupTask()
   throws Exception
   {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
     "objectclass: ds-task",
     "objectclass: ds-task-backup",
     "ds-task-class-name: org.opends.server.tasks.BackupTask",
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges",
     "ds-task-backup-backend-id: replicationChanges");
   }
   private Entry createRestoreTask()
   throws Exception
   {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
     "objectclass: ds-task",
     "objectclass: ds-task-restore",
     "ds-task-class-name: org.opends.server.tasks.RestoreTask",
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges");
   }
}