From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 450 +++++++++++++++++++++----------------------------------
1 files changed, 174 insertions(+), 276 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
similarity index 81%
rename from opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
rename to opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 27a09c7..16e428d 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -24,12 +24,15 @@
*
* Copyright 2006-2008 Sun Microsystems, Inc.
*/
-package org.opends.server.replication.plugin;
+package org.opends.server.replication.service;
+
+import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -45,34 +48,22 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.List;
-import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.api.DirectoryThread;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.internal.InternalSearchListener;
-import org.opends.server.protocols.internal.InternalSearchOperation;
-import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
-import org.opends.server.types.DN;
-import org.opends.server.types.DereferencePolicy;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchResultEntry;
-import org.opends.server.types.SearchResultReference;
-import org.opends.server.types.SearchScope;
/**
* The broker for Multi-master Replication.
*/
-public class ReplicationBroker implements InternalSearchListener
+public class ReplicationBroker
{
/**
@@ -83,23 +74,17 @@
private Collection<String> servers;
private boolean connected = false;
private String replicationServer = "Not connected";
- private TreeSet<FakeOperation> replayOperations;
private ProtocolSession session = null;
private final ServerState state;
- private final DN baseDn;
+ private final String baseDn;
private final short serverId;
- private int maxSendDelay;
- private int maxReceiveDelay;
- private int maxSendQueue;
- private int maxReceiveQueue;
private Semaphore sendWindow;
private int maxSendWindow;
- private int rcvWindow;
- private int halfRcvWindow;
- private int maxRcvWindow;
+ private int rcvWindow = 100;
+ private int halfRcvWindow = rcvWindow/2;
+ private int maxRcvWindow = rcvWindow;
private int timeout = 0;
private short protocolVersion;
- private long generationId = -1;
private ReplSessionSecurity replSessionSecurity;
// My group id
private byte groupId = (byte) -1;
@@ -110,7 +95,7 @@
// The server URL of the RS we are connected to
private String rsServerUrl = null;
// Our replication domain
- private ReplicationDomain replicationDomain = null;
+ private ReplicationDomain domain = null;
// Trick for avoiding a inner class for many parameters return for
// performPhaseOneHandshake method.
@@ -142,6 +127,17 @@
// Same group id poller thread
private SameGroupIdPoller sameGroupIdPoller = null;
+ /*
+ * Properties for the last topology info received from the network.
+ */
+ // Info for other DSs.
+ // Warning: does not contain info for us (for our server id)
+ private List<DSInfo> dsList = new ArrayList<DSInfo>();
+ // Info for other RSs.
+ private List<RSInfo> rsList = new ArrayList<RSInfo>();
+
+ private long generationID;
+
/**
* Creates a new ReplicationServer Broker for a particular ReplicationDomain.
*
@@ -152,13 +148,6 @@
* when negotiating the session with the replicationServer.
* @param serverId The server ID that should be used by this broker
* when negotiating the session with the replicationServer.
- * @param maxReceiveQueue The maximum size of the receive queue to use on
- * the replicationServer.
- * @param maxReceiveDelay The maximum replication delay to use on the
- * replicationServer.
- * @param maxSendQueue The maximum size of the send queue to use on
- * the replicationServer.
- * @param maxSendDelay The maximum send delay to use on the replicationServer.
* @param window The size of the send and receive window to use.
* @param heartbeatInterval The interval between heartbeats requested of the
* replicationServer, or zero if no heartbeats are requested.
@@ -169,29 +158,32 @@
* @param groupId The group id of our domain.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
- ServerState state, DN baseDn, short serverId, int maxReceiveQueue,
- int maxReceiveDelay, int maxSendQueue, int maxSendDelay, int window,
- long heartbeatInterval, long generationId,
+ ServerState state, String baseDn, short serverId, int window,
+ long generationId, long heartbeatInterval,
ReplSessionSecurity replSessionSecurity, byte groupId)
{
- this.replicationDomain = replicationDomain;
+ this.domain = replicationDomain;
this.baseDn = baseDn;
this.serverId = serverId;
- this.maxReceiveDelay = maxReceiveDelay;
- this.maxSendDelay = maxSendDelay;
- this.maxReceiveQueue = maxReceiveQueue;
- this.maxSendQueue = maxSendQueue;
this.state = state;
- replayOperations =
- new TreeSet<FakeOperation>(new FakeOperationComparator());
- this.rcvWindow = window;
- this.maxRcvWindow = window;
- this.halfRcvWindow = window / 2;
- this.heartbeatInterval = heartbeatInterval;
this.protocolVersion = ProtocolVersion.getCurrentVersion();
- this.generationId = generationId;
this.replSessionSecurity = replSessionSecurity;
this.groupId = groupId;
+ this.generationID = generationId;
+ this.heartbeatInterval = heartbeatInterval;
+ this.maxRcvWindow = window;
+ this.maxRcvWindow = window;
+ this.halfRcvWindow = window /2;
+ }
+
+ /**
+ * Start the ReplicationBroker.
+ */
+ public void start()
+ {
+ shutdown = false;
+ this.rcvWindow = this.maxRcvWindow;
+ this.connect();
}
/**
@@ -207,6 +199,7 @@
*/
shutdown = false;
this.servers = servers;
+
if (servers.size() < 1)
{
Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
@@ -245,6 +238,18 @@
}
/**
+ * Gets the server id.
+ * @return The server id
+ */
+ private long getGenerationID()
+ {
+ if (domain != null)
+ return domain.getGenerationID();
+ else
+ return generationID;
+ }
+
+ /**
* Gets the server url of the RS we are connected to.
* @return The server url of the RS we are connected to
*/
@@ -324,12 +329,12 @@
// May have created a broker with null replication domain for
// unit test purpose.
- if (replicationDomain != null)
+ if (domain != null)
{
// If a first connect or a connection failure occur, we go through here.
// force status machine to NOT_CONNECTED_STATUS so that monitoring can
// see that we are not connected.
- replicationDomain.toNotConnectedStatus();
+ domain.toNotConnectedStatus();
}
// Stop any existing poller and heartbeat monitor from a previous session.
@@ -380,7 +385,8 @@
ServerStatus initStatus =
computeInitialServerStatus(replServerStartMsg.getGenerationId(),
bestServerInfo.getServerState(),
- replServerStartMsg.getDegradedStatusThreshold(), generationId);
+ replServerStartMsg.getDegradedStatusThreshold(),
+ this.getGenerationID());
// Perfom session start (handshake phase 2)
TopologyMsg topologyMsg = performPhaseTwoHandshake(bestServer,
@@ -414,81 +420,6 @@
if ((tmpRsGroupId == groupId) ||
((tmpRsGroupId != groupId) && !someServersWithSameGroupId))
{
- /*
- * We must not publish changes to a replicationServer that has
- * not seen all our previous changes because this could cause
- * some other ldap servers to miss those changes.
- * Check that the ReplicationServer has seen all our previous
- * changes.
- */
- ChangeNumber replServerMaxChangeNumber =
- replServerStartMsg.getServerState().
- getMaxChangeNumber(serverId);
-
- if (replServerMaxChangeNumber == null)
- {
- replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
- }
- ChangeNumber ourMaxChangeNumber =
- state.getMaxChangeNumber(serverId);
-
- if ((ourMaxChangeNumber != null) &&
- (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
- {
-
- // Replication server is missing some of our changes: let's
- // send them to him.
-
- Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
- logError(message);
-
- /*
- * Get all the changes that have not been seen by this
- * replication server and populate the replayOperations
- * list.
- */
- InternalSearchOperation op = searchForChangedEntries(
- baseDn, replServerMaxChangeNumber, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- /*
- * An error happened trying to search for the updates
- * This server will start acepting again new updates but
- * some inconsistencies will stay between servers.
- * Log an error for the repair tool
- * that will need to resynchronize the servers.
- */
- message = ERR_CANNOT_RECOVER_CHANGES.get(
- baseDn.toNormalizedString());
- logError(message);
- } else
- {
- for (FakeOperation replayOp : replayOperations)
- {
- ChangeNumber cn = replayOp.getChangeNumber();
- /*
- * Because the entry returned by the search operation
- * can contain old historical information, it is
- * possible that some of the FakeOperation are
- * actually older than the
- * Only send the Operation if it was newer than
- * the last ChangeNumber known by the Replication Server.
- */
- if (cn.newer(replServerMaxChangeNumber))
- {
- message =
- DEBUG_SENDING_CHANGE.get(
- replayOp.getChangeNumber().toString());
- logError(message);
- session.publish(replayOp.generateMessage());
- }
- }
- message = DEBUG_CHANGES_SENT.get();
- logError(message);
- }
- replayOperations.clear();
- }
-
replicationServer = tmpReadableServerName;
maxSendWindow = replServerStartMsg.getWindowSize();
rsGroupId = replServerStartMsg.getGroupId();
@@ -497,11 +428,13 @@
// May have created a broker with null replication domain for
// unit test purpose.
- if (replicationDomain != null)
+ if (domain != null)
{
- replicationDomain.setInitialStatus(initStatus);
- replicationDomain.receiveTopo(topologyMsg);
+ domain.sessionInitiated(
+ initStatus, replServerStartMsg.getServerState(),
+ session);
}
+ receiveTopo(topologyMsg);
connected = true;
if (getRsGroupId() != groupId)
{
@@ -528,16 +461,10 @@
// Do not log connection error
newServerWithSameGroupId = true;
}
- } catch (IOException e)
- {
- Message message = ERR_PUBLISHING_FAKE_OPS.get(
- baseDn.toNormalizedString(), bestServer,
- e.getLocalizedMessage() + stackTraceToSingleLineString(e));
- logError(message);
} catch (Exception e)
{
Message message = ERR_COMPUTING_FAKE_OPS.get(
- baseDn.toNormalizedString(), bestServer,
+ baseDn, bestServer,
e.getLocalizedMessage() + stackTraceToSingleLineString(e));
logError(message);
} finally
@@ -577,7 +504,7 @@
this.sendWindow = new Semaphore(maxSendWindow);
connectPhaseLock.notify();
- if ((replServerStartMsg.getGenerationId() == this.generationId) ||
+ if ((replServerStartMsg.getGenerationId() == this.getGenerationID()) ||
(replServerStartMsg.getGenerationId() == -1))
{
Message message =
@@ -586,7 +513,7 @@
Short.toString(rsServerId),
replicationServer,
Short.toString(serverId),
- Long.toString(this.generationId));
+ Long.toString(this.getGenerationID()));
logError(message);
} else
{
@@ -594,7 +521,7 @@
NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
baseDn.toString(),
replicationServer,
- Long.toString(this.generationId),
+ Long.toString(this.getGenerationID()),
Long.toString(replServerStartMsg.getGenerationId()));
logError(message);
}
@@ -664,7 +591,7 @@
if (debugEnabled())
{
- TRACER.debugInfo("RB for dn " + baseDn.toNormalizedString() +
+ TRACER.debugInfo("RB for dn " + baseDn +
" and with server id " + Short.toString(serverId) + " computed " +
Integer.toString(nChanges) + " changes late.");
}
@@ -744,10 +671,11 @@
/*
* Send our ServerStartMsg.
*/
- ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
- maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow * 2, heartbeatInterval, state,
- ProtocolVersion.getCurrentVersion(), generationId, isSslEncryption,
+ ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
+ baseDn, 0, 0, 0, 0,
+ maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+ isSslEncryption,
groupId);
localSession.publish(serverStartMsg);
@@ -764,11 +692,11 @@
}
// Sanity check
- DN repDn = replServerStartMsg.getBaseDn();
+ String repDn = replServerStartMsg.getBaseDn();
if (!(this.baseDn.equals(repDn)))
{
Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
- this.baseDn.toString());
+ this.baseDn);
logError(message);
error = true;
}
@@ -811,10 +739,10 @@
if ( (e instanceof SocketTimeoutException) && debugEnabled() )
{
TRACER.debugInfo("Timeout trying to connect to RS " + server +
- " for dn: " + baseDn.toNormalizedString());
+ " for dn: " + baseDn);
}
Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
- baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+ baseDn, server, e.getLocalizedMessage() +
stackTraceToSingleLineString(e));
if (keepConnection) // Log error message only for final connection
{
@@ -880,13 +808,15 @@
StartSessionMsg startSessionMsg = null;
// May have created a broker with null replication domain for
// unit test purpose.
- if (replicationDomain != null)
+ if (domain != null)
{
- startSessionMsg = new StartSessionMsg(initStatus,
- replicationDomain.getRefUrls(),
- replicationDomain.isAssured(),
- replicationDomain.getAssuredMode(),
- replicationDomain.getAssuredSdLevel());
+ startSessionMsg =
+ new StartSessionMsg(
+ initStatus,
+ domain.getRefUrls(),
+ domain.isAssured(),
+ domain.getAssuredMode(),
+ domain.getAssuredSdLevel());
} else
{
startSessionMsg =
@@ -912,7 +842,7 @@
} catch (Exception e)
{
Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
- baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+ baseDn, server, e.getLocalizedMessage() +
stackTraceToSingleLineString(e));
logError(message);
@@ -950,7 +880,7 @@
* @return The computed best replication server.
*/
public static String computeBestReplicationServer(ServerState myState,
- HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn,
+ HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn,
byte groupId)
{
/*
@@ -997,7 +927,7 @@
* @return The computed best replication server.
*/
private static String searchForBestReplicationServer(ServerState myState,
- HashMap<String, ServerInfo> rsInfos, short serverId, DN baseDn)
+ HashMap<String, ServerInfo> rsInfos, short serverId, String baseDn)
{
/*
* Find replication servers who are up to date (or more up to date than us,
@@ -1072,9 +1002,7 @@
*/
Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
- upToDateServers.size(),
- baseDn.toNormalizedString(),
- Short.toString(serverId));
+ upToDateServers.size(), baseDn, Short.toString(serverId));
logError(message);
/*
@@ -1154,7 +1082,7 @@
*/
// lateOnes cannot be empty
Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
- baseDn.toNormalizedString(), lateOnes.size());
+ baseDn, lateOnes.size());
logError(message);
// Min of the shifts
@@ -1190,48 +1118,6 @@
}
/**
- * Search for the changes that happened since fromChangeNumber
- * based on the historical attribute. The only changes that will
- * be send will be the one generated on the serverId provided in
- * fromChangeNumber.
- * @param baseDn the base DN
- * @param fromChangeNumber The change number from which we want the changes
- * @param resultListener that will process the entries returned.
- * @return the internal search operation
- * @throws Exception when raised.
- */
- public static InternalSearchOperation searchForChangedEntries(
- DN baseDn,
- ChangeNumber fromChangeNumber,
- InternalSearchListener resultListener)
- throws Exception
- {
- InternalClientConnection conn =
- InternalClientConnection.getRootConnection();
- Short serverId = fromChangeNumber.getServerId();
-
- String maxValueForId = "ffffffffffffffff" +
- String.format("%04x", serverId) + "ffffffff";
-
- LDAPFilter filter = LDAPFilter.decode(
- "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
- + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME +
- "<=dummy:" + maxValueForId + "))");
-
- LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
- attrs.add(Historical.HISTORICALATTRIBUTENAME);
- attrs.add(Historical.ENTRYUIDNAME);
- attrs.add("*");
- return conn.processSearch(
- new ASN1OctetString(baseDn.toString()),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, 0, false, filter,
- attrs,
- resultListener);
- }
-
- /**
* Start the heartbeat monitor thread.
*/
private void startHeartBeat()
@@ -1325,7 +1211,7 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
- baseDn.toNormalizedString(), e.getLocalizedMessage()));
+ baseDn, e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
@@ -1413,7 +1299,7 @@
}
}
}
- if (!credit)
+ if ((!credit) && (currentWindowSemaphore.availablePermits() == 0))
{
// the window is still closed.
// Send a WindowProbeMsg message to wakeup the receiver in case the
@@ -1436,7 +1322,7 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
- "IO exception raised : " + e.getLocalizedMessage());
+ "Interrupted exception raised : " + e.getLocalizedMessage());
}
}
}
@@ -1479,7 +1365,13 @@
{
WindowMsg windowMsg = (WindowMsg) msg;
sendWindow.release(windowMsg.getNumAck());
- } else
+ }
+ else if (msg instanceof TopologyMsg)
+ {
+ TopologyMsg topoMsg = (TopologyMsg)msg;
+ receiveTopo(topoMsg);
+ }
+ else
{
return msg;
}
@@ -1580,20 +1472,6 @@
}
/**
- * Set the value of the generationId for that broker. Normally the
- * generationId is set through the constructor but there are cases
- * where the value of the generationId must be changed while the broker
- * already exist for example after an on-line import.
- *
- * @param generationId The value of the generationId.
- *
- */
- public void setGenerationId(long generationId)
- {
- this.generationId = generationId;
- }
-
- /**
* Get the name of the replicationServer to which this broker is currently
* connected.
*
@@ -1606,39 +1484,6 @@
}
/**
- * {@inheritDoc}
- */
- public void handleInternalSearchEntry(
- InternalSearchOperation searchOperation,
- SearchResultEntry searchEntry)
- {
- /*
- * This call back is called at session establishment phase
- * for each entry that has been changed by this server and the changes
- * have not been sent to any Replication Server.
- * The role of this method is to build equivalent operation from
- * the historical information and add them in the replayOperations
- * table.
- */
- Iterable<FakeOperation> updates =
- Historical.generateFakeOperations(searchEntry);
- for (FakeOperation op : updates)
- {
- replayOperations.add(op);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void handleInternalSearchReference(
- InternalSearchOperation searchOperation,
- SearchResultReference searchReference)
- {
- // TODO to be implemented
- }
-
- /**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
@@ -1694,31 +1539,41 @@
}
/**
- * Change some config parameters.
+ * Change some configuration parameters.
*
- * @param replicationServers The new list of replication servers.
- * @param maxReceiveQueue The max size of receive queue.
- * @param maxReceiveDelay The max receive delay.
- * @param maxSendQueue The max send queue.
- * @param maxSendDelay The max Send Delay.
+ * @param replicationServers The new list of replication servers.
* @param window The max window size.
- * @param heartbeatInterval The heartbeat interval.
+ * @param heartbeatInterval The heartBeat interval.
+ *
+ * @return A boolean indicating if the changes
+ * requires to restart the service.
*/
- public void changeConfig(Collection<String> replicationServers,
- int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window, long heartbeatInterval)
+ public boolean changeConfig(
+ Collection<String> replicationServers, int window, long heartbeatInterval)
{
- this.servers = replicationServers;
- this.maxRcvWindow = window;
- this.heartbeatInterval = heartbeatInterval;
- this.maxReceiveDelay = maxReceiveDelay;
- this.maxReceiveQueue = maxReceiveQueue;
- this.maxSendDelay = maxSendDelay;
- this.maxSendQueue = maxSendQueue;
+ // These parameters needs to be renegociated with the ReplicationServer
+ // so if they have changed, that requires restarting the session with
+ // the ReplicationServer.
+ Boolean needToRestartSession = false;
- // For info, a new session with the replicationServer
- // will be recreated in the replication domain
- // to take into account the new configuration.
+ // A new session is necessary only when information regarding
+ // the connection is modified
+ if ((servers == null) ||
+ (!(replicationServers.size() == servers.size()
+ && replicationServers.containsAll(servers))) ||
+ window != this.maxRcvWindow ||
+ heartbeatInterval != this.heartbeatInterval)
+ {
+ needToRestartSession = true;
+ }
+
+ this.servers = replicationServers;
+ this.rcvWindow = window;
+ this.maxRcvWindow = window;
+ this.halfRcvWindow = window / 2;
+ this.heartbeatInterval = heartbeatInterval;
+
+ return needToRestartSession;
}
/**
@@ -1900,14 +1755,13 @@
{
try
{
-
ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
newStatus);
session.publish(csMsg);
} catch (IOException ex)
{
Message message = ERR_EXCEPTION_SENDING_CS.get(
- baseDn.toNormalizedString(),
+ baseDn,
Short.toString(serverId),
ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
logError(message);
@@ -1922,4 +1776,48 @@
{
this.groupId = groupId;
}
+
+ /**
+ * Gets the info for DSs in the topology (except us).
+ * @return The info for DSs in the topology (except us)
+ */
+ public List<DSInfo> getDsList()
+ {
+ return dsList;
+ }
+
+ /**
+ * Gets the info for RSs in the topology (except the one we are connected
+ * to).
+ * @return The info for RSs in the topology (except the one we are connected
+ * to)
+ */
+ public List<RSInfo> getRsList()
+ {
+ return rsList;
+ }
+
+ /**
+ * Processes an incoming TopologyMsg.
+ * Updates the structures for the local view of the topology.
+ *
+ * @param topoMsg The topology information received from RS.
+ */
+ public void receiveTopo(TopologyMsg topoMsg)
+ {
+
+ if (debugEnabled())
+ TRACER.debugInfo("Replication domain " + baseDn
+ + " received topology info update:\n" + topoMsg);
+
+ // Store new lists
+ synchronized(getDsList())
+ {
+ synchronized(getRsList())
+ {
+ dsList = topoMsg.getDsList();
+ rsList = topoMsg.getRsList();
+ }
+ }
+ }
}
--
Gitblit v1.10.0