From 10b76e3a58346e0b6e2e07e58617b75dc383c249 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 21 Aug 2013 08:33:59 +0000
Subject: [PATCH] Increased encapsulation in ReplicationServerDomain by: - Making private the start*() methods - Moved some code to here from some other classes - MAking private the buildAndSendTopoInfoTo*() methods.
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 465 ++++++++++++++++++++++++++++++++--------------------------
1 files changed, 257 insertions(+), 208 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 5b8859a..3a10c23 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -367,13 +367,13 @@
// Push the message to the replication servers
if (sourceHandler.isDataServer())
{
- for (ReplicationServerHandler handler : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
/**
* Ignore updates to RS with bad gen id
* (no system managed status for a RS)
*/
- if (isDifferentGenerationId(handler.getGenerationId()))
+ if (isDifferentGenerationId(rsHandler.getGenerationId()))
{
if (debugEnabled())
{
@@ -383,24 +383,24 @@
+ localReplicationServer.getServerId() + " for dn " + baseDn
+ ", update " + update.getChangeNumber()
+ " will not be sent to replication server "
- + handler.getServerId() + " with generation id "
- + handler.getGenerationId() + " different from local "
+ + rsHandler.getServerId() + " with generation id "
+ + rsHandler.getGenerationId() + " different from local "
+ "generation id " + generationId);
}
continue;
}
- notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
+ notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate,
assuredMessage, expectedServers);
}
}
// Push the message to the LDAP servers
- for (DataServerHandler handler : connectedDSs.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
// Don't forward the change to the server that just sent it
- if (handler == sourceHandler)
+ if (dsHandler == sourceHandler)
{
continue;
}
@@ -416,7 +416,7 @@
* stop sending updates is interesting anyway. Not taking the RSD lock
* allows to have better performances in normal mode (most of the time).
*/
- ServerStatus dsStatus = handler.getStatus();
+ ServerStatus dsStatus = dsHandler.getStatus();
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
|| dsStatus == ServerStatus.FULL_UPDATE_STATUS)
{
@@ -427,8 +427,8 @@
TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
+ update.getChangeNumber()
+ " will not be sent to directory server "
- + handler.getServerId() + " with generation id "
- + handler.getGenerationId() + " different from local "
+ + dsHandler.getServerId() + " with generation id "
+ + dsHandler.getGenerationId() + " different from local "
+ "generation id " + generationId);
}
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
@@ -436,20 +436,20 @@
TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
+ " for dn " + baseDn + ", update " + update.getChangeNumber()
+ " will not be sent to directory server "
- + handler.getServerId() + " as it is in full update");
+ + dsHandler.getServerId() + " as it is in full update");
}
}
continue;
}
- notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
+ notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate,
assuredMessage, expectedServers);
}
// Push the message to the other subscribing handlers
- for (MessageHandler handler : otherHandlers) {
- handler.add(update);
+ for (MessageHandler mHandler : otherHandlers) {
+ mHandler.add(update);
}
}
@@ -491,7 +491,7 @@
return true;
}
- private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
+ private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
boolean assuredMessage, List<Integer> expectedServers)
throws UnsupportedEncodingException
@@ -500,9 +500,9 @@
{
// Assured mode: post an assured or not assured matching update
// message according to what has been computed for the destination server
- if (expectedServers.contains(handler.getServerId()))
+ if (expectedServers.contains(sHandler.getServerId()))
{
- handler.add(update);
+ sHandler.add(update);
}
else
{
@@ -510,12 +510,12 @@
{
notAssuredUpdate = new NotAssuredUpdateMsg(update);
}
- handler.add(notAssuredUpdate);
+ sHandler.add(notAssuredUpdate);
}
}
else
{
- handler.add(update);
+ sHandler.add(update);
}
return notAssuredUpdate;
}
@@ -578,23 +578,23 @@
}
// Look for DS eligible for assured
- for (DataServerHandler handler : connectedDSs.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
// Don't forward the change to the server that just sent it
- if (handler == sourceHandler)
+ if (dsHandler == sourceHandler)
{
continue;
}
- if (handler.getGroupId() == groupId)
+ if (dsHandler.getGroupId() == groupId)
// No ack expected from a DS with different group id
{
- ServerStatus serverStatus = handler.getStatus();
+ ServerStatus serverStatus = dsHandler.getStatus();
if (serverStatus == ServerStatus.NORMAL_STATUS)
{
- expectedServers.add(handler.getServerId());
+ expectedServers.add(dsHandler.getServerId());
} else if (serverStatus == ServerStatus.DEGRADED_STATUS) {
// No ack expected from a DS with wrong status
- wrongStatusServers.add(handler.getServerId());
+ wrongStatusServers.add(dsHandler.getServerId());
}
/*
* else
@@ -737,15 +737,15 @@
private void collectRSsEligibleForAssuredReplication(byte groupId,
List<Integer> expectedServers)
{
- for (ReplicationServerHandler handler : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
- if (handler.getGroupId() == groupId
+ if (rsHandler.getGroupId() == groupId
// No ack expected from a RS with different group id
- && isSameGenerationId(handler.getGenerationId())
+ && isSameGenerationId(rsHandler.getGenerationId())
// No ack expected from a RS with bad gen id
)
{
- expectedServers.add(handler.getServerId());
+ expectedServers.add(rsHandler.getServerId());
}
}
}
@@ -951,11 +951,11 @@
*/
public void stopReplicationServers(Collection<String> replServerURLs)
{
- for (ReplicationServerHandler handler : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
- if (replServerURLs.contains(handler.getServerAddressURL()))
+ if (replServerURLs.contains(rsHandler.getServerAddressURL()))
{
- stopServer(handler, false);
+ stopServer(rsHandler, false);
}
}
}
@@ -968,35 +968,33 @@
*/
public void stopAllServers(boolean shutdown)
{
- // Close session with other replication servers
- for (ReplicationServerHandler serverHandler : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
- stopServer(serverHandler, shutdown);
+ stopServer(rsHandler, shutdown);
}
- // Close session with other LDAP servers
- for (DataServerHandler serverHandler : connectedDSs.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
- stopServer(serverHandler, shutdown);
+ stopServer(dsHandler, shutdown);
}
}
/**
* Checks whether it is already connected to a DS with same id.
*
- * @param handler
+ * @param dsHandler
* the DS we want to check
* @return true if this DS is already connected to the current server
*/
- public boolean isAlreadyConnectedToDS(DataServerHandler handler)
+ public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler)
{
- if (connectedDSs.containsKey(handler.getServerId()))
+ if (connectedDSs.containsKey(dsHandler.getServerId()))
{
// looks like two connected LDAP servers have the same serverId
Message message = ERR_DUPLICATE_SERVER_ID.get(
localReplicationServer.getMonitorInstanceName(),
- connectedDSs.get(handler.getServerId()).toString(),
- handler.toString(), handler.getServerId());
+ connectedDSs.get(dsHandler.getServerId()).toString(),
+ dsHandler.toString(), dsHandler.getServerId());
logError(message);
return true;
}
@@ -1006,11 +1004,11 @@
/**
* Stop operations with a given server.
*
- * @param handler the server for which we want to stop operations.
+ * @param sHandler the server for which we want to stop operations.
* @param shutdown A boolean indicating if the stop is due to a
* shutdown condition.
*/
- public void stopServer(ServerHandler handler, boolean shutdown)
+ public void stopServer(ServerHandler sHandler, boolean shutdown)
{
// TODO JNR merge with stopServer(MessageHandler)
if (debugEnabled())
@@ -1018,7 +1016,7 @@
TRACER.debugInfo("In "
+ this.localReplicationServer.getMonitorInstanceName()
+ " domain=" + this + " stopServer() on the server handler "
- + handler.getMonitorInstanceName());
+ + sHandler.getMonitorInstanceName());
}
/*
* We must prevent deadlock on replication server domain lock, when for
@@ -1027,7 +1025,7 @@
* the handler. So use a thread safe flag to know if the job must be done
* or not (is already being processed or not).
*/
- if (!handler.engageShutdown())
+ if (!sHandler.engageShutdown())
// Only do this once (prevent other thread to enter here again)
{
if (!shutdown)
@@ -1056,17 +1054,17 @@
{
TRACER.debugInfo("In "
+ localReplicationServer.getMonitorInstanceName()
- + " remote server " + handler.getMonitorInstanceName()
+ + " remote server " + sHandler.getMonitorInstanceName()
+ " is the last RS/DS to be stopped:"
+ " stopping monitoring publisher");
}
stopMonitoringPublisher();
}
- if (connectedRSs.containsKey(handler.getServerId()))
+ if (connectedRSs.containsKey(sHandler.getServerId()))
{
- unregisterServerHandler(handler, shutdown, false);
- } else if (connectedDSs.containsKey(handler.getServerId()))
+ unregisterServerHandler(sHandler, shutdown, false);
+ } else if (connectedDSs.containsKey(sHandler.getServerId()))
{
// If this is the last DS for the domain,
// shutdown the status analyzer
@@ -1076,15 +1074,15 @@
{
TRACER.debugInfo("In "
+ localReplicationServer.getMonitorInstanceName()
- + " remote server " + handler.getMonitorInstanceName()
+ + " remote server " + sHandler.getMonitorInstanceName()
+ " is the last DS to be stopped: stopping status analyzer");
}
stopStatusAnalyzer();
}
- unregisterServerHandler(handler, shutdown, true);
- } else if (otherHandlers.contains(handler))
+ unregisterServerHandler(sHandler, shutdown, true);
+ } else if (otherHandlers.contains(sHandler))
{
- unregisterOtherHandler(handler);
+ unregisterOtherHandler(sHandler);
}
}
catch(Exception e)
@@ -1102,17 +1100,17 @@
}
}
- private void unregisterOtherHandler(MessageHandler handler)
+ private void unregisterOtherHandler(MessageHandler mHandler)
{
- unRegisterHandler(handler);
- handler.shutdown();
+ unRegisterHandler(mHandler);
+ mHandler.shutdown();
}
- private void unregisterServerHandler(ServerHandler handler, boolean shutdown,
+ private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
boolean isDirectoryServer)
{
- unregisterServerHandler(handler);
- handler.shutdown();
+ unregisterServerHandler(sHandler);
+ sHandler.shutdown();
// Check if generation id has to be reset
mayResetGenerationId();
@@ -1122,19 +1120,19 @@
{
// Update the remote replication servers with our list
// of connected LDAP servers
- buildAndSendTopoInfoToRSs();
+ sendTopoInfoToAllRSs();
}
// Warn our DSs that a RS or DS has quit (does not use this
// handler as already removed from list)
- buildAndSendTopoInfoToDSs(null);
+ sendTopoInfoToAllDSsExcept(null);
}
}
/**
* Stop the handler.
- * @param handler The handler to stop.
+ * @param mHandler The handler to stop.
*/
- public void stopServer(MessageHandler handler)
+ public void stopServer(MessageHandler mHandler)
{
// TODO JNR merge with stopServer(ServerHandler, boolean)
if (debugEnabled())
@@ -1142,7 +1140,7 @@
TRACER.debugInfo("In "
+ this.localReplicationServer.getMonitorInstanceName()
+ " domain=" + this + " stopServer() on the message handler "
- + handler.getMonitorInstanceName());
+ + mHandler.getMonitorInstanceName());
}
/*
* We must prevent deadlock on replication server domain lock, when for
@@ -1151,7 +1149,7 @@
* the handler. So use a thread safe flag to know if the job must be done
* or not (is already being processed or not).
*/
- if (!handler.engageShutdown())
+ if (!mHandler.engageShutdown())
// Only do this once (prevent other thread to enter here again)
{
try
@@ -1170,9 +1168,9 @@
try
{
- if (otherHandlers.contains(handler))
+ if (otherHandlers.contains(mHandler))
{
- unregisterOtherHandler(handler);
+ unregisterOtherHandler(mHandler);
}
}
catch(Exception e)
@@ -1190,17 +1188,17 @@
/**
* Unregister this handler from the list of handlers registered to this
* domain.
- * @param handler the provided handler to unregister.
+ * @param sHandler the provided handler to unregister.
*/
- private void unregisterServerHandler(ServerHandler handler)
+ private void unregisterServerHandler(ServerHandler sHandler)
{
- if (handler.isReplicationServer())
+ if (sHandler.isReplicationServer())
{
- connectedRSs.remove(handler.getServerId());
+ connectedRSs.remove(sHandler.getServerId());
}
else
{
- connectedDSs.remove(handler.getServerId());
+ connectedDSs.remove(sHandler.getServerId());
}
}
@@ -1215,61 +1213,61 @@
*/
private void mayResetGenerationId()
{
+ String prefix =
+ "In RS " + this.localReplicationServer.getMonitorInstanceName()
+ + " for " + baseDn + " ";
+
if (debugEnabled())
{
- TRACER.debugInfo("In RS "
- + this.localReplicationServer.getMonitorInstanceName()
- + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus="
+ TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus="
+ generationIdSavedStatus);
}
// If there is no more any LDAP server connected to this domain in the
// topology and the generationId has never been saved, then we can reset
// it and the next LDAP server to connect will become the new reference.
- boolean lDAPServersConnectedInTheTopology = false;
+ boolean ldapServersConnectedInTheTopology = false;
if (connectedDSs.isEmpty())
{
- for (ReplicationServerHandler rsh : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
- if (generationId != rsh.getGenerationId())
+ if (generationId != rsHandler.getGenerationId())
{
if (debugEnabled())
{
- TRACER.debugInfo("In RS "
- + this.localReplicationServer.getMonitorInstanceName() + " for "
- + baseDn + " " + " mayResetGenerationId skip RS"
- + rsh.getMonitorInstanceName() + " that has different genId");
+ TRACER.debugInfo(prefix + "mayResetGenerationId skip RS "
+ + rsHandler.getMonitorInstanceName()
+ + " that has different genId");
}
- } else if (rsh.hasRemoteLDAPServers())
+ }
+ else if (rsHandler.hasRemoteLDAPServers())
{
- lDAPServersConnectedInTheTopology = true;
+ ldapServersConnectedInTheTopology = true;
- if (debugEnabled())
- {
- TRACER.debugInfo("In RS "
- + this.localReplicationServer.getMonitorInstanceName()
- + " for "+ baseDn + " mayResetGenerationId RS"
- + rsh.getMonitorInstanceName()
- + " has servers connected to it"
- + " - will not reset generationId");
- }
- break;
+ if (debugEnabled())
+ {
+ TRACER.debugInfo(prefix + "mayResetGenerationId RS "
+ + rsHandler.getMonitorInstanceName()
+ + " has ldap servers connected to it"
+ + " - will not reset generationId");
+ }
+ break;
}
}
- } else
+ }
+ else
{
- lDAPServersConnectedInTheTopology = true;
+ ldapServersConnectedInTheTopology = true;
if (debugEnabled())
{
- TRACER.debugInfo("In RS "
- + this.localReplicationServer.getMonitorInstanceName() + " for "
- + baseDn + " "
- + " has servers connected to it - will not reset generationId");
+ TRACER.debugInfo(prefix + "has ldap servers connected to it"
+ + " - will not reset generationId");
}
}
- if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus
+ if (!ldapServersConnectedInTheTopology
+ && !this.generationIdSavedStatus
&& generationId != -1)
{
changeGenerationId(-1, false);
@@ -1279,23 +1277,24 @@
/**
* Checks whether a remote RS is already connected to this hosting RS.
*
- * @param handler
+ * @param rsHandler
* The handler for the remote RS.
* @return flag specifying whether the remote RS is already connected.
* @throws DirectoryException
* when a problem occurs.
*/
- public boolean isAlreadyConnectedToRS(ReplicationServerHandler handler)
+ public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler)
throws DirectoryException
{
- ReplicationServerHandler oldHandler =
- connectedRSs.get(handler.getServerId());
- if (oldHandler == null)
+ ReplicationServerHandler oldRsHandler =
+ connectedRSs.get(rsHandler.getServerId());
+ if (oldRsHandler == null)
{
return false;
}
- if (oldHandler.getServerAddressURL().equals(handler.getServerAddressURL()))
+ if (oldRsHandler.getServerAddressURL().equals(
+ rsHandler.getServerAddressURL()))
{
// this is the same server, this means that our ServerStart messages
// have been sent at about the same time and 2 connections
@@ -1308,8 +1307,8 @@
// log an error message and drop this connection.
Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
localReplicationServer.getMonitorInstanceName(),
- oldHandler.getServerAddressURL(), handler.getServerAddressURL(),
- handler.getServerId());
+ oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(),
+ rsHandler.getServerId());
throw new DirectoryException(ResultCode.OTHER, message);
}
@@ -1318,11 +1317,11 @@
* This call is blocking when no update is available or when dependencies
* do not allow to send the next available change
*
- * @param handler The server handler for the target directory server.
+ * @param sHandler The server handler for the target directory server.
*
* @return the update that must be forwarded
*/
- public UpdateMsg take(ServerHandler handler)
+ public UpdateMsg take(ServerHandler sHandler)
{
/*
* Get the balanced tree that we use to sort the changes to be
@@ -1332,7 +1331,7 @@
* So this methods simply need to check that dependencies are OK
* and update this replicaId RUV
*/
- return handler.take();
+ return sHandler.take();
}
/**
@@ -1360,8 +1359,8 @@
public ReplicationIterator getChangelogIterator(int serverId,
ChangeNumber startAfterCN)
{
- DbHandler handler = sourceDbHandlers.get(serverId);
- if (handler == null)
+ DbHandler dbHandler = sourceDbHandlers.get(serverId);
+ if (dbHandler == null)
{
return null;
}
@@ -1369,7 +1368,7 @@
ReplicationIterator it;
try
{
- it = handler.generateIterator(startAfterCN);
+ it = dbHandler.generateIterator(startAfterCN);
}
catch (Exception e)
{
@@ -1395,10 +1394,10 @@
*/
public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
{
- DbHandler handler = sourceDbHandlers.get(serverId);
- if (handler != null)
+ DbHandler dbHandler = sourceDbHandlers.get(serverId);
+ if (dbHandler != null)
{
- return handler.getCount(from, to);
+ return dbHandler.getCount(from, to);
}
return 0;
}
@@ -1781,17 +1780,17 @@
// from the states stored in the serverHandler.
// - the server state
// - the older missing change
- for (DataServerHandler lsh : this.connectedDSs.values())
+ for (DataServerHandler dsHandler : this.connectedDSs.values())
{
- monitorMsg.setServerState(lsh.getServerId(),
- lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
+ monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
+ .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
}
// Same for the connected RS
- for (ReplicationServerHandler rsh : this.connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
{
- monitorMsg.setServerState(rsh.getServerId(),
- rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
+ monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
+ .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
}
// Populate the RS state in the msg from the DbState
@@ -1857,27 +1856,29 @@
}
/**
- * Send a TopologyMsg to all the connected directory servers in order to
- * let them know the topology (every known DSs and RSs).
- * @param notThisOne If not null, the topology message will not be sent to
- * this passed server.
+ * Send a TopologyMsg to all the connected directory servers in order to let
+ * them know the topology (every known DSs and RSs).
+ *
+ * @param notThisOne
+ * If not null, the topology message will not be sent to this DS.
*/
- public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
+ private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
{
- for (DataServerHandler handler : connectedDSs.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
- if (notThisOne == null || handler != notThisOne)
- // All except passed one
+ if (dsHandler != notThisOne)
+ // All except the supplied one
{
for (int i=1; i<=2; i++)
{
- if (!handler.shuttingDown()
- && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
+ if (!dsHandler.shuttingDown()
+ && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
{
- TopologyMsg topoMsg = createTopologyMsgForDS(handler.getServerId());
+ TopologyMsg topoMsg =
+ createTopologyMsgForDS(dsHandler.getServerId());
try
{
- handler.sendTopoInfo(topoMsg);
+ dsHandler.sendTopoInfo(topoMsg);
break;
}
catch (IOException e)
@@ -1886,7 +1887,7 @@
{
Message message =
ERR_EXCEPTION_SENDING_TOPO_INFO
- .get(baseDn, "directory", Integer.toString(handler
+ .get(baseDn, "directory", Integer.toString(dsHandler
.getServerId()), e.getMessage());
logError(message);
}
@@ -1902,28 +1903,28 @@
* Send a TopologyMsg to all the connected replication servers
* in order to let them know our connected LDAP servers.
*/
- public void buildAndSendTopoInfoToRSs()
+ private void sendTopoInfoToAllRSs()
{
TopologyMsg topoMsg = createTopologyMsgForRS();
- for (ReplicationServerHandler handler : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
for (int i=1; i<=2; i++)
{
- if (!handler.shuttingDown()
- && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
+ if (!rsHandler.shuttingDown()
+ && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
{
try
{
- handler.sendTopoInfo(topoMsg);
+ rsHandler.sendTopoInfo(topoMsg);
break;
}
catch (IOException e)
{
if (i == 2)
{
- Message message =
- ERR_EXCEPTION_SENDING_TOPO_INFO.get(baseDn, "replication",
- Integer.toString(handler.getServerId()), e.getMessage());
+ Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+ baseDn, "replication",
+ Integer.toString(rsHandler.getServerId()), e.getMessage());
logError(message);
}
}
@@ -1947,16 +1948,15 @@
public TopologyMsg createTopologyMsgForRS()
{
List<DSInfo> dsInfos = new ArrayList<DSInfo>();
-
- // Go through every DSs
- for (DataServerHandler serverHandler : connectedDSs.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
- dsInfos.add(serverHandler.toDSInfo());
+ dsInfos.add(dsHandler.toDSInfo());
}
// Create info for the local RS
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
rsInfos.add(toRSInfo(localReplicationServer, generationId));
+
return new TopologyMsg(dsInfos, rsInfos);
}
@@ -1974,13 +1974,13 @@
{
// Go through every DSs (except recipient of msg)
List<DSInfo> dsInfos = new ArrayList<DSInfo>();
- for (DataServerHandler serverHandler : connectedDSs.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
- if (serverHandler.getServerId() == destDsId)
+ if (dsHandler.getServerId() == destDsId)
{
continue;
}
- dsInfos.add(serverHandler.toDSInfo());
+ dsInfos.add(dsHandler.toDSInfo());
}
@@ -1990,11 +1990,11 @@
// Go through every peer RSs (and get their connected DSs), also add info
// for RSs
- for (ReplicationServerHandler serverHandler : connectedRSs.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
- rsInfos.add(serverHandler.toRSInfo());
+ rsInfos.add(rsHandler.toRSInfo());
- serverHandler.addDSInfos(dsInfos);
+ rsHandler.addDSInfos(dsInfos);
}
return new TopologyMsg(dsInfos, rsInfos);
@@ -2161,11 +2161,9 @@
// (consecutive to reset gen id message), we prefer advertising once for
// all after changes (less packet sent), here at the end of the reset msg
// treatment.
- buildAndSendTopoInfoToDSs(null);
- buildAndSendTopoInfoToRSs();
+ sendTopoInfoToAll();
- Message message = NOTE_RESET_GENERATION_ID.get(baseDn, newGenId);
- logError(message);
+ logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId));
}
catch(Exception e)
{
@@ -2219,9 +2217,7 @@
return;
}
- // Update every peers (RS/DS) with topology changes
- buildAndSendTopoInfoToDSs(senderHandler);
- buildAndSendTopoInfoToRSs();
+ sendTopoInfoToAllExcept(senderHandler);
Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
senderHandler.getServerId(), baseDn, newStatus.toString());
@@ -2241,12 +2237,12 @@
/**
* Change the status of a directory server according to the event generated
* from the status analyzer.
- * @param serverHandler The handler of the directory server to update
+ * @param dsHandler The handler of the directory server to update
* @param event The event to be used for new status computation
* @return True if we have been interrupted (must stop), false otherwise
*/
- public boolean changeStatusFromStatusAnalyzer(
- DataServerHandler serverHandler, StatusMachineEvent event)
+ public boolean changeStatus(DataServerHandler dsHandler,
+ StatusMachineEvent event)
{
try
{
@@ -2272,7 +2268,7 @@
TRACER.debugInfo("Status analyzer for domain " + baseDn
+ " has been interrupted when"
+ " trying to acquire domain lock for changing the status of DS "
- + serverHandler.getServerId());
+ + dsHandler.getServerId());
}
return true;
}
@@ -2280,16 +2276,16 @@
try
{
ServerStatus newStatus = ServerStatus.INVALID_STATUS;
- ServerStatus oldStatus = serverHandler.getStatus();
+ ServerStatus oldStatus = dsHandler.getStatus();
try
{
- newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
+ newStatus = dsHandler.changeStatus(event);
}
catch (IOException e)
{
logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
.get(baseDn,
- Integer.toString(serverHandler.getServerId()),
+ Integer.toString(dsHandler.getServerId()),
e.getMessage()));
}
@@ -2300,9 +2296,7 @@
return false;
}
- // Update every peers (RS/DS) with topology changes
- buildAndSendTopoInfoToDSs(serverHandler);
- buildAndSendTopoInfoToRSs();
+ sendTopoInfoToAllExcept(dsHandler);
}
catch (Exception e)
{
@@ -2318,6 +2312,26 @@
}
/**
+ * Update every peers (RS/DS) with topology changes.
+ */
+ public void sendTopoInfoToAll()
+ {
+ sendTopoInfoToAllExcept(null);
+ }
+
+ /**
+ * Update every peers (RS/DS) with topology changes but one DS.
+ *
+ * @param dsHandler
+ * if not null, the topology message will not be sent to this DS
+ */
+ private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
+ {
+ sendTopoInfoToAllDSsExcept(dsHandler);
+ sendTopoInfoToAllRSs();
+ }
+
+ /**
* Clears the Db associated with that domain.
*/
public void clearDbs()
@@ -2372,11 +2386,11 @@
+ " given local generation Id=" + this.generationId);
}
- ServerHandler handler = connectedRSs.get(serverId);
- if (handler == null)
+ ServerHandler sHandler = connectedRSs.get(serverId);
+ if (sHandler == null)
{
- handler = connectedDSs.get(serverId);
- if (handler == null)
+ sHandler = connectedDSs.get(serverId);
+ if (sHandler == null)
{
return false;
}
@@ -2386,30 +2400,29 @@
{
TRACER.debugInfo("In "
+ this.localReplicationServer.getMonitorInstanceName()
- + " baseDN=" + baseDn + " Compute degradation of serverId="
- + serverId + " LS server generation Id=" + handler.getGenerationId());
+ + " baseDN=" + baseDn + " Compute degradation of serverId=" + serverId
+ + " LS server generation Id=" + sHandler.getGenerationId());
}
- return handler.getGenerationId() != this.generationId;
+ return sHandler.getGenerationId() != this.generationId;
}
/**
* Process topology information received from a peer RS.
* @param topoMsg The just received topo message from remote RS
- * @param handler The handler that received the message.
+ * @param rsHandler The handler that received the message.
* @param allowResetGenId True for allowing to reset the generation id (
* when called after initial handshake)
* @throws IOException If an error occurred.
* @throws DirectoryException If an error occurred.
*/
public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
- ReplicationServerHandler handler,
- boolean allowResetGenId)
- throws IOException, DirectoryException
+ ReplicationServerHandler rsHandler, boolean allowResetGenId)
+ throws IOException, DirectoryException
{
if (debugEnabled())
{
TRACER.debugInfo("In RS " + getLocalRSServerId()
- + " Receiving TopologyMsg from " + handler.getServerId()
+ + " Receiving TopologyMsg from " + rsHandler.getServerId()
+ " for baseDn " + baseDn + ":\n" + topoMsg);
}
@@ -2430,37 +2443,38 @@
try
{
// Store DS connected to remote RS & update information about the peer RS
- handler.processTopoInfoFromRS(topoMsg);
+ rsHandler.processTopoInfoFromRS(topoMsg);
// Handle generation id
if (allowResetGenId)
{
- // Check if generation id has to be reseted
+ // Check if generation id has to be reset
mayResetGenerationId();
if (generationId < 0)
{
- generationId = handler.getGenerationId();
+ generationId = rsHandler.getGenerationId();
}
}
- if (isDifferentGenerationId(handler.getGenerationId()))
+ if (isDifferentGenerationId(rsHandler.getGenerationId()))
{
Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
- handler.getServerId(), handler.session.getReadableRemoteAddress(),
- handler.getGenerationId(),
+ rsHandler.getServerId(),
+ rsHandler.session.getReadableRemoteAddress(),
+ rsHandler.getGenerationId(),
baseDn, getLocalRSServerId(), generationId);
logError(message);
- ErrorMsg errorMsg =
- new ErrorMsg(getLocalRSServerId(), handler.getServerId(), message);
- handler.send(errorMsg);
+ ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
+ rsHandler.getServerId(), message);
+ rsHandler.send(errorMsg);
}
/*
* Sends the currently known topology information to every connected
* DS we have.
*/
- buildAndSendTopoInfoToDSs(null);
+ sendTopoInfoToAllDSsExcept(null);
}
catch(Exception e)
{
@@ -2765,9 +2779,9 @@
*/
public void setPurgeDelay(long delay)
{
- for (DbHandler handler : sourceDbHandlers.values())
+ for (DbHandler dbHandler : sourceDbHandlers.values())
{
- handler.setPurgeDelay(delay);
+ dbHandler.setPurgeDelay(delay);
}
}
@@ -2857,7 +2871,7 @@
/**
* Starts the status analyzer for the domain if not already started.
*/
- public void startStatusAnalyzer()
+ private void startStatusAnalyzer()
{
int degradedStatusThreshold =
localReplicationServer.getDegradedStatusThreshold();
@@ -2888,7 +2902,7 @@
/**
* Starts the monitoring publisher for the domain if not already started.
*/
- public void startMonitoringPublisher()
+ private void startMonitoringPublisher()
{
long period = localReplicationServer.getMonitoringPublisherPeriod();
if (period > 0) // 0 means no monitoring publisher
@@ -2967,21 +2981,21 @@
/**
* Register in the domain an handler that subscribes to changes.
- * @param handler the provided subscribing handler.
+ * @param mHandler the provided subscribing handler.
*/
- public void registerHandler(MessageHandler handler)
+ public void registerHandler(MessageHandler mHandler)
{
- this.otherHandlers.add(handler);
+ this.otherHandlers.add(mHandler);
}
/**
* Unregister from the domain an handler.
- * @param handler the provided unsubscribing handler.
+ * @param mHandler the provided unsubscribing handler.
* @return Whether this handler has been unregistered with success.
*/
- public boolean unRegisterHandler(MessageHandler handler)
+ public boolean unRegisterHandler(MessageHandler mHandler)
{
- return this.otherHandlers.remove(handler);
+ return this.otherHandlers.remove(mHandler);
}
/**
@@ -3357,7 +3371,7 @@
{
saThread.setDegradedStatusThreshold(degradedStatusThreshold);
}
- else if (getConnectedDSs().size() > 0)
+ else if (connectedDSs.size() > 0)
{
// Requested to start analyzers with provided threshold value
startStatusAnalyzer();
@@ -3384,10 +3398,45 @@
{
mpThread.setPeriod(period);
}
- else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
+ else if (connectedDSs.size() > 0 || connectedRSs.size() > 0)
{
// Requested to start monitoring publishers with provided period value
startMonitoringPublisher();
}
}
+
+ /**
+ * Registers a DS handler into this domain and notifies the domain about the
+ * new DS.
+ *
+ * @param dsHandler
+ * The Directory Server Handler to register
+ */
+ public void register(DataServerHandler dsHandler)
+ {
+ startStatusAnalyzer();
+ startMonitoringPublisher();
+
+ // connected with new DS: store handler.
+ connectedDSs.put(dsHandler.getServerId(), dsHandler);
+
+ // Tell peer RSs and DSs a new DS just connected to us
+ // No need to re-send TopologyMsg to this just new DS
+ sendTopoInfoToAllExcept(dsHandler);
+ }
+
+ /**
+ * Registers the RS handler into this domain and notifies the domain.
+ *
+ * @param rsHandler
+ * The Replication Server Handler to register
+ */
+ public void register(ReplicationServerHandler rsHandler)
+ {
+ startMonitoringPublisher();
+
+ // connected with new RS (either outgoing or incoming
+ // connection): store handler.
+ connectedRSs.put(rsHandler.getServerId(), rsHandler);
+ }
}
--
Gitblit v1.10.0