From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  450 +++++++++++++++++++++----------------------------------
 1 files changed, 174 insertions(+), 276 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
similarity index 81%
rename from opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
rename to opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 27a09c7..16e428d 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -24,12 +24,15 @@
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
  */
-package org.opends.server.replication.plugin;
+package org.opends.server.replication.service;
+
+import org.opends.server.replication.protocol.HeartbeatMonitor;
 
 import org.opends.messages.*;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -45,34 +48,22 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.TreeSet;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.opends.server.api.DirectoryThread;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.internal.InternalSearchListener;
-import org.opends.server.protocols.internal.InternalSearchOperation;
-import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.DSInfo;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.*;
-import org.opends.server.types.DN;
-import org.opends.server.types.DereferencePolicy;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchResultEntry;
-import org.opends.server.types.SearchResultReference;
-import org.opends.server.types.SearchScope;
 
 /**
  * The broker for Multi-master Replication.
  */
-public class ReplicationBroker implements InternalSearchListener
+public class ReplicationBroker
 {
 
   /**
@@ -83,23 +74,17 @@
   private Collection<String> servers;
   private boolean connected = false;
   private String replicationServer = "Not connected";
-  private TreeSet<FakeOperation> replayOperations;
   private ProtocolSession session = null;
   private final ServerState state;
-  private final DN baseDn;
+  private final String baseDn;
   private final short serverId;
-  private int maxSendDelay;
-  private int maxReceiveDelay;
-  private int maxSendQueue;
-  private int maxReceiveQueue;
   private Semaphore sendWindow;
   private int maxSendWindow;
-  private int rcvWindow;
-  private int halfRcvWindow;
-  private int maxRcvWindow;
+  private int rcvWindow = 100;
+  private int halfRcvWindow = rcvWindow/2;
+  private int maxRcvWindow = rcvWindow;
   private int timeout = 0;
   private short protocolVersion;
-  private long generationId = -1;
   private ReplSessionSecurity replSessionSecurity;
   // My group id
   private byte groupId = (byte) -1;
@@ -110,7 +95,7 @@
   // The server URL of the RS we are connected to
   private String rsServerUrl = null;
   // Our replication domain
-  private ReplicationDomain replicationDomain = null;
+  private ReplicationDomain domain = null;
 
   // Trick for avoiding a inner class for many parameters return for
   // performPhaseOneHandshake method.
@@ -142,6 +127,17 @@
   // Same group id poller thread
   private SameGroupIdPoller sameGroupIdPoller = null;
 
+  /*
+   * Properties for the last topology info received from the network.
+   */
+  // Info for other DSs.
+  // Warning: does not contain info for us (for our server id)
+  private List<DSInfo> dsList = new ArrayList<DSInfo>();
+  // Info for other RSs.
+  private List<RSInfo> rsList = new ArrayList<RSInfo>();
+
+  private long generationID;
+
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
    *
@@ -152,13 +148,6 @@
    *              when negotiating the session with the replicationServer.
    * @param serverId The server ID that should be used by this broker
    *              when negotiating the session with the replicationServer.
-   * @param maxReceiveQueue The maximum size of the receive queue to use on
-   *                         the replicationServer.
-   * @param maxReceiveDelay The maximum replication delay to use on the
-   *                        replicationServer.
-   * @param maxSendQueue The maximum size of the send queue to use on
-   *                     the replicationServer.
-   * @param maxSendDelay The maximum send delay to use on the replicationServer.
    * @param window The size of the send and receive window to use.
    * @param heartbeatInterval The interval between heartbeats requested of the
    * replicationServer, or zero if no heartbeats are requested.
@@ -169,29 +158,32 @@
    * @param groupId The group id of our domain.
    */
   public ReplicationBroker(ReplicationDomain replicationDomain,
-    ServerState state, DN baseDn, short serverId, int maxReceiveQueue,
-    int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window,
-    long heartbeatInterval, long generationId,
+    ServerState state, String baseDn, short serverId, int window,
+    long generationId, long heartbeatInterval,
     ReplSessionSecurity replSessionSecurity, byte groupId)
   {
-    this.replicationDomain = replicationDomain;
+    this.domain = replicationDomain;
     this.baseDn = baseDn;
     this.serverId = serverId;
-    this.maxReceiveDelay = maxReceiveDelay;
-    this.maxSendDelay = maxSendDelay;
-    this.maxReceiveQueue = maxReceiveQueue;
-    this.maxSendQueue = maxSendQueue;
     this.state = state;
-    replayOperations =
-      new TreeSet<FakeOperation>(new FakeOperationComparator());
-    this.rcvWindow = window;
-    this.maxRcvWindow = window;
-    this.halfRcvWindow = window / 2;
-    this.heartbeatInterval = heartbeatInterval;
     this.protocolVersion = ProtocolVersion.getCurrentVersion();
-    this.generationId = generationId;
     this.replSessionSecurity = replSessionSecurity;
     this.groupId = groupId;
+    this.generationID = generationId;
+    this.heartbeatInterval = heartbeatInterval;
+    this.maxRcvWindow = window;
+    this.maxRcvWindow = window;
+    this.halfRcvWindow = window /2;
+  }
+
+  /**
+   * Start the ReplicationBroker.
+   */
+  public void start()
+  {
+    shutdown = false;
+    this.rcvWindow = this.maxRcvWindow;
+    this.connect();
   }
 
   /**
@@ -207,6 +199,7 @@
      */
     shutdown = false;
     this.servers = servers;
+
     if (servers.size() < 1)
     {
       Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
@@ -245,6 +238,18 @@
   }
 
   /**
+   * Gets the server id.
+   * @return The server id
+   */
+  private long getGenerationID()
+  {
+    if (domain != null)
+      return domain.getGenerationID();
+    else
+      return generationID;
+  }
+
+  /**
    * Gets the server url of the RS we are connected to.
    * @return The server url of the RS we are connected to
    */
@@ -324,12 +329,12 @@
 
     // May have created a broker with null replication domain for
     // unit test purpose.
-    if (replicationDomain != null)
+    if (domain != null)
     {
       // If a first connect or a connection failure occur, we go through here.
       // force status machine to NOT_CONNECTED_STATUS so that monitoring can
       // see that we are not connected.
-      replicationDomain.toNotConnectedStatus();
+      domain.toNotConnectedStatus();
     }
 
     // Stop any existing poller and heartbeat monitor from a previous session.
@@ -380,7 +385,8 @@
           ServerStatus initStatus =
             computeInitialServerStatus(replServerStartMsg.getGenerationId(),
             bestServerInfo.getServerState(),
-            replServerStartMsg.getDegradedStatusThreshold(), generationId);
+            replServerStartMsg.getDegradedStatusThreshold(),
+            this.getGenerationID());
 
           // Perfom session start (handshake phase 2)
           TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
@@ -414,81 +420,6 @@
               if ((tmpRsGroupId == groupId) ||
                 ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
               {
-                /*
-                 * We must not publish changes to a replicationServer that has
-                 * not seen all our previous changes because this could cause
-                 * some other ldap servers to miss those changes.
-                 * Check that the ReplicationServer has seen all our previous
-                 * changes.
-                 */
-                ChangeNumber replServerMaxChangeNumber =
-                  replServerStartMsg.getServerState().
-                  getMaxChangeNumber(serverId);
-
-                if (replServerMaxChangeNumber == null)
-                {
-                  replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
-                }
-                ChangeNumber ourMaxChangeNumber =
-                  state.getMaxChangeNumber(serverId);
-
-                if ((ourMaxChangeNumber != null) &&
-                  (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
-                {
-
-                  // Replication server is missing some of our changes: let's
-                  // send them to him.
-
-                  Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
-                  logError(message);
-
-                  /*
-                   * Get all the changes that have not been seen by this
-                   * replication server and populate the replayOperations
-                   * list.
-                   */
-                  InternalSearchOperation op = searchForChangedEntries(
-                    baseDn, replServerMaxChangeNumber, this);
-                  if (op.getResultCode() != ResultCode.SUCCESS)
-                  {
-                    /*
-                     * An error happened trying to search for the updates
-                     * This server will start acepting again new updates but
-                     * some inconsistencies will stay between servers.
-                     * Log an error for the repair tool
-                     * that will need to resynchronize the servers.
-                     */
-                    message = ERR_CANNOT_RECOVER_CHANGES.get(
-                      baseDn.toNormalizedString());
-                    logError(message);
-                  } else
-                  {
-                    for (FakeOperation replayOp : replayOperations)
-                    {
-                      ChangeNumber cn = replayOp.getChangeNumber();
-                      /*
-                       * Because the entry returned by the search operation
-                       * can contain old historical information, it is
-                       * possible that some of the FakeOperation are
-                       * actually older than the
-                       * Only send the Operation if it was newer than
-                       * the last ChangeNumber known by the Replication Server.
-                       */
-                      if (cn.newer(replServerMaxChangeNumber))
-                      {
-                        message =
-                          DEBUG_SENDING_CHANGE.get(
-                              replayOp.getChangeNumber().toString());
-                        logError(message);
-                        session.publish(replayOp.generateMessage());
-                      }
-                    }
-                    message = DEBUG_CHANGES_SENT.get();
-                    logError(message);
-                  }
-                  replayOperations.clear();
-                }
-
                 replicationServer = tmpReadableServerName;
                 maxSendWindow = replServerStartMsg.getWindowSize();
                 rsGroupId = replServerStartMsg.getGroupId();
@@ -497,11 +428,13 @@
 
                 // May have created a broker with null replication domain for
                 // unit test purpose.
-                if (replicationDomain != null)
+                if (domain != null)
                 {
-                  replicationDomain.setInitialStatus(initStatus);
-                  replicationDomain.receiveTopo(topologyMsg);
+                  domain.sessionInitiated(
+                      initStatus, replServerStartMsg.getServerState(),
+                      session);
                 }
+                receiveTopo(topologyMsg);
                 connected = true;
                 if (getRsGroupId() != groupId)
                 {
@@ -528,16 +461,10 @@
                 // Do not log connection error
                 newServerWithSameGroupId = true;
               }
-            } catch (IOException e)
-            {
-              Message message = ERR_PUBLISHING_FAKE_OPS.get(
-                baseDn.toNormalizedString(), bestServer,
-                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
-              logError(message);
             } catch (Exception e)
             {
               Message message = ERR_COMPUTING_FAKE_OPS.get(
-                baseDn.toNormalizedString(), bestServer,
+                baseDn, bestServer,
                 e.getLocalizedMessage() + stackTraceToSingleLineString(e));
               logError(message);
             } finally
@@ -577,7 +504,7 @@
         this.sendWindow = new Semaphore(maxSendWindow);
         connectPhaseLock.notify();
 
-        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
+        if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
           (replServerStartMsg.getGenerationId() == -1))
         {
           Message message =
@@ -586,7 +513,7 @@
             Short.toString(rsServerId),
             replicationServer,
             Short.toString(serverId),
-            Long.toString(this.generationId));
+            Long.toString(this.getGenerationID()));
           logError(message);
         } else
         {
@@ -594,7 +521,7 @@
             NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
             baseDn.toString(),
             replicationServer,
-            Long.toString(this.generationId),
+            Long.toString(this.getGenerationID()),
             Long.toString(replServerStartMsg.getGenerationId()));
           logError(message);
         }
@@ -664,7 +591,7 @@
 
         if (debugEnabled())
         {
-          TRACER.debugInfo("RB for dn " + baseDn.toNormalizedString() +
+          TRACER.debugInfo("RB for dn " + baseDn +
             " and with server id " + Short.toString(serverId) + " computed " +
             Integer.toString(nChanges) + " changes late.");
         }
@@ -744,10 +671,11 @@
       /*
        * Send our ServerStartMsg.
        */
-      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
-        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-        halfRcvWindow * 2, heartbeatInterval, state,
-        ProtocolVersion.getCurrentVersion(), generationId, isSslEncryption,
+      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
+          baseDn, 0, 0, 0, 0,
+        maxRcvWindow, heartbeatInterval, state,
+        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+        isSslEncryption,
         groupId);
       localSession.publish(serverStartMsg);
 
@@ -764,11 +692,11 @@
       }
 
       // Sanity check
-      DN repDn = replServerStartMsg.getBaseDn();
+      String repDn = replServerStartMsg.getBaseDn();
       if (!(this.baseDn.equals(repDn)))
       {
         Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
-          this.baseDn.toString());
+          this.baseDn);
         logError(message);
         error = true;
       }
@@ -811,10 +739,10 @@
       if ( (e instanceof SocketTimeoutException) && debugEnabled() )
       {
         TRACER.debugInfo("Timeout trying to connect to RS " + server +
-          " for dn: " + baseDn.toNormalizedString());
+          " for dn: " + baseDn);
       }
       Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
-        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+        baseDn, server, e.getLocalizedMessage() +
         stackTraceToSingleLineString(e));
       if (keepConnection) // Log error message only for final connection
       {
@@ -880,13 +808,15 @@
       StartSessionMsg startSessionMsg = null;
       // May have created a broker with null replication domain for
       // unit test purpose.
-      if (replicationDomain != null)
+      if (domain != null)
       {
-        startSessionMsg = new StartSessionMsg(initStatus,
-          replicationDomain.getRefUrls(),
-          replicationDomain.isAssured(),
-          replicationDomain.getAssuredMode(),
-          replicationDomain.getAssuredSdLevel());
+        startSessionMsg =
+          new StartSessionMsg(
+              initStatus,
+              domain.getRefUrls(),
+              domain.isAssured(),
+              domain.getAssuredMode(),
+              domain.getAssuredSdLevel());
       } else
       {
         startSessionMsg =
@@ -912,7 +842,7 @@
     } catch (Exception e)
     {
       Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
-        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+        baseDn, server, e.getLocalizedMessage() +
         stackTraceToSingleLineString(e));
       logError(message);
 
@@ -950,7 +880,7 @@
    * @return The computed best replication server.
    */
   public static String computeBestReplicationServer(ServerState myState,
-    HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn,
+    HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn,
     byte groupId)
   {
     /*
@@ -997,7 +927,7 @@
    * @return The computed best replication server.
    */
   private static String searchForBestReplicationServer(ServerState myState,
-    HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn)
+    HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn)
   {
     /*
      * Find replication servers who are up to date (or more up to date than us,
@@ -1072,9 +1002,7 @@
        */
 
       Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
-        upToDateServers.size(),
-        baseDn.toNormalizedString(),
-        Short.toString(serverId));
+        upToDateServers.size(), baseDn, Short.toString(serverId));
       logError(message);
 
       /*
@@ -1154,7 +1082,7 @@
        */
       // lateOnes cannot be empty
       Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
-        baseDn.toNormalizedString(), lateOnes.size());
+        baseDn, lateOnes.size());
       logError(message);
 
       // Min of the shifts
@@ -1190,48 +1118,6 @@
   }
 
   /**
-   * Search for the changes that happened since fromChangeNumber
-   * based on the historical attribute. The only changes that will
-   * be send will be the one generated on the serverId provided in
-   * fromChangeNumber.
-   * @param baseDn the base DN
-   * @param fromChangeNumber The change number from which we want the changes
-   * @param resultListener that will process the entries returned.
-   * @return the internal search operation
-   * @throws Exception when raised.
-   */
-  public static InternalSearchOperation searchForChangedEntries(
-    DN baseDn,
-    ChangeNumber fromChangeNumber,
-    InternalSearchListener resultListener)
-    throws Exception
-  {
-    InternalClientConnection conn =
-      InternalClientConnection.getRootConnection();
-    Short serverId = fromChangeNumber.getServerId();
-
-    String maxValueForId = "ffffffffffffffff" +
-      String.format("%04x", serverId) + "ffffffff";
-
-    LDAPFilter filter = LDAPFilter.decode(
-       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
-       + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME +
-       "<=dummy:" + maxValueForId + "))");
-
-    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
-    attrs.add(Historical.HISTORICALATTRIBUTENAME);
-    attrs.add(Historical.ENTRYUIDNAME);
-    attrs.add("*");
-    return conn.processSearch(
-      new ASN1OctetString(baseDn.toString()),
-      SearchScope.WHOLE_SUBTREE,
-      DereferencePolicy.NEVER_DEREF_ALIASES,
-      0, 0, false, filter,
-      attrs,
-      resultListener);
-  }
-
-  /**
    * Start the heartbeat monitor thread.
    */
   private void startHeartBeat()
@@ -1325,7 +1211,7 @@
       {
         MessageBuilder mb = new MessageBuilder();
         mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
-          baseDn.toNormalizedString(), e.getLocalizedMessage()));
+          baseDn, e.getLocalizedMessage()));
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
       }
@@ -1413,7 +1299,7 @@
             }
           }
         }
-        if (!credit)
+        if ((!credit) && (currentWindowSemaphore.availablePermits() == 0))
         {
           // the window is still closed.
           // Send a WindowProbeMsg message to wakeup the receiver in case the
@@ -1436,7 +1322,7 @@
             if (debugEnabled())
             {
               debugInfo("ReplicationBroker.publish() " +
-                "IO exception raised : " + e.getLocalizedMessage());
+                "Interrupted exception raised : " + e.getLocalizedMessage());
             }
           }
         }
@@ -1479,7 +1365,13 @@
         {
           WindowMsg windowMsg = (WindowMsg) msg;
           sendWindow.release(windowMsg.getNumAck());
-        } else
+        }
+        else if (msg instanceof TopologyMsg)
+        {
+          TopologyMsg topoMsg = (TopologyMsg)msg;
+          receiveTopo(topoMsg);
+        }
+        else
         {
           return msg;
         }
@@ -1580,20 +1472,6 @@
   }
 
   /**
-   * Set the value of the generationId for that broker. Normally the
-   * generationId is set through the constructor but there are cases
-   * where the value of the generationId must be changed while the broker
-   * already exist for example after an on-line import.
-   *
-   * @param generationId The value of the generationId.
-   *
-   */
-  public void setGenerationId(long generationId)
-  {
-    this.generationId = generationId;
-  }
-
-  /**
    * Get the name of the replicationServer to which this broker is currently
    * connected.
    *
@@ -1606,39 +1484,6 @@
   }
 
   /**
-   * {@inheritDoc}
-   */
-  public void handleInternalSearchEntry(
-    InternalSearchOperation searchOperation,
-    SearchResultEntry searchEntry)
-  {
-    /*
-     * This call back is called at session establishment phase
-     * for each entry that has been changed by this server and the changes
-     * have not been sent to any Replication Server.
-     * The role of this method is to build equivalent operation from
-     * the historical information and add them in the replayOperations
-     * table.
-     */
-    Iterable<FakeOperation> updates =
-      Historical.generateFakeOperations(searchEntry);
-    for (FakeOperation op : updates)
-    {
-      replayOperations.add(op);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void handleInternalSearchReference(
-    InternalSearchOperation searchOperation,
-    SearchResultReference searchReference)
-  {
-    // TODO to be implemented
-  }
-
-  /**
    * Get the maximum receive window size.
    *
    * @return The maximum receive window size.
@@ -1694,31 +1539,41 @@
   }
 
   /**
-   * Change some config parameters.
+   * Change some configuration parameters.
    *
-   * @param replicationServers    The new list of replication servers.
-   * @param maxReceiveQueue     The max size of receive queue.
-   * @param maxReceiveDelay     The max receive delay.
-   * @param maxSendQueue        The max send queue.
-   * @param maxSendDelay        The max Send Delay.
+   * @param replicationServers  The new list of replication servers.
    * @param window              The max window size.
-   * @param heartbeatInterval   The heartbeat interval.
+   * @param heartbeatInterval   The heartBeat interval.
+   *
+   * @return                    A boolean indicating if the changes
+   *                            requires to restart the service.
    */
-  public void changeConfig(Collection<String> replicationServers,
-    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
-    int maxSendDelay, int window, long heartbeatInterval)
+  public boolean changeConfig(
+      Collection<String> replicationServers, int window, long heartbeatInterval)
   {
-    this.servers = replicationServers;
-    this.maxRcvWindow = window;
-    this.heartbeatInterval = heartbeatInterval;
-    this.maxReceiveDelay = maxReceiveDelay;
-    this.maxReceiveQueue = maxReceiveQueue;
-    this.maxSendDelay = maxSendDelay;
-    this.maxSendQueue = maxSendQueue;
+    // These parameters needs to be renegociated with the ReplicationServer
+    // so if they have changed, that requires restarting the session with
+    // the ReplicationServer.
+    Boolean needToRestartSession = false;
 
-    // For info, a new session with the replicationServer
-    // will be recreated in the replication domain
-    // to take into account the new configuration.
+    // A new session is necessary only when information regarding
+    // the connection is modified
+    if ((servers == null) ||
+        (!(replicationServers.size() == servers.size()
+        && replicationServers.containsAll(servers))) ||
+        window != this.maxRcvWindow  ||
+        heartbeatInterval != this.heartbeatInterval)
+    {
+      needToRestartSession = true;
+    }
+
+    this.servers = replicationServers;
+    this.rcvWindow = window;
+    this.maxRcvWindow = window;
+    this.halfRcvWindow = window / 2;
+    this.heartbeatInterval = heartbeatInterval;
+
+    return needToRestartSession;
   }
 
   /**
@@ -1900,14 +1755,13 @@
   {
     try
     {
-
       ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
         newStatus);
       session.publish(csMsg);
     } catch (IOException ex)
     {
       Message message = ERR_EXCEPTION_SENDING_CS.get(
-        baseDn.toNormalizedString(),
+        baseDn,
         Short.toString(serverId),
         ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
       logError(message);
@@ -1922,4 +1776,48 @@
   {
     this.groupId = groupId;
   }
+
+  /**
+   * Gets the info for DSs in the topology (except us).
+   * @return The info for DSs in the topology (except us)
+   */
+  public List<DSInfo> getDsList()
+  {
+    return dsList;
+  }
+
+  /**
+   * Gets the info for RSs in the topology (except the one we are connected
+   * to).
+   * @return The info for RSs in the topology (except the one we are connected
+   * to)
+   */
+  public List<RSInfo> getRsList()
+  {
+    return rsList;
+  }
+
+  /**
+   * Processes an incoming TopologyMsg.
+   * Updates the structures for the local view of the topology.
+   *
+   * @param topoMsg The topology information received from RS.
+   */
+  public void receiveTopo(TopologyMsg topoMsg)
+  {
+
+    if (debugEnabled())
+      TRACER.debugInfo("Replication domain " + baseDn
+        + " received topology info update:\n" + topoMsg);
+
+    // Store new lists
+    synchronized(getDsList())
+    {
+      synchronized(getRsList())
+      {
+        dsList = topoMsg.getDsList();
+        rsList = topoMsg.getRsList();
+      }
+    }
+  }
 }

--
Gitblit v1.10.0