mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
07.57.2007 59a0e0cae4a717d92ccf08aa56d427015498becf
opendj-sdk/opends/src/messages/messages/replication.properties
@@ -80,7 +80,7 @@
 changes that this server has already processed on suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
 server should be configured
NOTICE_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
 communication on domain %s with replication server %s : %s
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
 database for base DN %s
@@ -224,4 +224,9 @@
SEVERE_ERR_ERROR_CLEARING_DB_87=While clearing the database %s , the following \
 error happened: %s
 NOTICE_ERR_ROUTING_TO_SERVER_88=Protocol error : a replication server is not expected \
 to be the destination of a message of type %s
 to be the destination of a message of type %s
 SEVERE_ERR_CHECK_CREATE_REPL_BACKEND_FAILED_89=An unexpected error occurred when \
 testing existence or creating the replication backend : %s
 SEVERE_ERR_DELETE_REPL_BACKEND_FAILED_90=An unexpected error occurred when \
 deleting the replication backend : %s
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -26,10 +26,14 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
@@ -42,7 +46,6 @@
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.Control;
@@ -66,10 +69,6 @@
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import org.opends.messages.Message;
import static org.opends.server.replication.plugin.
              ReplicationRepairRequestControl.*;
/**
 * This class is used to load the Replication code inside the JVM
@@ -86,7 +85,7 @@
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
  private ReplicationServerListener replicationServer = null;
  private ReplicationServerListener replicationServerListener = null;
  private static Map<DN, ReplicationDomain> domains =
    new HashMap<DN, ReplicationDomain>() ;
@@ -193,7 +192,7 @@
      MultimasterSynchronizationProviderCfg configuration)
  throws ConfigException
  {
    replicationServer = new ReplicationServerListener(configuration);
    replicationServerListener = new ReplicationServerListener(configuration);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
@@ -438,8 +437,8 @@
    }
    // shutdown the ReplicationServer Service if necessary
    if (replicationServer != null)
      replicationServer.shutdown();
    if (replicationServerListener != null)
      replicationServerListener.shutdown();
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -392,7 +392,7 @@
          }
          catch (Exception e)
          {
            Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            logError(message);
@@ -733,7 +733,7 @@
      if (debugEnabled())
      {
        debugInfo("ReplicationBroker is stopping. and will" +
          "close the connection");
          " close the connection");
      }
      if (session != null)
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -212,9 +212,6 @@
  // Null when none is being processed.
  private IEContext ieContext = null;
  // The backend information necessary to make an import or export.
  private Backend backend;
  private int listenerThreadNumber = 10;
  private Collection<String> replicationServers;
@@ -383,7 +380,7 @@
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    backend = retrievesBackend(baseDN);
    Backend backend = retrievesBackend(baseDN);
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
@@ -855,8 +852,6 @@
                                   de.getMessageObject());
                MessageBuilder mb = new MessageBuilder();
                mb.append(de.getMessageObject());
                mb.append("Backend ID: ");
                mb.append(backend.getBackendID());
                TRACER.debugInfo(Message.toString(mb.toMessage()));
                broker.publish(errorMsg);
              }
@@ -2224,10 +2219,8 @@
   */
  public long computeGenerationId() throws DirectoryException
  {
    Backend backend = this.retrievesBackend(baseDN);
    long bec = backend.getEntryCount();
    if (bec<0)
      backend = this.retrievesBackend(baseDN);
    bec = backend.getEntryCount();
    this.acquireIEContext();
    ieContext.checksumOutput = true;
    ieContext.entryCount = (bec<1000?bec:1000);
@@ -2598,9 +2591,7 @@
  protected void exportBackend()
  throws DirectoryException
  {
    // FIXME Temporary workaround - will probably be fixed when implementing
    // dynamic config
    backend = retrievesBackend(this.baseDN);
    Backend backend = retrievesBackend(this.baseDN);
    //  Acquire a shared lock for the backend.
    try
@@ -2938,9 +2929,7 @@
  {
    try
    {
      // FIXME Temporary workaround - will probably be fixed when implementing
      // dynamic config
      backend = retrievesBackend(this.baseDN);
      Backend backend = retrievesBackend(this.baseDN);
      if (!backend.supportsLDIFExport())
      {
@@ -3027,6 +3016,8 @@
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    Backend backend = this.retrievesBackend(baseDN);
    if (!backend.supportsLDIFImport())
    {
      Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
@@ -3051,7 +3042,7 @@
      ieContext.entryLeftCount = initializeMessage.getEntryCount();
      ieContext.initImportExportCounters(initializeMessage.getEntryCount());
      preBackendImport(this.backend);
      preBackendImport(backend);
      ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
      importConfig =
@@ -3066,7 +3057,7 @@
      // ExistingFileBehavior.OVERWRITE);
      // Process import
      this.backend.importLDIF(importConfig);
      backend.importLDIF(importConfig);
      TRACER.debugInfo("The import has ended successfully.");
      stateSavingDisabled = false;
@@ -3083,7 +3074,7 @@
      importConfig.close();
      // Re-enable backend
      closeBackendImport(this.backend);
      closeBackendImport(backend);
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
@@ -3202,7 +3193,7 @@
   */
  public Backend getBackend()
  {
    return backend;
    return retrievesBackend(baseDN);
  }
  /**
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -25,6 +25,7 @@
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import java.util.List;
@@ -105,7 +106,7 @@
  }
  /**
   * Shutdown the Replication servers.
   * Shutdown the replication server.
   */
  public void shutdown()
  {
@@ -123,7 +124,7 @@
    // replicationServer currently configured.
    if (replicationServer != null)
    {
      replicationServer.shutdown();
      replicationServer.remove();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
New file
@@ -0,0 +1,493 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.getExceptionMessage;
import java.util.HashMap;
import java.util.HashSet;
import org.opends.messages.Message;
import org.opends.server.admin.Configuration;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.admin.std.server.JEBackendCfg;
import org.opends.server.api.Backend;
import org.opends.server.backends.jeb.BackupManager;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.Validator;
/**
 * This class defines a backend that stores its information in an
 * associated replication server object.
 * This is primarily intended to take advantage of the backup/restore/
 * import/export of the backend API, and to provide an LDAP access
 * to the replication server database.
 * <BR><BR>
 * Entries stored in this backend are held in the DB associated with
 * the replication server.
 * <BR><BR>
 * Currently are only implemented the create and restore backup features.
 *
 */
public class ReplicationBackend
       extends Backend
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  // The base DNs for this backend.
  private DN[] baseDNs;
  // The mapping between parent DNs and their immediate children.
  private HashMap<DN,HashSet<DN>> childDNs;
  // The base DNs for this backend, in a hash set.
  private HashSet<DN> baseDNSet;
  // The set of supported controls for this backend.
  private HashSet<String> supportedControls;
  // The set of supported features for this backend.
  private HashSet<String> supportedFeatures;
  // The directory associated with this backend.
  private BackupDirectory backendDirectory;
  ReplicationServer server;
  /**
   * The configuration of this backend.
   */
  private JEBackendCfg cfg;
  /**
   * Creates a new backend with the provided information.  All backend
   * implementations must implement a default constructor that use
   * <CODE>super()</CODE> to invoke this constructor.
   */
  public ReplicationBackend()
  {
    super();
    // Perform all initialization in initializeBackend.
  }
  /**
   * Set the base DNs for this backend.  This is used by the unit tests
   * to set the base DNs without having to provide a configuration
   * object when initializing the backend.
   * @param baseDNs The set of base DNs to be served by this memory backend.
   */
  public void setBaseDNs(DN[] baseDNs)
  {
    this.baseDNs = baseDNs;
  }
  /**
   * {@inheritDoc}
   */
  public void configureBackend(Configuration config) throws ConfigException
  {
    if (config != null)
    {
      Validator.ensureTrue(config instanceof BackendCfg);
      cfg = (JEBackendCfg)config;
      DN[] baseDNs = new DN[cfg.getBackendBaseDN().size()];
      cfg.getBackendBaseDN().toArray(baseDNs);
      setBaseDNs(baseDNs);
      backendDirectory = new BackupDirectory(
          cfg.getBackendDirectory(), null);
    }
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void initializeBackend()
       throws ConfigException, InitializationException
  {
    if ((baseDNs == null) || (baseDNs.length != 1))
    {
      Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get();
      throw new ConfigException(message);
    }
    baseDNSet = new HashSet<DN>();
    for (DN dn : baseDNs)
    {
      baseDNSet.add(dn);
    }
    childDNs = new HashMap<DN,HashSet<DN>>();
    supportedControls = new HashSet<String>();
    supportedFeatures = new HashSet<String>();
    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.registerBaseDN(dn, this, false, false);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(
            dn.toString(), getExceptionMessage(e));
        throw new InitializationException(message, e);
      }
    }
  }
  /**
   * Removes any data that may have been stored in this backend.
   */
  public synchronized void clearMemoryBackend()
  {
    childDNs.clear();
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void finalizeBackend()
  {
    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.deregisterBaseDN(dn, false);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }
  /**
   * {@inheritDoc}
   */
  public DN[] getBaseDNs()
  {
    return baseDNs;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized long getEntryCount()
  {
    return -1;
  }
  /**
   * {@inheritDoc}
   */
  public boolean isLocal()
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized Entry getEntry(DN entryDN)
  {
    return null;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized boolean entryExists(DN entryDN)
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void addEntry(Entry entry, AddOperation addOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_ADD_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void deleteEntry(DN entryDN,
                                       DeleteOperation deleteOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_DELETE_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void replaceEntry(Entry entry,
                                        ModifyOperation modifyOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void renameEntry(DN currentDN, Entry entry,
                                       ModifyDNOperation modifyDNOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void search(SearchOperation searchOperation)
         throws DirectoryException
  {
    DN matchedDN = baseDNs[0];
    DN baseDN = searchOperation.getBaseDN();
    // FIXME Remove this error message or replace when implementing
    //       the search.
    Message message =
      ERR_MEMORYBACKEND_ENTRY_DOESNT_EXIST.get(String.valueOf(baseDN));
    throw new DirectoryException(
          ResultCode.NO_SUCH_OBJECT, message, matchedDN, null);
  }
  /**
   * {@inheritDoc}
   */
  public HashSet<String> getSupportedControls()
  {
    return supportedControls;
  }
  /**
   * {@inheritDoc}
   */
  public HashSet<String> getSupportedFeatures()
  {
    return supportedFeatures;
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsLDIFExport()
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void exportLDIF(LDIFExportConfig exportConfig)
         throws DirectoryException
  {
    // TODO
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsLDIFImport()
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  public synchronized LDIFImportResult importLDIF(LDIFImportConfig importConfig)
         throws DirectoryException
  {
      return new LDIFImportResult(0, 0, 0);
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsBackup()
  {
    // This backend does not provide a backup/restore mechanism.
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsBackup(BackupConfig backupConfig,
                                StringBuilder unsupportedReason)
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public void createBackup(BackupConfig backupConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.createBackup(cfg, backupConfig);
  }
  /**
   * {@inheritDoc}
   */
  public void removeBackup(BackupDirectory backupDirectory,
                           String backupID)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.removeBackup(this.backendDirectory, backupID);
  }
  /**
   * {@inheritDoc}
   */
  public boolean supportsRestore()
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public void restoreBackup(RestoreConfig restoreConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.restoreBackup(cfg, restoreConfig);
  }
  /**
   * Retrieves the number of subordinates for the requested entry.
   *
   * @param entryDN The distinguished name of the entry.
   *
   * @return The number of subordinate entries for the requested entry
   *         or -1 if it can not be determined.
   *
   * @throws DirectoryException  If a problem occurs while trying to
   *                              retrieve the entry.
   */
  public long numSubordinates(DN entryDN)
      throws DirectoryException
  {
    Message message = WARN_ROOTDSE_GET_ENTRY_NONROOT.
    get(entryDN.toNormalizedString());
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * Indicates whether the requested entry has any subordinates.
   *
   * @param entryDN The distinguished name of the entry.
   *
   * @return {@code ConditionResult.TRUE} if the entry has one or more
   *         subordinates or {@code ConditionResult.FALSE} otherwise
   *         or {@code ConditionResult.UNDEFINED} if it can not be
   *         determined.
   *
   * @throws DirectoryException  If a problem occurs while trying to
   *                              retrieve the entry.
   */
  public ConditionResult hasSubordinates(DN entryDN)
        throws DirectoryException
  {
    Message message = WARN_ROOTDSE_GET_ENTRY_NONROOT.
      get(entryDN.toNormalizedString());
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }
  /**
   * Set the replication server associated with this backend.
   * @param server The replication server.
   */
  public void setServer(ReplicationServer server)
  {
    this.server = server;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,16 +25,16 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.ServerConstants.EOL;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -47,22 +47,35 @@
import java.util.List;
import java.util.Set;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.Backend;
import org.opends.server.api.BackupTaskListener;
import org.opends.server.api.ExportTaskListener;
import org.opends.server.api.ImportTaskListener;
import org.opends.server.api.MonitorProvider;
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.loggers.LogLevel;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.LDIFReader;
import com.sleepycat.je.DatabaseException;
@@ -77,7 +90,9 @@
 * It is responsible for creating the replication server cache and managing it
 */
public class ReplicationServer extends MonitorProvider<MonitorProviderCfg>
  implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>
  implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>,
             BackupTaskListener, RestoreTaskListener, ImportTaskListener,
             ExportTaskListener
{
  private short serverId;
  private String serverURL;
@@ -108,6 +123,12 @@
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  // For the backend associated to this replication server,
  // DN of the config entry of the backend
  private DN backendConfigEntryDN;
  // ID of the backend
  private static final String backendId = "replicationChanges";
  /**
   * The tracer object for the debug logger.
   */
@@ -120,11 +141,10 @@
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg configuration)
         throws ConfigException
    throws ConfigException
  {
    super("Replication Server" + configuration.getReplicationPort());
    shutdown = false;
    replicationPort = configuration.getReplicationPort();
    replicationServerId = (short) configuration.getReplicationServerId();
    replicationServers = configuration.getReplicationServer();
@@ -162,6 +182,21 @@
    initialize(replicationServerId, replicationPort);
    configuration.addChangeListener(this);
    DirectoryServer.registerMonitorProvider(this);
    try
    {
      backendConfigEntryDN = DN.decode(
      "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
    } catch (Exception e) {}
    // Creates the backend associated to this ReplicationServer
    // if it does not exist.
    createBackend();
    DirectoryServer.registerBackupTaskListener(this);
    DirectoryServer.registerRestoreTaskListener(this);
    DirectoryServer.registerExportTaskListener(this);
    DirectoryServer.registerImportTaskListener(this);
  }
@@ -315,6 +350,8 @@
   */
  private void initialize(short changelogId, int changelogPort)
  {
    shutdown = false;
    try
    {
      /*
@@ -458,7 +495,7 @@
      dbEnv.shutdown();
    }
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
  }
}
  /**
@@ -492,10 +529,7 @@
    }
    catch(Exception e)
    {
      TRACER.debugInfo(
          "In RS <" + getMonitorInstanceName() +
          " Exception in clearGenerationId" +
          stackTraceToSingleLineString(e) + e.getLocalizedMessage());
      TRACER.debugCaught(LogLevel.ALL, e);
    }
  }
@@ -730,4 +764,180 @@
  {
    return serverId;
  }
  /**
   * Creates the backend associated to this replication server.
   * @throws ConfigException
   */
  private void createBackend()
  throws ConfigException
  {
    try
    {
      String ldif = makeLdif(
          "dn: ds-cfg-backend-id="+backendId+",cn=Backends,cn=config",
          "objectClass: top",
          "objectClass: ds-cfg-backend",
          "objectClass: ds-cfg-je-backend",
          "ds-cfg-backend-base-dn: dc="+backendId,
          "ds-cfg-backend-enabled: true",
          "ds-cfg-backend-writability-mode: enabled",
          "ds-cfg-backend-class: " +
            "org.opends.server.replication.server.ReplicationBackend",
          "ds-cfg-backend-id: " + backendId,
          "ds-cfg-backend-import-temp-directory: importTmp",
          "ds-cfg-backend-directory: " + getFileForPath(dbDirname));
      LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
          new StringReader(ldif));
      LDIFReader reader = new LDIFReader(ldifImportConfig);
      Entry backendConfigEntry = reader.readEntry();
      if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Add the replication backend
        DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
      }
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      Message msg = ERR_CHECK_CREATE_REPL_BACKEND_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
  }
  private static String makeLdif(String... lines)
  {
    StringBuilder buffer = new StringBuilder();
    for (String line : lines) {
      buffer.append(line).append(EOL);
    }
    // Append an extra line so we can append LDIF Strings.
    buffer.append(EOL);
    return buffer.toString();
  }
  /**
   * Do what needed when the config object related to this replication server
   * is deleted from the server configuration.
   */
  public void remove()
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " starts removing");
    shutdown();
    removeBackend();
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
    DirectoryServer.deregisterExportTaskListener(this);
    DirectoryServer.deregisterImportTaskListener(this);
  }
  /**
   * Removes the backend associated to this Replication Server that has been
   * created when this replication server was created.
   */
  protected void removeBackend()
  {
    try
    {
      if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Delete the replication backend
        DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
            null);
      }
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      Message msg = ERR_DELETE_REPL_BACKEND_FAILED.get(mb.toString());
      logError(msg);
    }
  }
  /**
   * {@inheritDoc}
   * <BR>
   * This implementation performs no processing.
   */
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    // Intentionally empty: nothing is needed at the moment.
  }
  /**
   * {@inheritDoc}
   * <BR>
   * This implementation performs no processing.
   */
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
    // Intentionally empty: nothing is needed at the moment.
  }
  /**
   * {@inheritDoc}
   */
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    if (backend.getBackendID().equals(backendId))
      shutdown();
  }
  /**
   * {@inheritDoc}
   */
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
    if (backend.getBackendID().equals(backendId))
      initialize(this.replicationServerId, this.replicationPort);
  }
  /**
   * {@inheritDoc}
   * <BR>
   * This implementation performs no processing.
   */
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    // Intentionally empty: nothing is needed at the moment.
  }
  /**
   * {@inheritDoc}
   * <BR>
   * This implementation performs no processing.
   */
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
    // Intentionally empty: nothing is needed at the moment.
  }
  /**
   * {@inheritDoc}
   */
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " Export starts");
    if (backend.getBackendID().equals(backendId))
    {
      // Retrieves the backend related to this domain
      // backend =
      ReplicationBackend b =
      (ReplicationBackend)DirectoryServer.getBackend(backendId);
      b.setServer(this);
    }
  }
  /**
   * {@inheritDoc}
   * <BR>
   * This implementation performs no processing.
   */
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
    // Intentionally empty: nothing is needed at the moment.
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -100,7 +100,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
@@ -117,22 +117,11 @@
        if (debugEnabled())
        {
          if (handler.isReplicationServer())
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from RS server with serverId=" + serverId +
                " receives " + msg);
          }
          else
          {
            TRACER.debugInfo(
                "In RS <" + replicationCache.getReplicationServer().
                getMonitorInstanceName() +
                "> from LDAP server with serverId=" + serverId +
                " receives " + msg);
          }
          TRACER.debugInfo(
              "In RS " + replicationCache.getReplicationServer().
              getMonitorInstanceName() +
              (handler.isReplicationServer()?" From RS ":" From LS")+
              " with serverId=" + serverId + " receives " + msg);
        }
        if (msg instanceof AckMessage)
        {
@@ -271,11 +260,10 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
          e.getCause());
          " reader IO EXCEPTION for serverID=" + serverId
          + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
      logError(message);
    } catch (ClassNotFoundException e)
@@ -316,10 +304,10 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          " reader CLOSE serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
          " server reader for serverID=" + serverId +
          " is closing the session");
      try
      {
        session.close();
@@ -331,10 +319,9 @@
    }
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS " + replicationCache.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?"RS":"LDAP") +
          " server reader stopped for serverID=" + serverId
          + stackTraceToSingleLineString(new Exception()));
          (handler.isReplicationServer()?" RS":" LDAP") +
          " server reader stopped for serverID=" + serverId);
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -300,186 +300,6 @@
    return found;
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * result code is not SUCCESS
   */
  private void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      debugInfo("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      debugInfo("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
      }
      while ((taskState != expectedTaskState) &&
          (taskState != TaskState.STOPPED_BY_ERROR));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          if (expectedMessage != null)
          {
            debugInfo(logMessages.get(0));
            debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  private void addTestEntriesToDB(String[] ldifEntries)
  {
    try
    {
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      for (String ldifEntry : ldifEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        AddOperationBasis addOp = new AddOperationBasis(
            connection,
            InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            null,
            entry.getDN(),
            entry.getObjectClasses(),
            entry.getUserAttributes(),
            entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          debugInfo("addEntry: Failed" + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /*
   * Creates entries necessary to the test.
   */
@@ -1373,11 +1193,11 @@
    broker3 = null;
    if (replServer1 != null)
      replServer1.shutdown();
      replServer1.remove();
    if (replServer2 != null)
      replServer2.shutdown();
    if (replServer2 != null)
      replServer2.shutdown();
      replServer2.remove();
    if (replServer3 != null)
      replServer3.remove();
    replServer1 = null;
    replServer2 = null;
    replServer3 = null;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -278,67 +278,6 @@
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessage The expected error messageID when the expected
   * result code is not SUCCESS
   */
  private void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      log("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        log("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      log("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  /**
   * Wait a task to be completed and check the expected state and expected
   * stats.
   * @param taskEntry The task to process.
@@ -454,107 +393,6 @@
    }
  }
  private void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible?  Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        try
        {
          // Check that the left counter.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
          resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          // Check that the total counter.
          taskStateType =
           DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
          resultEntry.getAttributeValue(taskStateType,
               DirectoryStringSyntax.DECODER);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(2000);
      }
      while ((taskState != expectedTaskState) &&
          (taskState != TaskState.STOPPED_BY_ERROR));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          if (expectedMessage != null)
          {
            log(logMessages.get(0));
            log(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>=0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,9 +26,10 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -51,8 +52,10 @@
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
@@ -90,6 +93,9 @@
public abstract class ReplicationTestCase extends DirectoryServerTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  /**
  * The internal connection used for operation
  */
@@ -726,4 +732,187 @@
    return new ReplSessionSecurity(null, null, null, true);
  }
  /**
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * result code is not SUCCESS
   */
  protected void addTask(Entry taskEntry, ResultCode expectedResult,
      Message errorMessage)
  {
    try
    {
      TRACER.debugInfo("AddTask/" + taskEntry);
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      // Add the task.
      AddOperation addOperation =
        connection.processAdd(taskEntry.getDN(),
            taskEntry.getObjectClasses(),
            taskEntry.getUserAttributes(),
            taskEntry.getOperationalAttributes());
      assertEquals(addOperation.getResultCode(), expectedResult,
          "Result of ADD operation of the task is: "
          + addOperation.getResultCode()
          + " Expected:"
          + expectedResult + " Details:" + addOperation.getErrorMessage()
          + addOperation.getAdditionalLogMessage());
      if (expectedResult != ResultCode.SUCCESS)
      {
        assertTrue(addOperation.getErrorMessage().toString().
            startsWith(errorMessage.toString()),
            "Error MsgID of the task <"
            + addOperation.getErrorMessage()
            + "> equals <"
            + errorMessage + ">");
        TRACER.debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId()
                + addOperation.getErrorMessage() + ">");
      }
      else
      {
        waitTaskState(taskEntry, TaskState.RUNNING, null);
      }
      // Entry will be removed at the end of the test
      entryList.addLast(taskEntry.getDN());
      TRACER.debugInfo("AddedTask/" + taskEntry.getDN());
    }
    catch(Exception e)
    {
      fail("Exception when adding task:"+ e.getMessage());
    }
  }
  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
  {
    TaskState taskState = null;
    int cpt=10;
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
        cpt--;
      }
      while ((taskState != expectedTaskState) &&
             (taskState != TaskState.STOPPED_BY_ERROR) &&
             (taskState != TaskState.COMPLETED_SUCCESSFULLY) &&
             (cpt > 0));
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
        else
        {
          TRACER.debugInfo(logMessages.get(0));
          if (expectedMessage != null)
          {
            TRACER.debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
        }
      }
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
    }
    catch(Exception e)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  protected void addTestEntriesToDB(String[] ldifEntries)
  {
    try
    {
      // Change config of DS to launch the total update task
      InternalClientConnection connection =
        InternalClientConnection.getRootConnection();
      for (String ldifEntry : ldifEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        AddOperationBasis addOp = new AddOperationBasis(
            connection,
            InternalClientConnection.nextOperationID(),
            InternalClientConnection.nextMessageID(),
            null,
            entry.getDN(),
            entry.getObjectClasses(),
            entry.getUserAttributes(),
            entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          TRACER.debugInfo("Failed to add entry " + entry.getDN() +
              "Result code = : " + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -34,6 +34,7 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.*;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -42,11 +43,10 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
@@ -62,6 +62,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.RDN;
import org.opends.server.types.DirectoryConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.AfterClass;
@@ -413,7 +414,7 @@
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void stopChangelog() throws Exception
  {
    replicationServer.shutdown();
    replicationServer.remove();
    configure();
    newClient();
    newClientWithFirstChanges();
@@ -628,7 +629,7 @@
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
                                         changelogIds[i], 0, 100, servers);
        replicationServer = new ReplicationServer(conf);
        changelogs[i] = new ReplicationServer(conf);
      }
      ReplicationBroker broker1 = null;
@@ -763,9 +764,9 @@
      finally
      {
        if (changelogs[0] != null)
          changelogs[0].shutdown();
          changelogs[0].remove();
        if (changelogs[1] != null)
          changelogs[1].shutdown();
          changelogs[1].remove();
        if (broker1 != null)
          broker1.stop();
        if (broker2 != null)
@@ -972,4 +973,53 @@
      }
    }
  }
  /*
   * Test backup and restore of the Replication server backend
   */
   @Test(enabled=true)
   public void backupRestore() throws Exception
   {
     debugInfo("Starting backupRestore");
     Entry backupTask = createBackupTask();
     Entry restoreTask = createRestoreTask();
     addTask(backupTask, ResultCode.SUCCESS, null);
     waitTaskState(backupTask, TaskState.COMPLETED_SUCCESSFULLY, null);
     addTask(restoreTask, ResultCode.SUCCESS, null);
     waitTaskState(restoreTask, TaskState.COMPLETED_SUCCESSFULLY, null);
     debugInfo("Ending   backupRestore");
   }
   private Entry createBackupTask()
   throws Exception
   {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
     "objectclass: ds-task",
     "objectclass: ds-task-backup",
     "ds-task-class-name: org.opends.server.tasks.BackupTask",
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges",
     "ds-task-backup-backend-id: replicationChanges");
   }
   private Entry createRestoreTask()
   throws Exception
   {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
     "objectclass: ds-task",
     "objectclass: ds-task-restore",
     "ds-task-class-name: org.opends.server.tasks.RestoreTask",
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges");
   }
}