From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk

---
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 1874 ++++++++++++----------------------------------------------
 1 files changed, 399 insertions(+), 1,475 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
similarity index 65%
rename from opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
rename to opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ed583c6..87cb100 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,6 +25,7 @@
  *      Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
+
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.messages.ToolMessages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
@@ -36,28 +37,31 @@
 import static org.opends.server.util.StaticUtils.createEntry;
 import static org.opends.server.util.StaticUtils.getFileForPath;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.replication.common.StatusMachine.*;
+
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
+
+import org.opends.server.replication.service.ReplicationMonitor;
+
+import java.util.Collection;
+
+import org.opends.server.types.Attributes;
 
 import java.io.File;
-import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.SocketTimeoutException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.DataFormatException;
-
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -68,7 +72,6 @@
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.backends.jeb.BackendImpl;
-import org.opends.server.backends.task.Task;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
@@ -82,49 +85,31 @@
 import org.opends.server.protocols.asn1.ASN1Exception;
 import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchListener;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPAttribute;
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.protocols.ldap.LDAPModification;
+import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.AddContext;
 import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
 import org.opends.server.replication.protocol.DeleteContext;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.HeartbeatMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
 import org.opends.server.replication.protocol.ModifyContext;
 import org.opends.server.replication.protocol.ModifyDNMsg;
 import org.opends.server.replication.protocol.ModifyDnContext;
 import org.opends.server.replication.protocol.ModifyMsg;
 import org.opends.server.replication.protocol.OperationContext;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.tasks.InitializeTargetTask;
-import org.opends.server.tasks.InitializeTask;
 import org.opends.server.tasks.TaskUtils;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.ExistingFileBehavior;
 import org.opends.server.types.AbstractOperation;
 import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeBuilder;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.ConfigChangeResult;
@@ -133,6 +118,7 @@
 import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
+import org.opends.server.types.ExistingFileBehavior;
 import org.opends.server.types.LDAPException;
 import org.opends.server.types.LDIFExportConfig;
 import org.opends.server.types.LDIFImportConfig;
@@ -144,6 +130,7 @@
 import org.opends.server.types.ResultCode;
 import org.opends.server.types.SearchFilter;
 import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchResultReference;
 import org.opends.server.types.SearchScope;
 import org.opends.server.types.SynchronizationProviderResult;
 import org.opends.server.types.operation.PluginOperation;
@@ -163,9 +150,9 @@
  *  handle conflict resolution,
  *  handle protocol messages from the replicationServer.
  */
-public class ReplicationDomain extends DirectoryThread
+public class LDAPReplicationDomain extends ReplicationDomain
        implements ConfigurationChangeListener<ReplicationDomainCfg>,
-                  AlertGenerator
+                  AlertGenerator, InternalSearchListener
 {
   /**
    * The fully-qualified name of this class.
@@ -185,37 +172,20 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private ReplicationMonitor monitor;
-
-  private ReplicationBroker broker;
-  // Thread waiting for incoming update messages for this domain and pushing
-  // them to the global incoming update message queue for later processing by
-  // replay threads.
-  private ListenerThread listenerThread;
   // The update to replay message queue where the listener thread is going to
   // push incoming update messages.
   private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
-  private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
-    new TreeMap<ChangeNumber, UpdateMsg>();
-  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
-  private AtomicInteger numSentUpdates = new AtomicInteger(0);
-  private AtomicInteger numProcessedUpdates = new AtomicInteger();
   private AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
   private AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
   private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
   private int debugCount = 0;
-  private PersistentServerState state;
+  private final PersistentServerState state;
   private int numReplayedPostOpCalled = 0;
 
-  private int maxReceiveQueue = 0;
-  private int maxSendQueue = 0;
-  private int maxReceiveDelay = 0;
-  private int maxSendDelay = 0;
-
   private long generationId = -1;
   private boolean generationIdSavedStatus = false;
 
-  ChangeNumberGenerator generator;
+  private ChangeNumberGenerator generator;
 
   /**
    * This object is used to store the list of update currently being
@@ -235,19 +205,8 @@
    */
   private RemotePendingChanges remotePendingChanges;
 
-  /**
-   * The time in milliseconds between heartbeats from the replication
-   * server.  Zero means heartbeats are off.
-   */
-  private long heartbeatInterval = 0;
   private short serverId;
 
-  // The context related to an import or export being processed
-  // Null when none is being processed.
-  private IEContext ieContext = null;
-
-  private Collection<String> replicationServers;
-
   private DN baseDn;
 
   private boolean shutdown = false;
@@ -260,37 +219,10 @@
   private boolean disabled = false;
   private boolean stateSavingDisabled = false;
 
-  private int window = 100;
-
-  /*
-   * Assured mode properties
-   */
-  // Is assured mode enabled or not for this domain ?
-  private boolean assured = false;
-  // Assured sub mode (used when assured is true)
-  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
-  // Safe Data level (used when assuredMode is SAFE_DATA)
-  private byte assuredSdLevel = (byte)1;
-  // Timeout (in milliseconds) when waiting for acknowledgments
-  private long assuredTimeout = 1000;
-
-  // Group id
-  private byte groupId = (byte)1;
-  // Referrals urls to be published to other servers of the topology
-  // TODO: fill that with all currently opened urls if no urls configured
-  private List<String> refUrls = new ArrayList<String>();
-
-  // Current status for this replicated domain
-  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
-
-  /*
-   * Properties for the last topology info received from the network.
-   */
-  // Info for other DSs.
-  // Warning: does not contain info for us (for our server id)
-  private List<DSInfo> dsList = new ArrayList<DSInfo>();
-  // Info for other RSs.
-  private List<RSInfo> rsList = new ArrayList<RSInfo>();
+  // This list is used to temporary store operations that needs
+  // to be replayed at session establishment time.
+  private TreeSet<FakeOperation> replayOperations  =
+    new TreeSet<FakeOperation>(new FakeOperationComparator());;
 
   /**
    * The isolation policy that this domain is going to use.
@@ -312,131 +244,47 @@
    */
   private boolean done = true;
 
+  private ServerStateFlush flushThread;
+
   /**
-   * This class contain the context related to an import or export
-   * launched on the domain.
+   * The thread that periodically saves the ServerState of this
+   * LDAPReplicationDomain in the database.
    */
-  private class IEContext
+  private class  ServerStateFlush extends DirectoryThread
   {
-    // The task that initiated the operation.
-    Task initializeTask;
-    // The input stream for the import
-    ReplLDIFInputStream ldifImportInputStream = null;
-    // The target in the case of an export
-    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
-    // The source in the case of an import
-    short importSource = RoutableMsg.UNKNOWN_SERVER;
-
-    // The total entry count expected to be processed
-    long entryCount = 0;
-    // The count for the entry not yet processed
-    long entryLeftCount = 0;
-
-    // The exception raised when any
-    DirectoryException exception = null;
-
-    /**
-     * Initializes the import/export counters with the provider value.
-     * @param total
-     * @param left
-     * @throws DirectoryException
-     */
-    public void setCounters(long total, long left)
-      throws DirectoryException
+    protected ServerStateFlush()
     {
-      entryCount = total;
-      entryLeftCount = left;
-
-      if (initializeTask != null)
-      {
-        if (initializeTask instanceof InitializeTask)
-        {
-          ((InitializeTask)initializeTask).setTotal(entryCount);
-          ((InitializeTask)initializeTask).setLeft(entryCount);
-        }
-        else if (initializeTask instanceof InitializeTargetTask)
-        {
-          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
-          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
-        }
-      }
-    }
-
-    /**
-     * Update the counters of the task for each entry processed during
-     * an import or export.
-     * @throws DirectoryException
-     */
-    public void updateCounters()
-      throws DirectoryException
-    {
-      entryLeftCount--;
-
-      if (initializeTask != null)
-      {
-        if (initializeTask instanceof InitializeTask)
-        {
-          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
-        }
-        else if (initializeTask instanceof InitializeTargetTask)
-        {
-          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
-        }
-      }
+      super("Replication State Saver for server id " +
+            serverId + " and domain " + baseDn.toString());
     }
 
     /**
      * {@inheritDoc}
      */
-    public String toString()
-    {
-      return new String("[ Entry count=" + this.entryCount +
-                        ", Entry left count=" + this.entryLeftCount + "]");
-    }
-  }
-
-  /**
-   * This thread is launched when we want to export data to another server that
-   * has requested to be initialized with the data of our backend.
-   */
-  private class ExportThread extends DirectoryThread
-  {
-    // Id of server that will receive updates
-    private short target;
-
-    /**
-     * Constructor for the ExportThread.
-     *
-     * @param target Id of server that will receive updates
-     */
-    public ExportThread(short target)
-    {
-      super("Export thread " + serverId);
-      this.target = target;
-    }
-
-    /**
-     * Run method for this class.
-     */
+    @Override
     public void run()
     {
-      if (debugEnabled())
-      {
-        TRACER.debugInfo("Export thread starting.");
-      }
+      done = false;
 
-      try
+      while (shutdown  == false)
       {
-        initializeRemote(target, target, null);
-      } catch (DirectoryException de)
-      {
-      // An error message has been sent to the peer
-      // Nothing more to do locally
+        try
+        {
+          synchronized (this)
+          {
+            this.wait(1000);
+            if (!disabled && !stateSavingDisabled )
+            {
+              // save the ServerState
+              state.save();
+            }
+          }
+        } catch (InterruptedException e)
+        { }
       }
-      if (debugEnabled())
-      {
-        TRACER.debugInfo("Export thread stopping.");
-      }
+      state.save();
+
+      done = true;
     }
   }
 
@@ -447,18 +295,24 @@
    * @param updateToReplayQueue The queue for update messages to replay.
    * @throws ConfigException In case of invalid configuration.
    */
-  public ReplicationDomain(ReplicationDomainCfg configuration,
+  public LDAPReplicationDomain(ReplicationDomainCfg configuration,
     LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
     throws ConfigException
   {
-    super("Replication State Saver for server id " + configuration.getServerId()
-      + " and domain " + configuration.getBaseDN());
+    super(configuration.getBaseDN().toNormalizedString(),
+        (short) configuration.getServerId());
+
+    /**
+     * The time in milliseconds between heartbeats from the replication
+     * server.  Zero means heartbeats are off.
+     */
+    long heartbeatInterval = 0;
 
     // Read the configuration parameters.
-    replicationServers = configuration.getReplicationServer();
+    Set<String> replicationServers = configuration.getReplicationServer();
     serverId = (short) configuration.getServerId();
     baseDn = configuration.getBaseDN();
-    window  = configuration.getWindowSize();
+    int window  = configuration.getWindowSize();
     heartbeatInterval = configuration.getHeartbeatInterval();
     isolationpolicy = configuration.getIsolationPolicy();
     configDn = configuration.dn();
@@ -471,28 +325,22 @@
     switch (assuredType)
     {
       case NOT_ASSURED:
-        assured = false;
+        setAssured(false);
         break;
       case SAFE_DATA:
-        assured = true;
-        this.assuredMode = AssuredMode.SAFE_DATA_MODE;
+        setAssured(true);
+        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
         break;
       case SAFE_READ:
-        assured = true;
-        this.assuredMode = AssuredMode.SAFE_READ_MODE;
+        setAssured(true);
+        setAssuredMode(AssuredMode.SAFE_READ_MODE);
         break;
     }
-    this.assuredSdLevel = (byte)configuration.getAssuredSdLevel();
-    this.groupId = (byte)configuration.getGroupId();
-    this.assuredTimeout = configuration.getAssuredTimeout();
-    SortedSet<String> urls = configuration.getReferralsUrl();
-    if (urls != null)
-    {
-      for (String url : urls)
-      {
-        this.refUrls.add(url);
-      }
-    }
+    setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
+    setAssuredTimeout(configuration.getAssuredTimeout());
+
+    setGroupId((byte)configuration.getGroupId());
+    setURLs(configuration.getReferralsUrl());
 
     /*
      * Modify conflicts are solved for all suffixes but the schema suffix
@@ -510,19 +358,6 @@
       solveConflictFlag = true;
     }
 
-    /*
-     * Create a new Persistent Server State that will be used to store
-     * the last ChangeNmber seen from all LDAP servers in the topology.
-     */
-    state = new PersistentServerState(baseDn, serverId);
-
-    /*
-     * Create a replication monitor object responsible for publishing
-     * monitoring information below cn=monitor.
-     */
-    monitor = new ReplicationMonitor(this);
-    DirectoryServer.registerMonitorProvider(monitor);
-
     Backend backend = retrievesBackend(baseDn);
     if (backend == null)
     {
@@ -541,14 +376,12 @@
     }
 
     /*
-     * create the broker object used to publish and receive changes
+     * Create a new Persistent Server State that will be used to store
+     * the last ChangeNmber seen from all LDAP servers in the topology.
      */
-    broker = new ReplicationBroker(this, state, baseDn, serverId,
-        maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window,
-        heartbeatInterval, generationId,
-        new ReplSessionSecurity(configuration),getGroupId());
+    state = new PersistentServerState(baseDn, serverId, getServerState());
 
-    broker.start(replicationServers);
+    startPublishService(replicationServers, window, heartbeatInterval);
 
     /*
      * ChangeNumberGenerator is used to create new unique ChangeNumbers
@@ -557,14 +390,12 @@
      * The generator time is adjusted to the time of the last CN received from
      * remote other servers.
      */
-    generator =
-      new ChangeNumberGenerator(serverId, state);
+    generator = getGenerator();
 
     pendingChanges =
-      new PendingChanges(generator,
-                         broker, state);
+      new PendingChanges(generator, this);
 
-    remotePendingChanges = new RemotePendingChanges(generator, state);
+    remotePendingChanges = new RemotePendingChanges(getServerState());
 
     // listen for changes on the configuration
     configuration.addChangeListener(this);
@@ -573,7 +404,6 @@
     DirectoryServer.registerAlertGenerator(this);
   }
 
-
   /**
    * Returns the base DN of this ReplicationDomain.
    *
@@ -741,7 +571,7 @@
     {
       // this isolation policy specifies that the updates are denied
       // when the broker is not connected.
-      return broker.isConnected();
+      return isConnected();
     }
     // we should never get there as the only possible policies are
     // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
@@ -931,322 +761,6 @@
   }
 
   /**
-   * Receives an update message from the replicationServer.
-   * also responsible for updating the list of pending changes
-   * @return the received message - null if none
-   */
-  public UpdateMsg receive()
-  {
-    UpdateMsg update = null;
-
-    while ( (update == null) && (!shutdown) )
-    {
-      InitializeRequestMsg initMsg = null;
-      ReplicationMsg msg;
-      try
-      {
-        msg = broker.receive();
-        if (msg == null)
-        {
-          // The server is in the shutdown process
-          return null;
-        }
-
-        if (debugEnabled())
-          if (!(msg instanceof HeartbeatMsg))
-            TRACER.debugVerbose("Message received <" + msg + ">");
-
-        if (msg instanceof AckMsg)
-        {
-          AckMsg ack = (AckMsg) msg;
-          receiveAck(ack);
-        }
-        else if (msg instanceof InitializeRequestMsg)
-        {
-          // Another server requests us to provide entries
-          // for a total update
-          initMsg = (InitializeRequestMsg)msg;
-        }
-        else if (msg instanceof InitializeTargetMsg)
-        {
-          // Another server is exporting its entries to us
-          InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
-
-          try
-          {
-            // This must be done while we are still holding the
-            // broker lock because we are now going to receive a
-            // bunch of entries from the remote server and we
-            // want the import thread to catch them and
-            // not the ListenerThread.
-            initialize(importMsg);
-          }
-          catch(DirectoryException de)
-          {
-            // Returns an error message to notify the sender
-            ErrorMsg errorMsg =
-              new ErrorMsg(importMsg.getsenderID(),
-                  de.getMessageObject());
-            MessageBuilder mb = new MessageBuilder();
-            mb.append(de.getMessageObject());
-            TRACER.debugInfo(Message.toString(mb.toMessage()));
-            broker.publish(errorMsg);
-          }
-        }
-        else if (msg instanceof ErrorMsg)
-        {
-          if (ieContext != null)
-          {
-            // This is an error termination for the 2 following cases :
-            // - either during an export
-            // - or before an import really started
-            //   For example, when we publish a request and the
-            //  replicationServer did not find any import source.
-            abandonImportExport((ErrorMsg)msg);
-          }
-          else
-          {
-            /*
-             * Log error message
-             */
-            ErrorMsg errorMsg = (ErrorMsg)msg;
-            logError(ERR_ERROR_MSG_RECEIVED.get(
-                errorMsg.getDetails()));
-          }
-        }
-        if (msg instanceof TopologyMsg)
-        {
-          TopologyMsg topoMsg = (TopologyMsg)msg;
-          receiveTopo(topoMsg);
-        }
-        if (msg instanceof ChangeStatusMsg)
-        {
-          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
-          receiveChangeStatus(csMsg);
-        }
-        else if (msg instanceof UpdateMsg)
-        {
-          update = (UpdateMsg) msg;
-          receiveUpdate(update);
-        }
-      }
-      catch (SocketTimeoutException e)
-      {
-        // just retry
-      }
-      // Test if we have received and export request message and
-      // if that's the case handle it now.
-      // This must be done outside of the portion of code protected
-      // by the broker lock so that we keep receiveing update
-      // when we are doing and export and so that a possible
-      // closure of the socket happening when we are publishing the
-      // entries to the remote can be handled by the other
-      // replay thread when they call this method and therefore the
-      // broker.receive() method.
-      if (initMsg != null)
-      {
-        // Do this work in a thread to allow replay thread continue working
-        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
-        exportThread.start();
-      }
-    }
-    return update;
-  }
-
-  /**
-   * Processes an incoming TopologyMsg.
-   * Updates the structures for the local view of the topology.
-   *
-   * @param topoMsg The topology information received from RS.
-   */
-  public void receiveTopo(TopologyMsg topoMsg)
-  {
-
-    if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDn
-        + " received topology info update:\n" + topoMsg);
-
-    // Store new lists
-    synchronized(getDsList())
-    {
-      synchronized(getRsList())
-      {
-        dsList = topoMsg.getDsList();
-        rsList = topoMsg.getRsList();
-      }
-    }
-  }
-
-  /**
-   * Set the initial status of the domain, once he is connected to the topology.
-   * @param initStatus The status to enter the state machine with
-   */
-  public void setInitialStatus(ServerStatus initStatus)
-  {
-    // Sanity check: is it a valid initial status?
-    if (!isValidInitialStatus(initStatus))
-    {
-      Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
-        baseDn.toString(), Short.toString(serverId));
-      logError(msg);
-    } else
-    {
-      status = initStatus;
-    }
-  }
-
-  /**
-   * Processes an incoming ChangeStatusMsg. Compute new status according to
-   * given order. Then update domain for being compliant with new status
-   * definition.
-   * @param csMsg The received status message
-   */
-  private void receiveChangeStatus(ChangeStatusMsg csMsg)
-  {
-    if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDn +
-        " received change status message:\n" + csMsg);
-
-    ServerStatus reqStatus = csMsg.getRequestedStatus();
-
-    // Translate requested status to a state machine event
-    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
-    if (event == StatusMachineEvent.INVALID_EVENT)
-    {
-      Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
-        baseDn.toString(), Short.toString(serverId));
-      logError(msg);
-      return;
-    }
-
-    // Compute new status and do matching tasks
-    // Use synchronized as admin task (thread) could order to go in admin status
-    // for instance (concurrent with receive thread).
-    synchronized (status)
-    {
-      ServerStatus newStatus =
-        StatusMachine.computeNewStatus(status, event);
-
-      if (newStatus == ServerStatus.INVALID_STATUS)
-      {
-        Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
-          Short.toString(serverId), status.toString(), event.toString());
-        logError(msg);
-        return;
-      }
-
-      // Store new status
-      status = newStatus;
-
-      if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDn +
-        " new status is: " + status);
-
-      // Perform whatever actions are needed to apply properties for being
-      // compliant with new status
-      updateDomainForNewStatus();
-    }
-  }
-
-  /**
-   * Called when first connection or disconnection detected.
-   */
-  public void toNotConnectedStatus()
-  {
-    // Go into not connected status
-    // Use synchronized as somebody could ask another status change at the same
-    // time
-    synchronized (status)
-    {
-      StatusMachineEvent event =
-        StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT;
-      ServerStatus newStatus =
-        StatusMachine.computeNewStatus(status, event);
-
-      if (newStatus == ServerStatus.INVALID_STATUS)
-      {
-        Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
-          Short.toString(serverId), status.toString(), event.toString());
-        logError(msg);
-        return;
-      }
-
-      // Store new status
-      status = newStatus;
-
-      if (debugEnabled())
-        TRACER.debugInfo("Replication domain " + baseDn +
-          " new status is: " + status);
-
-      // Perform whatever actions are needed to apply properties for being
-      // compliant with new status
-      updateDomainForNewStatus();
-    }
-  }
-
-  /**
-   * Perform whatever actions are needed to apply properties for being
-   * compliant with new status. Must be called in synchronized section for
-   * status. The new status is already set in status variable.
-   */
-  private void updateDomainForNewStatus()
-  {
-    switch (status)
-    {
-      case NOT_CONNECTED_STATUS:
-        break;
-      case NORMAL_STATUS:
-        break;
-      case DEGRADED_STATUS:
-        break;
-      case FULL_UPDATE_STATUS:
-        // Signal RS we just entered the full update status
-        broker.signalStatusChange(status);
-        break;
-      case BAD_GEN_ID_STATUS:
-        break;
-      default:
-        if (debugEnabled())
-          TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " +
-            status);
-    }
-  }
-
-  /**
-   * Do the necessary processing when an UpdateMsg was received.
-   *
-   * @param update The received UpdateMsg.
-   */
-  public void receiveUpdate(UpdateMsg update)
-  {
-    remotePendingChanges.putRemoteUpdate(update);
-    numRcvdUpdates.incrementAndGet();
-  }
-
-  /**
-   * Do the necessary processing when an AckMsg is received.
-   *
-   * @param ack The AckMsg that was received.
-   */
-  public void receiveAck(AckMsg ack)
-  {
-    UpdateMsg update;
-    ChangeNumber changeNumber = ack.getChangeNumber();
-
-    synchronized (waitingAckMsgs)
-    {
-      update = waitingAckMsgs.remove(changeNumber);
-    }
-    if (update != null)
-    {
-      synchronized (update)
-      {
-        update.notify();
-      }
-    }
-  }
-
-  /**
    * Check if an operation must be synchronized.
    * Also update the list of pending changes and the server RUV
    * @param op the operation
@@ -1258,19 +772,17 @@
     {
       numReplayedPostOpCalled++;
     }
-    UpdateMsg msg = null;
+    LDAPUpdateMsg msg = null;
 
     // Note that a failed non-replication operation might not have a change
     // number.
     ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
 
-    boolean isAssured = isAssured(op);
-
     if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
     {
       // Generate a replication message for a successful non-replication
       // operation.
-      msg = UpdateMsg.generateMsg(op);
+      msg = LDAPUpdateMsg.generateMsg(op);
 
       if (msg == null)
       {
@@ -1307,16 +819,6 @@
         return;
       }
 
-      if (msg != null && isAssured)
-      {
-        synchronized (waitingAckMsgs)
-        {
-          // Add the assured message to the list of update that are
-          // waiting acknowledgements
-          waitingAckMsgs.put(curChangeNumber, msg);
-        }
-      }
-
       if (generationIdSavedStatus != true)
       {
         this.saveGenerationId(generationId);
@@ -1334,53 +836,8 @@
 
     if (!op.isSynchronizationOperation())
     {
-      int pushedChanges = pendingChanges.pushCommittedChanges();
-      numSentUpdates.addAndGet(pushedChanges);
+      pendingChanges.pushCommittedChanges();
     }
-
-    // Wait for acknowledgement of an assured message.
-    if (msg != null && isAssured)
-    {
-      synchronized (msg)
-      {
-        while (waitingAckMsgs.containsKey(msg.getChangeNumber()))
-        {
-          // TODO : should have a configurable timeout to get
-          // out of this loop
-          try
-          {
-            msg.wait(1000);
-          } catch (InterruptedException e)
-          { }
-        }
-      }
-    }
-  }
-
-  /**
-   * get the number of updates received by the replication plugin.
-   *
-   * @return the number of updates received
-   */
-  public int getNumRcvdUpdates()
-  {
-    if (numRcvdUpdates != null)
-      return numRcvdUpdates.get();
-    else
-      return 0;
-  }
-
-  /**
-   * Get the number of updates sent by the replication plugin.
-   *
-   * @return the number of updates sent
-   */
-  public int getNumSentUpdates()
-  {
-    if (numSentUpdates != null)
-      return numSentUpdates.get();
-    else
-      return 0;
   }
 
   /**
@@ -1397,27 +854,6 @@
   }
 
   /**
-   * Increment the number of processed updates.
-   */
-  public void incProcessedUpdates()
-  {
-    numProcessedUpdates.incrementAndGet();
-  }
-
-  /**
-   * get the number of updates replayed by the replication.
-   *
-   * @return The number of updates replayed by the replication
-   */
-  public int getNumProcessedUpdates()
-  {
-    if (numProcessedUpdates != null)
-      return numProcessedUpdates.get();
-    else
-      return 0;
-  }
-
-  /**
    * get the number of updates replayed successfully by the replication.
    *
    * @return The number of updates replayed successfully
@@ -1428,16 +864,6 @@
   }
 
   /**
-   * get the ServerState.
-   *
-   * @return the ServerState
-   */
-  public ServerState getServerState()
-  {
-    return state;
-  }
-
-  /**
    * Get the debugCount.
    *
    * @return Returns the debugCount.
@@ -1448,49 +874,6 @@
   }
 
   /**
-   * Send an Ack message.
-   *
-   * @param changeNumber The ChangeNumber for which the ack must be sent.
-   */
-  public void ack(ChangeNumber changeNumber)
-  {
-    broker.publish(new AckMsg(changeNumber));
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void run()
-  {
-    done = false;
-
-    // Create the listener thread
-    listenerThread = new ListenerThread(this, updateToReplayQueue);
-    listenerThread.start();
-
-    while (shutdown  == false)
-    {
-      try
-      {
-        synchronized (this)
-        {
-          this.wait(1000);
-          if (!disabled && !stateSavingDisabled )
-          {
-            // save the RUV
-            state.save();
-          }
-        }
-      } catch (InterruptedException e)
-      { }
-    }
-    state.save();
-
-    done = true;
-  }
-
-  /**
    * Shutdown this ReplicationDomain.
    */
   public void shutdown()
@@ -1498,27 +881,19 @@
     // stop the flush thread
     shutdown = true;
 
-    // Stop the listener thread
-    if (listenerThread != null)
+    // stop the thread in charge of flushing the ServerState.
+    if (flushThread != null)
     {
-      listenerThread.shutdown();
+      synchronized (flushThread)
+      {
+        flushThread.notify();
+      }
     }
 
-    synchronized (this)
-    {
-      this.notify();
-    }
-
-    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
-
     DirectoryServer.deregisterAlertGenerator(this);
 
-    // stop the ReplicationBroker
-    broker.stop();
-
-    // Wait for the listener thread to stop
-    if (listenerThread != null)
-      listenerThread.waitForShutdown();
+    // stop the ReplicationDomain
+    stopDomain();
 
     // wait for completion of the persistentServerState thread.
     try
@@ -1534,26 +909,11 @@
   }
 
   /**
-   * Get the name of the replicationServer to which this domain is currently
-   * connected.
-   *
-   * @return the name of the replicationServer to which this domain
-   *         is currently connected.
-   */
-  public String getReplicationServer()
-  {
-    if (broker != null)
-      return broker.getReplicationServer();
-    else
-      return "Not connected";
-  }
-
-  /**
    * Create and replay a synchronized Operation from an UpdateMsg.
    *
    * @param msg The UpdateMsg to be replayed.
    */
-  public void replay(UpdateMsg msg)
+  public void replay(LDAPUpdateMsg msg)
   {
     Operation op = null;
     boolean done = false;
@@ -1565,6 +925,7 @@
     // whose dependency has been replayed until no more left.
     do
     {
+      String replayErrorMsg = null;
       try
       {
         op = msg.createOperation(conn);
@@ -1572,12 +933,12 @@
 
         while ((!dependency) && (!done) && (retryCount-- > 0))
         {
+          // Try replay the operation
           op.setInternalOperation(true);
           op.setSynchronizationOperation(true);
           changeNumber = OperationContext.getChangeNumber(op);
           ((AbstractOperation) op).run();
 
-          // Try replay the operation
           ResultCode result = op.getResultCode();
 
           if (result != ResultCode.SUCCESS)
@@ -1638,7 +999,7 @@
             op.getErrorMessage().toString());
           logError(message);
           numUnresolvedNamingConflicts.incrementAndGet();
-
+          replayErrorMsg = message.toString();
           updateError(changeNumber);
         }
       } catch (ASN1Exception e)
@@ -1646,16 +1007,19 @@
         Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
           String.valueOf(msg) + stackTraceToSingleLineString(e));
         logError(message);
+        replayErrorMsg = message.toString();
       } catch (LDAPException e)
       {
         Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
           String.valueOf(msg) + stackTraceToSingleLineString(e));
         logError(message);
+        replayErrorMsg = message.toString();
       } catch (DataFormatException e)
       {
         Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
           String.valueOf(msg) + stackTraceToSingleLineString(e));
         logError(message);
+        replayErrorMsg = message.toString();
       } catch (Exception e)
       {
         if (changeNumber != null)
@@ -1669,21 +1033,20 @@
           Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
             stackTraceToSingleLineString(e), op.toString());
           logError(message);
+          replayErrorMsg = message.toString();
           updateError(changeNumber);
         } else
         {
           Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
             String.valueOf(msg) + stackTraceToSingleLineString(e));
           logError(message);
+          replayErrorMsg = message.toString();
         }
       } finally
       {
         if (!dependency)
         {
-          broker.updateWindowAfterReplay();
-          if (msg.isAssured())
-            ack(msg.getChangeNumber());
-          incProcessedUpdates();
+          processUpdateDone(msg, replayErrorMsg);
         }
       }
 
@@ -1913,7 +1276,7 @@
   * @return true if the process is completed, false if it must continue..
   */
  private boolean solveNamingConflict(DeleteOperation op,
-     UpdateMsg msg)
+     LDAPUpdateMsg msg)
  {
    ResultCode result = op.getResultCode();
    DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1983,7 +1346,7 @@
  * @throws Exception When the operation is not valid.
  */
 private boolean solveNamingConflict(ModifyDNOperation op,
-    UpdateMsg msg) throws Exception
+    LDAPUpdateMsg msg) throws Exception
 {
   ResultCode result = op.getResultCode();
   ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
@@ -2391,83 +1754,6 @@
   }
 
   /**
-   * Check if an operation must be processed as an assured operation.
-   *
-   * @param op the operation to be checked.
-   * @return true if the operations must be processed as an assured operation.
-   */
-  private boolean isAssured(PostOperationOperation op)
-  {
-    // TODO : should have a filtering mechanism for checking
-    // operation that are assured and operations that are not.
-    return false;
-  }
-
-  /**
-   * Get the maximum receive window size.
-   *
-   * @return The maximum receive window size.
-   */
-  public int getMaxRcvWindow()
-  {
-    if (broker != null)
-      return broker.getMaxRcvWindow();
-    else
-      return 0;
-  }
-
-  /**
-   * Get the current receive window size.
-   *
-   * @return The current receive window size.
-   */
-  public int getCurrentRcvWindow()
-  {
-    if (broker != null)
-      return broker.getCurrentRcvWindow();
-    else
-      return 0;
-  }
-
-  /**
-   * Get the maximum send window size.
-   *
-   * @return The maximum send window size.
-   */
-  public int getMaxSendWindow()
-  {
-    if (broker != null)
-      return broker.getMaxSendWindow();
-    else
-      return 0;
-  }
-
-  /**
-   * Get the current send window size.
-   *
-   * @return The current send window size.
-   */
-  public int getCurrentSendWindow()
-  {
-    if (broker != null)
-      return broker.getCurrentSendWindow();
-    else
-      return 0;
-  }
-
-  /**
-   * Get the number of times the replication connection was lost.
-   * @return The number of times the replication connection was lost.
-   */
-  public int getNumLostConnections()
-  {
-    if (broker != null)
-      return broker.getNumLostConnections();
-    else
-      return 0;
-  }
-
-  /**
    * Get the number of modify conflicts successfully resolved.
    * @return The number of modify conflicts successfully resolved.
    */
@@ -2477,7 +1763,7 @@
   }
 
   /**
-   * Get the number of namign conflicts successfully resolved.
+   * Get the number of naming conflicts successfully resolved.
    * @return The number of naming conflicts successfully resolved.
    */
   public int getNumResolvedNamingConflicts()
@@ -2495,18 +1781,9 @@
   }
 
   /**
-   * Get the server ID.
-   * @return The server ID.
-   */
-  public short getServerId()
-  {
-    return serverId;
-  }
-
-  /**
    * Check if the domain solve conflicts.
    *
-   * @return a boolean indicating if the domain should sove conflicts.
+   * @return a boolean indicating if the domain should solve conflicts.
    */
   public boolean solveConflict()
   {
@@ -2526,16 +1803,7 @@
     state.save();
     state.clearInMemory();
     disabled = true;
-
-    // Stop the listener thread
-    if (listenerThread != null)
-      listenerThread.shutdown();
-
-    broker.stop(); // This will cut the session and wake up the listener
-
-    // Wait for the listener thread to stop
-    if (listenerThread != null)
-      listenerThread.waitForShutdown();
+    disableService(); // This will cut the session and wake up the listener
   }
 
   /**
@@ -2578,16 +1846,7 @@
       return;
     }
 
-    // After an on-line import, the value of the generationId is new
-    // and it is necessary for the broker to send this new value as part
-    // of the serverStart message.
-    broker.setGenerationId(generationId);
-
-    broker.start(replicationServers);
-
-    // Create the listener thread
-    listenerThread = new ListenerThread(this, updateToReplayQueue);
-    listenerThread.start();
+    enableService();
 
     disabled = false;
   }
@@ -2600,7 +1859,7 @@
    */
   public long computeGenerationId() throws DirectoryException
   {
-    long genId = exportBackend(true);
+    long genId = exportBackend(null, true);
 
     if (debugEnabled())
       TRACER.debugInfo("Computed generationId: generationId=" + genId);
@@ -2609,11 +1868,9 @@
   }
 
   /**
-   * Returns the generationId set for this domain.
-   *
-   * @return The generationId.
+   * {@inheritDoc}
    */
-  public long getGenerationId()
+  public long getGenerationID()
   {
     return generationId;
   }
@@ -2627,7 +1884,7 @@
   /**
    * Stores the value of the generationId.
    * @param generationId The value of the generationId.
-   * @return a ResultCode indicating if the method was successfull.
+   * @return a ResultCode indicating if the method was successful.
    */
   public ResultCode saveGenerationId(long generationId)
   {
@@ -2792,45 +2049,8 @@
   }
 
   /**
-   * Reset the generationId of this domain in the whole topology.
-   * A message is sent to the Replication Servers for them to reset
-   * their change dbs.
-   *
-   * @param generationIdNewValue The new value of the generation Id.
-   * @throws DirectoryException when an error occurs
-   */
-  public void resetGenerationId(Long generationIdNewValue)
-  throws DirectoryException
-  {
-    if (debugEnabled())
-      TRACER.debugInfo(
-          this.getName() + "resetGenerationId" + generationIdNewValue);
-
-    if (!isConnected())
-    {
-      ResultCode resultCode = ResultCode.OTHER;
-      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
-          baseDn.toNormalizedString());
-      throw new DirectoryException(
-         resultCode, message);
-    }
-
-    ResetGenerationIdMsg genIdMessage = null;
-
-    if (generationIdNewValue == null)
-    {
-      genIdMessage = new ResetGenerationIdMsg(this.generationId);
-    }
-    else
-    {
-      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
-    }
-    broker.publish(genIdMessage);
-  }
-
-  /**
    * Do whatever is needed when a backup is started.
-   * We need to make sure that the serverState is correclty save.
+   * We need to make sure that the serverState is correctly save.
    */
   public void backupStart()
   {
@@ -2849,103 +2069,7 @@
    * Total Update >>
    */
 
-  /**
-   * Receives bytes related to an entry in the context of an import to
-   * initialize the domain (called by ReplLDIFInputStream).
-   *
-   * @return The bytes. Null when the Done or Err message has been received
-   */
-  public byte[] receiveEntryBytes()
-  {
-    ReplicationMsg msg;
-    while (true)
-    {
-      try
-      {
-        msg = broker.receive();
 
-        if (debugEnabled())
-          TRACER.debugVerbose(
-              " sid:" + this.serverId +
-              " base DN:" + this.baseDn +
-              " Import EntryBytes received " + msg);
-        if (msg == null)
-        {
-          // The server is in the shutdown process
-          return null;
-        }
-
-        if (msg instanceof EntryMsg)
-        {
-          EntryMsg entryMsg = (EntryMsg)msg;
-          byte[] entryBytes = entryMsg.getEntryBytes();
-          ieContext.updateCounters();
-          return entryBytes;
-        }
-        else if (msg instanceof DoneMsg)
-        {
-          // This is the normal termination of the import
-          // No error is stored and the import is ended
-          // by returning null
-          return null;
-        }
-        else if (msg instanceof ErrorMsg)
-        {
-          // This is an error termination during the import
-          // The error is stored and the import is ended
-          // by returning null
-          ErrorMsg errorMsg = (ErrorMsg)msg;
-          ieContext.exception = new DirectoryException(
-                                      ResultCode.OTHER,
-                                      errorMsg.getDetails());
-          return null;
-        }
-        else
-        {
-          // Other messages received during an import are trashed
-        }
-      }
-      catch(Exception e)
-      {
-        // TODO: i18n
-        ieContext.exception = new DirectoryException(ResultCode.OTHER,
-            Message.raw("received an unexpected message type" +
-                e.getLocalizedMessage()));
-      }
-    }
-  }
-
-  /**
-   * Processes an error message received while an import/export is
-   * on going.
-   * @param errorMsg The error message received.
-   */
-  protected void abandonImportExport(ErrorMsg errorMsg)
-  {
-    // FIXME TBD Treat the case where the error happens while entries
-    // are being exported
-
-    if (debugEnabled())
-      TRACER.debugVerbose(
-          " abandonImportExport:" + this.serverId +
-          " base DN:" + this.baseDn +
-          " Error Msg received " + errorMsg);
-
-    if (ieContext != null)
-    {
-      ieContext.exception = new DirectoryException(ResultCode.OTHER,
-          errorMsg.getDetails());
-
-      if (ieContext.initializeTask instanceof InitializeTask)
-      {
-        // Update the task that initiated the import
-        ((InitializeTask)ieContext.initializeTask).
-        updateTaskCompletionState(ieContext.exception);
-
-        releaseIEContext();
-      }
-    }
-  }
 
   /**
    * Clears all the entries from the JE backend determined by the
@@ -3000,16 +2124,31 @@
   }
 
   /**
+   * This method trigger an export of the replicated data.
+   *
+   * @param output               The OutputStream where the export should
+   *                             be produced.
+   * @throws DirectoryException  When needed.
+   */
+  protected void exportBackend(OutputStream output) throws DirectoryException
+  {
+    exportBackend(output, false);
+  }
+
+  /**
    * Export the entries from the backend and/or compute the generation ID.
    * The ieContext must have been set before calling.
-   * @param checksumOutput true is the exportBackend is called to compute
-   *                       the generationID
    *
-   * @return The computed  generationID.
+   * @param output              The OutputStream where the export should
+   *                            be produced.
+   * @param checksumOutput      A boolean indicating if this export is
+   *                            invoked to perform a checksum only
+   *
+   * @return The computed       GenerationID.
    *
    * @throws DirectoryException when an error occurred
    */
-  protected long exportBackend(boolean checksumOutput)
+  protected long exportBackend(OutputStream output, boolean checksumOutput)
   throws DirectoryException
   {
     long genID = 0;
@@ -3042,7 +2181,7 @@
     }
 
     OutputStream os;
-    ReplLDIFOutputStream ros;
+    ReplLDIFOutputStream ros = null;
 
     if (checksumOutput)
     {
@@ -3060,8 +2199,7 @@
     }
     else
     {
-      ros = new ReplLDIFOutputStream(this, (short)-1);
-      os = ros;
+      os = output;
     }
     LDIFExportConfig exportConfig = new LDIFExportConfig(os);
 
@@ -3096,7 +2234,7 @@
     }
     catch (DirectoryException de)
     {
-      if ((checksumOutput) &&
+      if ((ros != null) &&
           (ros.getNumExportedEntries() >= entryCount))
       {
         // This is the normal end when computing the generationId
@@ -3170,277 +2308,6 @@
   }
 
   /**
-   * Get the internal broker to perform some operations on it.
-   *
-   * @return The broker for this domain.
-   */
-  ReplicationBroker getBroker()
-  {
-    return broker;
-  }
-
-  /**
-   * Exports an entry in LDIF format.
-   *
-   * @param  lDIFEntry The entry to be exported..
-   *
-   * @throws IOException when an error occurred.
-   */
-  public void exportLDIFEntry(String lDIFEntry) throws IOException
-  {
-    // If an error was raised - like receiving an ErrorMsg
-    // we just let down the export.
-    if (ieContext.exception != null)
-    {
-      IOException ioe = new IOException(ieContext.exception.getMessage());
-      ieContext = null;
-      throw ioe;
-    }
-
-    EntryMsg entryMessage = new EntryMsg(
-        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
-    broker.publish(entryMessage);
-
-    try
-    {
-      ieContext.updateCounters();
-    }
-    catch (DirectoryException de)
-    {
-      throw new IOException(de.getMessage());
-    }
-  }
-
-  /**
-   * Initializes this domain from another source server.
-   *
-   * @param source The source from which to initialize
-   * @param initTask The task that launched the initialization
-   *                 and should be updated of its progress.
-   * @throws DirectoryException when an error occurs
-   */
-  public void initializeFromRemote(short source, Task initTask)
-  throws DirectoryException
-  {
-    if (debugEnabled())
-      TRACER.debugInfo("Entering initializeFromRemote");
-
-    acquireIEContext();
-    ieContext.initializeTask = initTask;
-
-    InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
-        baseDn, serverId, source);
-
-    // Publish Init request msg
-    broker.publish(initializeMsg);
-
-    // .. we expect to receive entries or err after that
-  }
-
-  /**
-   * Verifies that the given string represents a valid source
-   * from which this server can be initialized.
-   * @param sourceString The string representing the source
-   * @return The source as a short value
-   * @throws DirectoryException if the string is not valid
-   */
-  public short decodeSource(String sourceString)
-  throws DirectoryException
-  {
-    short  source = 0;
-    Throwable cause = null;
-    try
-    {
-      source = Integer.decode(sourceString).shortValue();
-      if ((source >= -1) && (source != serverId))
-      {
-        // TODO Verifies serverID is in the domain
-        // We shold check here that this is a server implied
-        // in the current domain.
-        return source;
-      }
-    }
-    catch(Exception e)
-    {
-      cause = e;
-    }
-
-    ResultCode resultCode = ResultCode.OTHER;
-    Message message = ERR_INVALID_IMPORT_SOURCE.get();
-    if (cause != null)
-    {
-      throw new DirectoryException(
-          resultCode, message, cause);
-    }
-    else
-    {
-      throw new DirectoryException(
-          resultCode, message);
-    }
-  }
-
-  /**
-   * Verifies that the given string represents a valid source
-   * from which this server can be initialized.
-   * @param targetString The string representing the source
-   * @return The source as a short value
-   * @throws DirectoryException if the string is not valid
-   */
-  public short decodeTarget(String targetString)
-  throws DirectoryException
-  {
-    short  target = 0;
-    Throwable cause;
-    if (targetString.equalsIgnoreCase("all"))
-    {
-      return RoutableMsg.ALL_SERVERS;
-    }
-
-    // So should be a serverID
-    try
-    {
-      target = Integer.decode(targetString).shortValue();
-      if (target >= 0)
-      {
-        // FIXME Could we check now that it is a know server in the domain ?
-      }
-      return target;
-    }
-    catch(Exception e)
-    {
-      cause = e;
-    }
-    ResultCode resultCode = ResultCode.OTHER;
-    Message message = ERR_INVALID_EXPORT_TARGET.get();
-
-    if (cause != null)
-      throw new DirectoryException(
-          resultCode, message, cause);
-    else
-      throw new DirectoryException(
-          resultCode, message);
-
-  }
-
-  private synchronized void acquireIEContext()
-  throws DirectoryException
-  {
-    if (ieContext != null)
-    {
-      // Rejects 2 simultaneous exports
-      Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
-      throw new DirectoryException(ResultCode.OTHER,
-          message);
-    }
-
-    ieContext = new IEContext();
-  }
-
-  private synchronized void releaseIEContext()
-  {
-    ieContext = null;
-  }
-
-  /**
-   * Process the initialization of some other server or servers in the topology
-   * specified by the target argument.
-   * @param target The target that should be initialized
-   * @param initTask The task that triggers this initialization and that should
-   *                 be updated with its progress.
-   *
-   * @exception DirectoryException When an error occurs.
-   */
-  public void initializeRemote(short target, Task initTask)
-  throws DirectoryException
-  {
-    initializeRemote(target, serverId, initTask);
-  }
-
-  /**
-   * Process the initialization of some other server or servers in the topology
-   * specified by the target argument when this initialization specifying the
-   * server that requests the initialization.
-   *
-   * @param target The target that should be initialized.
-   * @param requestorID The server that initiated the export.
-   * @param initTask The task that triggers this initialization and that should
-   *  be updated with its progress.
-   *
-   * @exception DirectoryException When an error occurs.
-   */
-  public void initializeRemote(short target, short requestorID, Task initTask)
-  throws DirectoryException
-  {
-    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
-      Short.toString(serverId),
-      baseDn.toString(),
-      Short.toString(requestorID));
-    logError(msg);
-
-    boolean contextAcquired=false;
-
-    try
-    {
-      Backend backend = retrievesBackend(this.baseDn);
-
-      if (!backend.supportsLDIFExport())
-      {
-        Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
-                            backend.getBackendID().toString());
-        logError(message);
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
-
-      acquireIEContext();
-      contextAcquired = true;
-
-      // The number of entries to be exported is the number of entries under
-      // the base DN entry and the base entry itself.
-      long entryCount = backend.numSubordinates(baseDn, true) + 1;
-      ieContext.exportTarget = target;
-      if (initTask != null)
-      {
-        ieContext.initializeTask = initTask;
-      }
-      ieContext.setCounters(entryCount, entryCount);
-
-      // Send start message to the peer
-      InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
-          baseDn, serverId, ieContext.exportTarget, requestorID, entryCount);
-
-      broker.publish(initializeMessage);
-
-      exportBackend(false);
-
-      // Notify the peer of the success
-      DoneMsg doneMsg = new DoneMsg(serverId,
-          initializeMessage.getDestination());
-      broker.publish(doneMsg);
-
-      releaseIEContext();
-    }
-    catch(DirectoryException de)
-    {
-      // Notify the peer of the failure
-      ErrorMsg errorMsg =
-        new ErrorMsg(target,
-                         de.getMessageObject());
-      broker.publish(errorMsg);
-
-      if (contextAcquired)
-        releaseIEContext();
-
-      throw(de);
-    }
-
-    msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
-      Short.toString(serverId),
-      baseDn.toString(),
-      Short.toString(requestorID));
-    logError(msg);
-  }
-
-  /**
    * Process backend before import.
    * @param backend The backend.
    * @throws Exception
@@ -3468,51 +2335,16 @@
   }
 
   /**
-   * Initializes the domain's backend with received entries.
-   * @param initializeMessage The message that initiated the import.
-   * @exception DirectoryException Thrown when an error occurs.
+   * This method should trigger an import of the replicated data.
+   *
+   * @param input                The InputStream from which
+   * @throws DirectoryException  When needed.
    */
-  protected void initialize(InitializeTargetMsg initializeMessage)
-  throws DirectoryException
+  public void importBackend(InputStream input) throws DirectoryException
   {
     LDIFImportConfig importConfig = null;
     DirectoryException de = null;
 
-    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
-      Short.toString(serverId),
-      baseDn.toString(),
-      Long.toString(initializeMessage.getRequestorID()));
-    logError(msg);
-
-    // Go into full update status
-    // Use synchronized as somebody could ask another status change at the same
-    // time
-    synchronized (status)
-    {
-      StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT;
-      ServerStatus newStatus =
-        StatusMachine.computeNewStatus(status, event);
-
-      if (newStatus == ServerStatus.INVALID_STATUS)
-      {
-        msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
-          Short.toString(serverId), status.toString(), event.toString());
-        logError(msg);
-        return;
-      }
-
-      // Store new status
-      status = newStatus;
-
-      if (debugEnabled())
-      TRACER.debugInfo("Replication domain " + baseDn +
-        " new status is: " + status);
-
-      // Perform whatever actions are needed to apply properties for being
-      // compliant with new status
-      updateDomainForNewStatus();
-    }
-
     Backend backend = retrievesBackend(baseDn);
 
     try
@@ -3526,26 +2358,8 @@
       }
       else
       {
-        if (initializeMessage.getRequestorID() == serverId)
-        {
-          // The import responds to a request we did so the IEContext
-          // is already acquired
-        }
-        else
-        {
-          acquireIEContext();
-        }
-
-        ieContext.importSource = initializeMessage.getsenderID();
-        ieContext.entryLeftCount = initializeMessage.getEntryCount();
-        ieContext.setCounters(initializeMessage.getEntryCount(),
-            initializeMessage.getEntryCount());
-
-        preBackendImport(backend);
-
-        ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
         importConfig =
-          new LDIFImportConfig(ieContext.ldifImportInputStream);
+          new LDIFImportConfig(input);
         List<DN> includeBranches = new ArrayList<DN>();
         includeBranches.add(this.baseDn);
         importConfig.setIncludeBranches(includeBranches);
@@ -3553,11 +2367,12 @@
 
         // TODO How to deal with rejected entries during the import
         importConfig.writeRejectedEntries(
-          getFileForPath("logs" + File.separator +
-              "replInitRejectedEntries").getAbsolutePath(),
-          ExistingFileBehavior.OVERWRITE);
+            getFileForPath("logs" + File.separator +
+            "replInitRejectedEntries").getAbsolutePath(),
+            ExistingFileBehavior.OVERWRITE);
 
         // Process import
+        preBackendImport(backend);
         backend.importLDIF(importConfig);
 
         stateSavingDisabled = false;
@@ -3570,9 +2385,6 @@
     }
     finally
     {
-      if ((ieContext != null)  && (ieContext.exception != null))
-        de = ieContext.exception;
-
       // Cleanup
       if (importConfig != null)
       {
@@ -3592,7 +2404,6 @@
           TRACER.debugInfo(
               "After import, the replication plugin restarts connections" +
               " to all RSs to provide new generation ID=" + generationId);
-        broker.setGenerationId(generationId);
       }
       catch (DirectoryException fe)
       {
@@ -3604,29 +2415,12 @@
         if (de == null)
           de = fe;
       }
-
-      // Re-exchange generationID and state with RS
-      broker.reStart();
-
-      // Update the task that initiated the import
-      if ((ieContext != null ) && (ieContext.initializeTask != null))
-      {
-        ((InitializeTask)ieContext.initializeTask).
-        updateTaskCompletionState(de);
-      }
-      releaseIEContext();
     }
     // Sends up the root error.
     if (de != null)
     {
       throw de;
     }
-
-    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
-      Short.toString(serverId),
-      baseDn.toString(),
-      Long.toString(initializeMessage.getRequestorID()));
-    logError(msg);
   }
 
   /**
@@ -3660,10 +2454,10 @@
    * @throws DirectoryException When an error occurred or no domain
    * match the provided baseDn.
    */
-  public static ReplicationDomain retrievesReplicationDomain(DN baseDn)
+  public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDn)
   throws DirectoryException
   {
-    ReplicationDomain replicationDomain = null;
+    LDAPReplicationDomain replicationDomain = null;
 
     // Retrieves the domain
     DirectoryServer.getSynchronizationProviders();
@@ -3678,7 +2472,7 @@
       }
 
       // From the domainDN retrieves the replication domain
-      ReplicationDomain sdomain =
+      LDAPReplicationDomain sdomain =
         MultimasterReplication.findDomain(baseDn, null);
       if (sdomain == null)
       {
@@ -3714,15 +2508,6 @@
     return retrievesBackend(baseDn);
   }
 
-  /**
-   * Returns a boolean indicating if an import or export is currently
-   * processed.
-   * @return The status
-   */
-  public boolean ieRunning()
-  {
-    return (ieContext != null);
-  }
   /*
    * <<Total Update
    */
@@ -3769,7 +2554,7 @@
   {
     // Check that there is not already a domain with the same DN
     DN dn = configuration.getBaseDN();
-    ReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
+    LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
     if ((domain != null) && (domain.baseDn.equals(dn)))
     {
       Message message = ERR_SYNC_INVALID_DN.get();
@@ -3793,37 +2578,12 @@
   public ConfigChangeResult applyConfigurationChange(
          ReplicationDomainCfg configuration)
   {
-    // server id and base dn are readonly.
-    // isolationPolicy can be set immediately and will apply
-    // to the next updates.
-    // The other parameters needs to be renegociated with the ReplicationServer
-    // so that requires restarting the session with the ReplicationServer.
-    Boolean needToRestartSession = false;
-    Collection<String> newReplServers = configuration.getReplicationServer();
-
-    // A new session is necessary only when information regarding
-    // the connection is modified
-    if ((!(replicationServers.size() == newReplServers.size()
-        && replicationServers.containsAll(newReplServers))) ||
-        window != configuration.getWindowSize() ||
-        heartbeatInterval != configuration.getHeartbeatInterval())
-      needToRestartSession = true;
-
-    replicationServers = newReplServers;
-    window = configuration.getWindowSize();
-    heartbeatInterval = configuration.getHeartbeatInterval();
-    broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
-        maxSendQueue, maxSendDelay, window, heartbeatInterval);
     isolationpolicy = configuration.getIsolationPolicy();
 
-    // To be able to stop and restart the broker properly just
-    // disable and enable the domain. That way a new session
-    // with the new configuration is available.
-    if (needToRestartSession)
-    {
-      this.disable();
-      this.enable();
-    }
+    changeConfig(
+        configuration.getReplicationServer(),
+        configuration.getWindowSize(),
+        configuration.getHeartbeatInterval());
 
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }
@@ -3867,101 +2627,265 @@
   }
 
   /**
-   * Check if the domain is connected to a ReplicationServer.
+   * Starts the Replication Domain.
+   */
+  public void start()
+  {
+    // Create the ServerStateFlush thread
+    flushThread = new ServerStateFlush();
+    flushThread.start();
+
+    startListenService();
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  public void sessionInitiated(
+      ServerStatus initStatus,
+      ServerState replicationServerState,
+      ProtocolSession session)
+  {
+    super.sessionInitiated(initStatus, replicationServerState, session);
+    try
+    {
+      /*
+       * We must not publish changes to a replicationServer that has
+       * not seen all our previous changes because this could cause
+       * some other ldap servers to miss those changes.
+       * Check that the ReplicationServer has seen all our previous
+       * changes.
+       */
+      ChangeNumber replServerMaxChangeNumber =
+        replicationServerState.getMaxChangeNumber(serverId);
+
+      if (replServerMaxChangeNumber == null)
+      {
+        replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
+      }
+      ChangeNumber ourMaxChangeNumber =
+        state.getMaxChangeNumber(serverId);
+
+      if ((ourMaxChangeNumber != null) &&
+          (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+      {
+
+        // Replication server is missing some of our changes: let's
+        // send them to him.
+
+        Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+        logError(message);
+
+        /*
+         * Get all the changes that have not been seen by this
+         * replication server and populate the replayOperations
+         * list.
+         */
+        InternalSearchOperation op = searchForChangedEntries(
+            baseDn, replServerMaxChangeNumber, this);
+        if (op.getResultCode() != ResultCode.SUCCESS)
+        {
+          /*
+           * An error happened trying to search for the updates
+           * This server will start accepting again new updates but
+           * some inconsistencies will stay between servers.
+           * Log an error for the repair tool
+           * that will need to re-synchronize the servers.
+           */
+          message = ERR_CANNOT_RECOVER_CHANGES.get(
+              baseDn.toNormalizedString());
+          logError(message);
+        } else
+        {
+          for (FakeOperation replayOp : replayOperations)
+          {
+            ChangeNumber cn = replayOp.getChangeNumber();
+            /*
+             * Because the entry returned by the search operation
+             * can contain old historical information, it is
+             * possible that some of the FakeOperation are
+             * actually older than the
+             * Only send the Operation if it was newer than
+             * the last ChangeNumber known by the Replication Server.
+             */
+            if (cn.newer(replServerMaxChangeNumber))
+            {
+              message =
+                DEBUG_SENDING_CHANGE.get(
+                    replayOp.getChangeNumber().toString());
+              logError(message);
+              session.publish(replayOp.generateMessage());
+            }
+          }
+          message = DEBUG_CHANGES_SENT.get();
+          logError(message);
+        }
+        replayOperations.clear();
+      }
+    } catch (Exception e)
+    {
+      Message message = ERR_PUBLISHING_FAKE_OPS.get(
+          baseDn.toNormalizedString(),
+          e.getLocalizedMessage() + stackTraceToSingleLineString(e));
+      logError(message);
+    }
+  }
+
+  /**
+   * Search for the changes that happened since fromChangeNumber
+   * based on the historical attribute. The only changes that will
+   * be send will be the one generated on the serverId provided in
+   * fromChangeNumber.
+   * @param baseDn the base DN
+   * @param fromChangeNumber The change number from which we want the changes
+   * @param resultListener that will process the entries returned.
+   * @return the internal search operation
+   * @throws Exception when raised.
+   */
+  public static InternalSearchOperation searchForChangedEntries(
+    DN baseDn,
+    ChangeNumber fromChangeNumber,
+    InternalSearchListener resultListener)
+    throws Exception
+  {
+    InternalClientConnection conn =
+      InternalClientConnection.getRootConnection();
+    Short serverId = fromChangeNumber.getServerId();
+
+    String maxValueForId = "ffffffffffffffff" +
+      String.format("%04x", serverId) + "ffffffff";
+
+    LDAPFilter filter = LDAPFilter.decode(
+       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
+       + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME +
+       "<=dummy:" + maxValueForId + "))");
+
+    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+    attrs.add(Historical.HISTORICALATTRIBUTENAME);
+    attrs.add(Historical.ENTRYUIDNAME);
+    attrs.add("*");
+    return conn.processSearch(
+      new ASN1OctetString(baseDn.toString()),
+      SearchScope.WHOLE_SUBTREE,
+      DereferencePolicy.NEVER_DEREF_ALIASES,
+      0, 0, false, filter,
+      attrs,
+      resultListener);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void handleInternalSearchEntry(
+    InternalSearchOperation searchOperation,
+    SearchResultEntry searchEntry)
+  {
+    /*
+     * This call back is called at session establishment phase
+     * for each entry that has been changed by this server and the changes
+     * have not been sent to any Replication Server.
+     * The role of this method is to build equivalent operation from
+     * the historical information and add them in the replayOperations
+     * table.
+     */
+    Iterable<FakeOperation> updates =
+      Historical.generateFakeOperations(searchEntry);
+    for (FakeOperation op : updates)
+    {
+      replayOperations.add(op);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void handleInternalSearchReference(
+    InternalSearchOperation searchOperation,
+    SearchResultReference searchReference)
+  {
+    // TODO to be implemented
+  }
+
+
+  /**
+   * This method should return the total number of objects in the
+   * replicated domain.
+   * This count will be used for reporting.
    *
-   * @return true if the server is connected, false if not.
+   * @throws DirectoryException when needed.
+   *
+   * @return The number of objects in the replication domain.
    */
-  public boolean isConnected()
+  public long countEntries() throws DirectoryException
   {
-    if (broker != null)
-      return broker.isConnected();
-    else
+    Backend backend = retrievesBackend(baseDn);
+
+    if (!backend.supportsLDIFExport())
+    {
+      Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
+                          backend.getBackendID().toString());
+      logError(message);
+      throw new DirectoryException(ResultCode.OTHER, message);
+    }
+
+    return backend.numSubordinates(baseDn, true) + 1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean processUpdate(UpdateMsg updateMsg)
+  {
+    if (updateMsg instanceof LDAPUpdateMsg)
+    {
+      LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg;
+
+      // put the UpdateMsg in the RemotePendingChanges list.
+      remotePendingChanges.putRemoteUpdate(msg);
+
+      // Put update message into the replay queue
+      // (block until some place in the queue is available)
+      UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
+      updateToReplayQueue.offer(updateToReplay);
+
       return false;
+    }
+
+    // unknown message type, this should not happen, just ignore it.
+    return true;
   }
 
   /**
-   * Determine whether the connection to the replication server is encrypted.
-   * @return true if the connection is encrypted, false otherwise.
+   * Monitoring information for the LDAPReplicationDomain.
+   *
+   * @return Monitoring attributes specific to the LDAPReplicationDomain.
    */
-  public boolean isSessionEncrypted()
+  public Collection<Attribute> getAdditionalMonitoring()
   {
-    if (broker != null)
-      return broker.isSessionEncrypted();
-    else
-      return false;
-  }
+    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
 
-  /**
-   * Gets the info for DSs in the topology (except us).
-   * @return The info for DSs in the topology (except us)
-   */
-  public List<DSInfo> getDsList()
-  {
-    return dsList;
-  }
+    /* get number of changes in the pending list */
+    ReplicationMonitor.addMonitorData(
+        attributes, "pending-updates", getPendingUpdatesCount());
 
-  /**
-   * Gets the info for RSs in the topology (except the one we are connected
-   * to).
-   * @return The info for RSs in the topology (except the one we are connected
-   * to)
-   */
-  public List<RSInfo> getRsList()
-  {
-    return rsList;
-  }
+    /* get number of changes successfully */
+    ReplicationMonitor.addMonitorData(attributes, "replayed-updates-ok",
+        getNumReplayedPostOpCalled());
 
-  /**
-   * Tells if assured replication is enabled for this domain.
-   * @return True if assured replication is enabled for this domain.
-   */
-  public boolean isAssured()
-  {
-    return assured;
-  }
+    /* get number of modify conflicts */
+    ReplicationMonitor.addMonitorData(attributes, "resolved-modify-conflicts",
+        getNumResolvedModifyConflicts());
 
-  /**
-   * Gives the mode for the assured replication of the domain.
-   * @return The mode for the assured replication of the domain.
-   */
-  public AssuredMode getAssuredMode()
-  {
-    return assuredMode;
-  }
+    /* get number of naming conflicts */
+    ReplicationMonitor.addMonitorData(attributes, "resolved-naming-conflicts",
+        getNumResolvedNamingConflicts());
 
-  /**
-   * Gives the assured level of the replication of the domain.
-   * @return The assured level of the replication of the domain.
-   */
-  public byte getAssuredSdLevel()
-  {
-    return assuredSdLevel;
-  }
+    /* get number of unresolved naming conflicts */
+    ReplicationMonitor.addMonitorData(attributes, "unresolved-naming-conflicts",
+        getNumUnresolvedNamingConflicts());
 
-  /**
-   * Gets the group id for this domain.
-   * @return The group id for this domain.
-   */
-  public byte getGroupId()
-  {
-    return groupId;
-  }
-
-  /**
-   * Gets the referrals URLs this domain publishes.
-   * @return The referrals URLs this domain publishes.
-   */
-  public List<String> getRefUrls()
-  {
-    return refUrls;
-  }
-
-  /**
-   * Gets the status for this domain.
-   * @return The status for this domain.
-   */
-  public ServerStatus getStatus()
-  {
-    return status;
+    return attributes;
   }
 }

--
Gitblit v1.10.0