mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
08.03.2008 7adb93986ace907531875e25be1f94d735fbb068
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -24,12 +24,15 @@
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
package org.opends.server.replication.service;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -45,34 +48,22 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.api.DirectoryThread;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
/**
 * The broker for Multi-master Replication.
 */
public class ReplicationBroker implements InternalSearchListener
public class ReplicationBroker
{
  /**
@@ -83,23 +74,17 @@
  private Collection<String> servers;
  private boolean connected = false;
  private String replicationServer = "Not connected";
  private TreeSet<FakeOperation> replayOperations;
  private ProtocolSession session = null;
  private final ServerState state;
  private final DN baseDn;
  private final String baseDn;
  private final short serverId;
  private int maxSendDelay;
  private int maxReceiveDelay;
  private int maxSendQueue;
  private int maxReceiveQueue;
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow;
  private int halfRcvWindow;
  private int maxRcvWindow;
  private int rcvWindow = 100;
  private int halfRcvWindow = rcvWindow/2;
  private int maxRcvWindow = rcvWindow;
  private int timeout = 0;
  private short protocolVersion;
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  // My group id
  private byte groupId = (byte) -1;
@@ -110,7 +95,7 @@
  // The server URL of the RS we are connected to
  private String rsServerUrl = null;
  // Our replication domain
  private ReplicationDomain replicationDomain = null;
  private ReplicationDomain domain = null;
  // Trick for avoiding a inner class for many parameters return for
  // performPhaseOneHandshake method.
@@ -142,6 +127,17 @@
  // Same group id poller thread
  private SameGroupIdPoller sameGroupIdPoller = null;
  /*
   * Properties for the last topology info received from the network.
   */
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  // Info for other RSs.
  private List<RSInfo> rsList = new ArrayList<RSInfo>();
  private long generationID;
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
   *
@@ -152,13 +148,6 @@
   *              when negotiating the session with the replicationServer.
   * @param serverId The server ID that should be used by this broker
   *              when negotiating the session with the replicationServer.
   * @param maxReceiveQueue The maximum size of the receive queue to use on
   *                         the replicationServer.
   * @param maxReceiveDelay The maximum replication delay to use on the
   *                        replicationServer.
   * @param maxSendQueue The maximum size of the send queue to use on
   *                     the replicationServer.
   * @param maxSendDelay The maximum send delay to use on the replicationServer.
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * replicationServer, or zero if no heartbeats are requested.
@@ -169,29 +158,32 @@
   * @param groupId The group id of our domain.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, DN baseDn, short serverId, int maxReceiveQueue,
    int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window,
    long heartbeatInterval, long generationId,
    ServerState state, String baseDn, short serverId, int window,
    long generationId, long heartbeatInterval,
    ReplSessionSecurity replSessionSecurity, byte groupId)
  {
    this.replicationDomain = replicationDomain;
    this.domain = replicationDomain;
    this.baseDn = baseDn;
    this.serverId = serverId;
    this.maxReceiveDelay = maxReceiveDelay;
    this.maxSendDelay = maxSendDelay;
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendQueue = maxSendQueue;
    this.state = state;
    replayOperations =
      new TreeSet<FakeOperation>(new FakeOperationComparator());
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.generationId = generationId;
    this.replSessionSecurity = replSessionSecurity;
    this.groupId = groupId;
    this.generationID = generationId;
    this.heartbeatInterval = heartbeatInterval;
    this.maxRcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window /2;
  }
  /**
   * Start the ReplicationBroker.
   */
  public void start()
  {
    shutdown = false;
    this.rcvWindow = this.maxRcvWindow;
    this.connect();
  }
  /**
@@ -207,6 +199,7 @@
     */
    shutdown = false;
    this.servers = servers;
    if (servers.size() < 1)
    {
      Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
@@ -245,6 +238,18 @@
  }
  /**
   * Gets the server id.
   * @return The server id
   */
  private long getGenerationID()
  {
    if (domain != null)
      return domain.getGenerationID();
    else
      return generationID;
  }
  /**
   * Gets the server url of the RS we are connected to.
   * @return The server url of the RS we are connected to
   */
@@ -324,12 +329,12 @@
    // May have created a broker with null replication domain for
    // unit test purpose.
    if (replicationDomain != null)
    if (domain != null)
    {
      // If a first connect or a connection failure occur, we go through here.
      // force status machine to NOT_CONNECTED_STATUS so that monitoring can
      // see that we are not connected.
      replicationDomain.toNotConnectedStatus();
      domain.toNotConnectedStatus();
    }
    // Stop any existing poller and heartbeat monitor from a previous session.
@@ -380,7 +385,8 @@
          ServerStatus initStatus =
            computeInitialServerStatus(replServerStartMsg.getGenerationId(),
            bestServerInfo.getServerState(),
            replServerStartMsg.getDegradedStatusThreshold(), generationId);
            replServerStartMsg.getDegradedStatusThreshold(),
            this.getGenerationID());
          // Perfom session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
@@ -414,81 +420,6 @@
              if ((tmpRsGroupId == groupId) ||
                ((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
              {
                /*
                 * We must not publish changes to a replicationServer that has
                 * not seen all our previous changes because this could cause
                 * some other ldap servers to miss those changes.
                 * Check that the ReplicationServer has seen all our previous
                 * changes.
                 */
                ChangeNumber replServerMaxChangeNumber =
                  replServerStartMsg.getServerState().
                  getMaxChangeNumber(serverId);
                if (replServerMaxChangeNumber == null)
                {
                  replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
                }
                ChangeNumber ourMaxChangeNumber =
                  state.getMaxChangeNumber(serverId);
                if ((ourMaxChangeNumber != null) &&
                  (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
                {
                  // Replication server is missing some of our changes: let's
                  // send them to him.
                  Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
                  logError(message);
                  /*
                   * Get all the changes that have not been seen by this
                   * replication server and populate the replayOperations
                   * list.
                   */
                  InternalSearchOperation op = searchForChangedEntries(
                    baseDn, replServerMaxChangeNumber, this);
                  if (op.getResultCode() != ResultCode.SUCCESS)
                  {
                    /*
                     * An error happened trying to search for the updates
                     * This server will start acepting again new updates but
                     * some inconsistencies will stay between servers.
                     * Log an error for the repair tool
                     * that will need to resynchronize the servers.
                     */
                    message = ERR_CANNOT_RECOVER_CHANGES.get(
                      baseDn.toNormalizedString());
                    logError(message);
                  } else
                  {
                    for (FakeOperation replayOp : replayOperations)
                    {
                      ChangeNumber cn = replayOp.getChangeNumber();
                      /*
                       * Because the entry returned by the search operation
                       * can contain old historical information, it is
                       * possible that some of the FakeOperation are
                       * actually older than the
                       * Only send the Operation if it was newer than
                       * the last ChangeNumber known by the Replication Server.
                       */
                      if (cn.newer(replServerMaxChangeNumber))
                      {
                        message =
                          DEBUG_SENDING_CHANGE.get(
                              replayOp.getChangeNumber().toString());
                        logError(message);
                        session.publish(replayOp.generateMessage());
                      }
                    }
                    message = DEBUG_CHANGES_SENT.get();
                    logError(message);
                  }
                  replayOperations.clear();
                }
                replicationServer = tmpReadableServerName;
                maxSendWindow = replServerStartMsg.getWindowSize();
                rsGroupId = replServerStartMsg.getGroupId();
@@ -497,11 +428,13 @@
                // May have created a broker with null replication domain for
                // unit test purpose.
                if (replicationDomain != null)
                if (domain != null)
                {
                  replicationDomain.setInitialStatus(initStatus);
                  replicationDomain.receiveTopo(topologyMsg);
                  domain.sessionInitiated(
                      initStatus, replServerStartMsg.getServerState(),
                      session);
                }
                receiveTopo(topologyMsg);
                connected = true;
                if (getRsGroupId() != groupId)
                {
@@ -528,16 +461,10 @@
                // Do not log connection error
                newServerWithSameGroupId = true;
              }
            } catch (IOException e)
            {
              Message message = ERR_PUBLISHING_FAKE_OPS.get(
                baseDn.toNormalizedString(), bestServer,
                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
              logError(message);
            } catch (Exception e)
            {
              Message message = ERR_COMPUTING_FAKE_OPS.get(
                baseDn.toNormalizedString(), bestServer,
                baseDn, bestServer,
                e.getLocalizedMessage() + stackTraceToSingleLineString(e));
              logError(message);
            } finally
@@ -577,7 +504,7 @@
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
        if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
          (replServerStartMsg.getGenerationId() == -1))
        {
          Message message =
@@ -586,7 +513,7 @@
            Short.toString(rsServerId),
            replicationServer,
            Short.toString(serverId),
            Long.toString(this.generationId));
            Long.toString(this.getGenerationID()));
          logError(message);
        } else
        {
@@ -594,7 +521,7 @@
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId),
            Long.toString(this.getGenerationID()),
            Long.toString(replServerStartMsg.getGenerationId()));
          logError(message);
        }
@@ -664,7 +591,7 @@
        if (debugEnabled())
        {
          TRACER.debugInfo("RB for dn " + baseDn.toNormalizedString() +
          TRACER.debugInfo("RB for dn " + baseDn +
            " and with server id " + Short.toString(serverId) + " computed " +
            Integer.toString(nChanges) + " changes late.");
        }
@@ -744,10 +671,11 @@
      /*
       * Send our ServerStartMsg.
       */
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
        halfRcvWindow * 2, heartbeatInterval, state,
        ProtocolVersion.getCurrentVersion(), generationId, isSslEncryption,
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
          baseDn, 0, 0, 0, 0,
        maxRcvWindow, heartbeatInterval, state,
        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
        isSslEncryption,
        groupId);
      localSession.publish(serverStartMsg);
@@ -764,11 +692,11 @@
      }
      // Sanity check
      DN repDn = replServerStartMsg.getBaseDn();
      String repDn = replServerStartMsg.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      {
        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
          this.baseDn.toString());
          this.baseDn);
        logError(message);
        error = true;
      }
@@ -811,10 +739,10 @@
      if ( (e instanceof SocketTimeoutException) && debugEnabled() )
      {
        TRACER.debugInfo("Timeout trying to connect to RS " + server +
          " for dn: " + baseDn.toNormalizedString());
          " for dn: " + baseDn);
      }
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      if (keepConnection) // Log error message only for final connection
      {
@@ -880,13 +808,15 @@
      StartSessionMsg startSessionMsg = null;
      // May have created a broker with null replication domain for
      // unit test purpose.
      if (replicationDomain != null)
      if (domain != null)
      {
        startSessionMsg = new StartSessionMsg(initStatus,
          replicationDomain.getRefUrls(),
          replicationDomain.isAssured(),
          replicationDomain.getAssuredMode(),
          replicationDomain.getAssuredSdLevel());
        startSessionMsg =
          new StartSessionMsg(
              initStatus,
              domain.getRefUrls(),
              domain.isAssured(),
              domain.getAssuredMode(),
              domain.getAssuredSdLevel());
      } else
      {
        startSessionMsg =
@@ -912,7 +842,7 @@
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
        baseDn, server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      logError(message);
@@ -950,7 +880,7 @@
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn,
    HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn,
    byte groupId)
  {
    /*
@@ -997,7 +927,7 @@
   * @return The computed best replication server.
   */
  private static String searchForBestReplicationServer(ServerState myState,
    HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn)
    HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
@@ -1072,9 +1002,7 @@
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(),
        baseDn.toNormalizedString(),
        Short.toString(serverId));
        upToDateServers.size(), baseDn, Short.toString(serverId));
      logError(message);
      /*
@@ -1154,7 +1082,7 @@
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn.toNormalizedString(), lateOnes.size());
        baseDn, lateOnes.size());
      logError(message);
      // Min of the shifts
@@ -1190,48 +1118,6 @@
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    Short serverId = fromChangeNumber.getServerId();
    String maxValueForId = "ffffffffffffffff" +
      String.format("%04x", serverId) + "ffffffff";
    LDAPFilter filter = LDAPFilter.decode(
       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
       + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME +
       "<=dummy:" + maxValueForId + "))");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(Historical.HISTORICALATTRIBUTENAME);
    attrs.add(Historical.ENTRYUIDNAME);
    attrs.add("*");
    return conn.processSearch(
      new ASN1OctetString(baseDn.toString()),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, 0, false, filter,
      attrs,
      resultListener);
  }
  /**
   * Start the heartbeat monitor thread.
   */
  private void startHeartBeat()
@@ -1325,7 +1211,7 @@
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
          baseDn.toNormalizedString(), e.getLocalizedMessage()));
          baseDn, e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
@@ -1413,7 +1299,7 @@
            }
          }
        }
        if (!credit)
        if ((!credit) && (currentWindowSemaphore.availablePermits() == 0))
        {
          // the window is still closed.
          // Send a WindowProbeMsg message to wakeup the receiver in case the
@@ -1436,7 +1322,7 @@
            if (debugEnabled())
            {
              debugInfo("ReplicationBroker.publish() " +
                "IO exception raised : " + e.getLocalizedMessage());
                "Interrupted exception raised : " + e.getLocalizedMessage());
            }
          }
        }
@@ -1479,7 +1365,13 @@
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        } else
        }
        else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg)msg;
          receiveTopo(topoMsg);
        }
        else
        {
          return msg;
        }
@@ -1580,20 +1472,6 @@
  }
  /**
   * Set the value of the generationId for that broker. Normally the
   * generationId is set through the constructor but there are cases
   * where the value of the generationId must be changed while the broker
   * already exist for example after an on-line import.
   *
   * @param generationId The value of the generationId.
   *
   */
  public void setGenerationId(long generationId)
  {
    this.generationId = generationId;
  }
  /**
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   *
@@ -1606,39 +1484,6 @@
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchEntry(
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  {
    /*
     * This call back is called at session establishment phase
     * for each entry that has been changed by this server and the changes
     * have not been sent to any Replication Server.
     * The role of this method is to build equivalent operation from
     * the historical information and add them in the replayOperations
     * table.
     */
    Iterable<FakeOperation> updates =
      Historical.generateFakeOperations(searchEntry);
    for (FakeOperation op : updates)
    {
      replayOperations.add(op);
    }
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
@@ -1694,31 +1539,41 @@
  }
  /**
   * Change some config parameters.
   * Change some configuration parameters.
   *
   * @param replicationServers    The new list of replication servers.
   * @param maxReceiveQueue     The max size of receive queue.
   * @param maxReceiveDelay     The max receive delay.
   * @param maxSendQueue        The max send queue.
   * @param maxSendDelay        The max Send Delay.
   * @param replicationServers  The new list of replication servers.
   * @param window              The max window size.
   * @param heartbeatInterval   The heartbeat interval.
   * @param heartbeatInterval   The heartBeat interval.
   *
   * @return                    A boolean indicating if the changes
   *                            requires to restart the service.
   */
  public void changeConfig(Collection<String> replicationServers,
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval)
  public boolean changeConfig(
      Collection<String> replicationServers, int window, long heartbeatInterval)
  {
    this.servers = replicationServers;
    this.maxRcvWindow = window;
    this.heartbeatInterval = heartbeatInterval;
    this.maxReceiveDelay = maxReceiveDelay;
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    // These parameters needs to be renegociated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
    // the ReplicationServer.
    Boolean needToRestartSession = false;
    // For info, a new session with the replicationServer
    // will be recreated in the replication domain
    // to take into account the new configuration.
    // A new session is necessary only when information regarding
    // the connection is modified
    if ((servers == null) ||
        (!(replicationServers.size() == servers.size()
        && replicationServers.containsAll(servers))) ||
        window != this.maxRcvWindow  ||
        heartbeatInterval != this.heartbeatInterval)
    {
      needToRestartSession = true;
    }
    this.servers = replicationServers;
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
    this.heartbeatInterval = heartbeatInterval;
    return needToRestartSession;
  }
  /**
@@ -1900,14 +1755,13 @@
  {
    try
    {
      ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
        newStatus);
      session.publish(csMsg);
    } catch (IOException ex)
    {
      Message message = ERR_EXCEPTION_SENDING_CS.get(
        baseDn.toNormalizedString(),
        baseDn,
        Short.toString(serverId),
        ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
      logError(message);
@@ -1922,4 +1776,48 @@
  {
    this.groupId = groupId;
  }
  /**
   * Gets the info for DSs in the topology (except us).
   * @return The info for DSs in the topology (except us)
   */
  public List<DSInfo> getDsList()
  {
    return dsList;
  }
  /**
   * Gets the info for RSs in the topology (except the one we are connected
   * to).
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  {
    return rsList;
  }
  /**
   * Processes an incoming TopologyMsg.
   * Updates the structures for the local view of the topology.
   *
   * @param topoMsg The topology information received from RS.
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn
        + " received topology info update:\n" + topoMsg);
    // Store new lists
    synchronized(getDsList())
    {
      synchronized(getRsList())
      {
        dsList = topoMsg.getDsList();
        rsList = topoMsg.getRsList();
      }
    }
  }
}