From fa73aa0575f97205255d66ffed45d64bc04434ed Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 08 Aug 2013 15:16:13 +0000
Subject: [PATCH] ReplicationServerDomain.java: Changed getCount() return type from int to long (this change goes along with r9386).

Extracted methods addUpdate(), isDifferentGenerationId(), isSameGenerationId(),
collectRSsEligibleForAssuredReplication() and toRSInfo().
In many places, renamed sid to serverId.
Various cleanups:
- converted comments to javadoc
- added curly braces around if bodies
- collapsed if statements
- used interfaces instead of concrete classes
- removed useless parentheses
- removed useless 'this' qualifier in non-static method calls
- put code on one line where it fits
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 11
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 766 +++++++++++++++++++++++++++++-----------------------------
2 files changed, 391 insertions(+), 386 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 6813d54..626c08b 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -140,9 +140,8 @@
*
* @param update The update that must be added to the list of updates of
* this handler.
- * @param sourceHandler The source handler that generated the update.
*/
- public void add(UpdateMsg update, MessageHandler sourceHandler)
+ public void add(UpdateMsg update)
{
synchronized (msgQueue)
{
@@ -445,15 +444,13 @@
{
if (following)
{
- if (msgQueue.isEmpty())
- {
- result = null;
- } else
+ if (!msgQueue.isEmpty())
{
UpdateMsg msg = msgQueue.first();
result = msg.getChangeNumber();
}
- } else
+ }
+ else
{
if (lateQueue.isEmpty())
{
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index fa8d3d0..274384a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -33,6 +33,7 @@
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -73,55 +74,56 @@
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
private final String baseDn;
- // The Status analyzer that periodically verifies if the connected DSs are
- // late or not
+ /**
+ * The Status analyzer that periodically verifies whether the connected DSs
+ * are late.
+ */
private StatusAnalyzer statusAnalyzer = null;
- // The monitoring publisher that periodically sends monitoring messages to the
- // topology
+ /**
+ * The monitoring publisher that periodically sends monitoring messages to the
+ * topology.
+ */
private MonitoringPublisher monitoringPublisher = null;
- /*
- * The following map contains one balanced tree for each replica ID
- * to which we are currently publishing
- * the first update in the balanced tree is the next change that we
- * must push to this particular server
- *
- * We add new TreeSet in the HashMap when a new server register
- * to this replication server.
- *
+ /**
+ * The following map contains one balanced tree for each replica ID to which
+ * we are currently publishing; the first update in the balanced tree is the
+ * next change that we must push to this particular server.
+ * <p>
+ * We add a new TreeSet to the HashMap when a new server registers with this
+ * replication server.
*/
private final Map<Integer, DataServerHandler> directoryServers =
new ConcurrentHashMap<Integer, DataServerHandler>();
- /*
- * This map contains one ServerHandler for each replication servers
- * with which we are connected (so normally all the replication servers)
- * the first update in the balanced tree is the next change that we
- * must push to this particular server
- *
- * We add new TreeSet in the HashMap when a new replication server register
- * to this replication server.
+ /**
+ * This map contains one ServerHandler for each replication server with which
+ * we are connected (so normally all the replication servers); the first
+ * update in the balanced tree is the next change that we must push to this
+ * particular server.
+ * <p>
+ * We add a new TreeSet to the HashMap when a new replication server registers
+ * with this replication server.
*/
private final Map<Integer, ReplicationServerHandler> replicationServers =
new ConcurrentHashMap<Integer, ReplicationServerHandler>();
- private final ConcurrentLinkedQueue<MessageHandler> otherHandlers =
+ private final Queue<MessageHandler> otherHandlers =
new ConcurrentLinkedQueue<MessageHandler>();
- /*
- * This map contains the List of updates received from each
- * LDAP server
+ /**
+ * This map contains the List of updates received from each LDAP server.
*/
private final Map<Integer, DbHandler> sourceDbHandlers =
new ConcurrentHashMap<Integer, DbHandler>();
private ReplicationServer replicationServer;
- // GenerationId management
+ /** GenerationId management. */
private volatile long generationId = -1;
private boolean generationIdSavedStatus = false;
- // The tracer object for the debug logger.
+ /** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
// Monitor data management
@@ -130,42 +132,53 @@
*/
private volatile MonitorData monitorData = new MonitorData();
- // This lock guards against multiple concurrent monitor data recalculation.
+ /**
+ * This lock guards against multiple concurrent monitor data recalculation.
+ */
private final Object pendingMonitorLock = new Object();
- // Guarded by pendingMonitorLock.
+ /** Guarded by pendingMonitorLock. */
private long monitorDataLastBuildDate = 0;
- // The set of replication servers which are already known to be slow to send
- // monitor data.
- //
- // Guarded by pendingMonitorLock.
+ /**
+ * The set of replication servers which are already known to be slow to send
+ * monitor data.
+ * <p>
+ * Guarded by pendingMonitorLock.
+ */
private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
- // This lock serializes updates to the pending monitor data.
+ /** This lock serializes updates to the pending monitor data. */
private final Object pendingMonitorDataLock = new Object();
- // Monitor data which is currently being calculated.
- //
- // Guarded by pendingMonitorDataLock.
+ /**
+ * Monitor data which is currently being calculated. Guarded by
+ * pendingMonitorDataLock.
+ */
private MonitorData pendingMonitorData;
- // A set containing the IDs of servers from which we are currently expecting
- // monitor responses. When a response is received from a server we remove the
- // ID from this table, and count down the latch if the ID was in the table.
- //
- // Guarded by pendingMonitorDataLock.
+ /**
+ * A set containing the IDs of servers from which we are currently expecting
+ * monitor responses. When a response is received from a server we remove the
+ * ID from this table, and count down the latch if the ID was in the table.
+ * <p>
+ * Guarded by pendingMonitorDataLock.
+ */
private final Set<Integer> pendingMonitorDataServerIDs =
new HashSet<Integer>();
- // This latch is non-null and is used in order to count incoming responses as
- // they arrive. Since incoming response may arrive at any time, even when
- // there is no pending monitor request, access to the latch must be guarded.
- //
- // Guarded by pendingMonitorDataLock.
+ /**
+ * This latch is non-null and is used in order to count incoming responses as
+ * they arrive. Since incoming responses may arrive at any time, even when
+ * there is no pending monitor request, access to the latch must be guarded.
+ * <p>
+ * Guarded by pendingMonitorDataLock.
+ */
private CountDownLatch pendingMonitorDataLatch = null;
- // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+ /**
+ * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
+ */
private final long monitorDataLifeTime = 500;
@@ -173,20 +186,28 @@
/**
* The needed info for each received assured update message we are waiting
* acks for.
+ * <p>
* Key: a change number matching a received update message which requested
* assured mode usage (either safe read or safe data mode)
+ * <p>
* Value: The object holding every info needed about the already received acks
* as well as the acks to be received.
- * For more details, see ExpectedAcksInfo and its sub classes javadoc.
+ *
+ * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its
+ * subclasses' javadoc.
*/
private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
- // The timer used to run the timeout code (timer tasks) for the assured update
- // messages we are waiting acks for.
+ /**
+ * The timer used to run the timeout code (timer tasks) for the assured update
+ * messages we are waiting acks for.
+ */
private Timer assuredTimeoutTimer = null;
- // Counter used to purge the timer tasks references in assuredTimeoutTimer,
- // every n number of treated assured messages
+ /**
+ * Counter used to purge the timer tasks references in assuredTimeoutTimer,
+ * every n number of treated assured messages.
+ */
private int assuredTimeoutTimerPurgeCounter = 0;
private ServerState ctHeartbeatState = null;
@@ -345,10 +366,17 @@
// Purge timer every 100 treated messages
assuredTimeoutTimerPurgeCounter++;
if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
+ {
assuredTimeoutTimer.purge();
+ }
}
}
+ if (expectedServers == null)
+ {
+ expectedServers = Collections.emptyList();
+ }
+
/**
* The update message equivalent to the originally received update message,
* but with assured flag disabled. This message is the one that should be
@@ -374,42 +402,25 @@
* Ignore updates to RS with bad gen id
* (no system managed status for a RS)
*/
- if ( (generationId>0) && (generationId != handler.getGenerationId()) )
+ if (isDifferentGenerationId(handler.getGenerationId()))
{
if (debugEnabled())
- TRACER.debugInfo("In " + "Replication Server " +
- replicationServer.getReplicationPort() + " " +
- baseDn + " " + replicationServer.getServerId() +
- " for dn " + baseDn + ", update " + update.getChangeNumber() +
- " will not be sent to replication server " +
- handler.getServerId() + " with generation id " +
- handler.getGenerationId() + " different from local " +
- "generation id " + generationId);
+ {
+ TRACER.debugInfo("In Replication Server "
+ + replicationServer.getReplicationPort() + " " + baseDn + " "
+ + replicationServer.getServerId() + " for dn " + baseDn
+ + ", update " + update.getChangeNumber()
+ + " will not be sent to replication server "
+ + handler.getServerId() + " with generation id "
+ + handler.getGenerationId() + " different from local "
+ + "generation id " + generationId);
+ }
continue;
}
- if (assuredMessage)
- {
- // Assured mode: post an assured or not assured matching update
- // message according to what has been computed for the destination
- // server
- if ((expectedServers != null) && expectedServers.contains(handler.
- getServerId()))
- {
- handler.add(update, sourceHandler);
- } else
- {
- if (notAssuredUpdate == null)
- {
- notAssuredUpdate = new NotAssuredUpdateMsg(update);
- }
- handler.add(notAssuredUpdate, sourceHandler);
- }
- } else
- {
- handler.add(update, sourceHandler);
- }
+ notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
+ assuredMessage, expectedServers);
}
}
@@ -436,58 +447,71 @@
* allows to have better performances in normal mode (most of the time).
*/
ServerStatus dsStatus = handler.getStatus();
- if ( (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
- (dsStatus == ServerStatus.FULL_UPDATE_STATUS) )
+ if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
+ || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
{
if (debugEnabled())
{
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
- TRACER.debugInfo("In " + this +
- " for dn " + baseDn + ", update " + update.getChangeNumber() +
- " will not be sent to directory server " +
- handler.getServerId() + " with generation id " +
- handler.getGenerationId() + " different from local " +
- "generation id " + generationId);
+ {
+ TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
+ + update.getChangeNumber()
+ + " will not be sent to directory server "
+ + handler.getServerId() + " with generation id "
+ + handler.getGenerationId() + " different from local "
+ + "generation id " + generationId);
+ }
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
- TRACER.debugInfo("In RS " +
- replicationServer.getServerId() +
- " for dn " + baseDn + ", update " + update.getChangeNumber() +
- " will not be sent to directory server " + handler.getServerId() +
- " as it is in full update");
+ {
+ TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ + " for dn " + baseDn + ", update " + update.getChangeNumber()
+ + " will not be sent to directory server "
+ + handler.getServerId() + " as it is in full update");
+ }
}
continue;
}
- if (assuredMessage)
- {
- // Assured mode: post an assured or not assured matching update
- // message according to what has been computed for the destination
- // server
- if ((expectedServers != null) && expectedServers.contains(handler.
- getServerId()))
- {
- handler.add(update, sourceHandler);
- } else
- {
- if (notAssuredUpdate == null)
- {
- notAssuredUpdate = new NotAssuredUpdateMsg(update);
- }
- handler.add(notAssuredUpdate, sourceHandler);
- }
- } else
- {
- handler.add(update, sourceHandler);
- }
+ notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
+ assuredMessage, expectedServers);
}
// Push the message to the other subscribing handlers
for (MessageHandler handler : otherHandlers) {
- handler.add(update, sourceHandler);
+ handler.add(update);
}
}
+ private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
+ UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
+ boolean assuredMessage, List<Integer> expectedServers)
+ throws UnsupportedEncodingException
+ {
+ if (assuredMessage)
+ {
+ // Assured mode: post an assured or not assured matching update
+ // message according to what has been computed for the destination server
+ if (expectedServers.contains(handler.getServerId()))
+ {
+ handler.add(update);
+ }
+ else
+ {
+ if (notAssuredUpdate == null)
+ {
+ notAssuredUpdate = new NotAssuredUpdateMsg(update);
+ }
+ handler.add(notAssuredUpdate);
+ }
+ }
+ else
+ {
+ handler.add(update);
+ }
+ return notAssuredUpdate;
+ }
+
/**
* Helper class to be the return type of a method that processes a just
* received assured update message:
@@ -503,7 +527,6 @@
* should be not null.
* Servers that are not in this list are servers not eligible for an ack
* request.
- *
*/
public List<Integer> expectedServers = null;
@@ -543,20 +566,7 @@
{
if (sourceHandler.isDataServer())
{
- // Look for RS eligible for assured
- for (ReplicationServerHandler handler : replicationServers.values())
- {
- if (handler.getGroupId() == groupId)
- // No ack expected from a RS with different group id
- {
- if ((generationId > 0) &&
- (generationId == handler.getGenerationId()))
- // No ack expected from a RS with bad gen id
- {
- expectedServers.add(handler.getServerId());
- }
- }
- }
+ collectRSsEligibleForAssuredReplication(groupId, expectedServers);
}
// Look for DS eligible for assured
@@ -574,23 +584,19 @@
if (serverStatus == ServerStatus.NORMAL_STATUS)
{
expectedServers.add(handler.getServerId());
- } else
+ } else if (serverStatus == ServerStatus.DEGRADED_STATUS) {
// No ack expected from a DS with wrong status
- {
- if (serverStatus == ServerStatus.DEGRADED_STATUS)
- {
- wrongStatusServers.add(handler.getServerId());
- }
- /**
- * else
- * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
- * We do not want this to be reported as an error to the update
- * maker -> no pollution or potential misunderstanding when
- * reading logs or monitoring and it was just administration (for
- * instance new server is being configured in topo: it goes in bad
- * gen then then full full update).
- */
+ wrongStatusServers.add(handler.getServerId());
}
+ /*
+ * else
+ * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
+ * We do not want this to be reported as an error to the update
+ * maker -> no pollution or potential misunderstanding when
+ * reading logs or monitoring and it was just administration (for
+ * instance new server is being configured in topo: it goes in bad
+ * gen then full update).
+ */
}
}
}
@@ -644,15 +650,11 @@
Integer.toString(replicationServer.getServerId()),
Byte.toString(safeDataLevel), baseDn, update.toString());
logError(errorMsg);
- } else if (sourceGroupId != groupId)
+ } else if (sourceGroupId == groupId
+ // Assured feature does not cross different group IDs
+ && isSameGenerationId(sourceHandler.getGenerationId()))
+ // Ignore assured updates from wrong generationId servers
{
- // Assured feature does not cross different group IDS
- } else
- {
- if ((generationId > 0) &&
- (generationId == sourceHandler.getGenerationId()))
- // Ignore assured updates from wrong generationId servers
- {
if (sourceHandler.isDataServer())
{
if (safeDataLevel == (byte) 1)
@@ -685,23 +687,12 @@
sourceHandler.send(new AckMsg(cn));
}
}
- }
}
List<Integer> expectedServers = new ArrayList<Integer>();
if (interestedInAcks && sourceHandler.isDataServer())
{
- // Look for RS eligible for assured
- for (ReplicationServerHandler handler : replicationServers.values())
- {
- if (handler.getGroupId() == groupId
- // No ack expected from a RS with different group id
- && generationId > 0 && (generationId == handler.getGenerationId()))
- // No ack expected from a RS with bad gen id
- {
- expectedServers.add(handler.getServerId());
- }
- }
+ collectRSsEligibleForAssuredReplication(groupId, expectedServers);
}
// Return computed structures
@@ -718,9 +709,9 @@
// servers: the level is a best effort thing, we do not want to timeout
// at every assured SD update for instance if a RS has had his gen id
// reseted
- byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
+ byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
(byte)sdl : // Keep level as it was
- (byte)(nExpectedServers+1)); // Change level to match what's available
+ (byte)(nExpectedServers+1); // Change level to match what's available
preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
sourceHandler, finalSdl, expectedServers);
preparedAssuredInfo.expectedServers = expectedServers;
@@ -735,6 +726,32 @@
return preparedAssuredInfo;
}
+ private void collectRSsEligibleForAssuredReplication(byte groupId,
+ List<Integer> expectedServers)
+ {
+ for (ReplicationServerHandler handler : replicationServers.values())
+ {
+ if (handler.getGroupId() == groupId
+ // No ack expected from a RS with different group id
+ && isSameGenerationId(handler.getGenerationId())
+ // No ack expected from a RS with bad gen id
+ )
+ {
+ expectedServers.add(handler.getServerId());
+ }
+ }
+ }
+
+ private boolean isSameGenerationId(long generationId)
+ {
+ return this.generationId > 0 && this.generationId == generationId;
+ }
+
+ private boolean isDifferentGenerationId(long generationId)
+ {
+ return this.generationId > 0 && this.generationId != generationId;
+ }
+
/**
* Process an ack received from a given server.
*
@@ -810,7 +827,7 @@
/**
* Constructor for the timer task.
- * @param cn The changenumber of the assured update we are waiting acks for
+ * @param cn The changeNumber of the assured update we are waiting acks for
*/
public AssuredTimeoutTask(ChangeNumber cn)
{
@@ -843,10 +860,13 @@
AckMsg finalAck = expectedAcksInfo.createAck(true);
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + replicationServer.getServerId() + " for " + baseDn +
- ", sending timeout for assured update with change " + " number " +
- cn + " to server id " + origServer.getServerId());
+ {
+ TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ + " for "+ baseDn
+ + ", sending timeout for assured update with change "
+ + " number " + cn + " to server id "
+ + origServer.getServerId());
+ }
try
{
origServer.send(finalAck);
@@ -882,37 +902,30 @@
List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
for (Integer serverId : serversInTimeout)
{
- ServerHandler expectedServerInTimeout =
- directoryServers.get(serverId);
- if (expectedServerInTimeout != null)
+ ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
+ ServerHandler expectedRSInTimeout =
+ replicationServers.get(serverId);
+ if (expectedDSInTimeout != null)
{
- // Was a DS
if (safeRead)
{
- expectedServerInTimeout.incrementAssuredSrSentUpdatesTimeout();
+ expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout();
} else
{
// No SD update sent to a DS (meaningless)
}
- } else
+ } else if (expectedRSInTimeout != null)
{
- expectedServerInTimeout =
- replicationServers.get(serverId);
- if (expectedServerInTimeout != null)
+ if (safeRead)
{
- // Was a RS
- if (safeRead)
- {
- expectedServerInTimeout.
- incrementAssuredSrSentUpdatesTimeout();
- } else
- {
- expectedServerInTimeout.
- incrementAssuredSdSentUpdatesTimeout();
- }
+ expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout();
}
- /* else server disappeared ? Let's forget about it. */
+ else
+ {
+ expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout();
+ }
}
+ // else server disappeared ? Let's forget about it.
}
// Mark the ack info object as completed to prevent potential
// processAck() code parallel run
@@ -934,7 +947,9 @@
for (ReplicationServerHandler handler : replicationServers.values())
{
if (replServers.contains(handler.getServerAddressURL()))
+ {
stopServer(handler, false);
+ }
}
}
@@ -989,11 +1004,12 @@
*/
public void stopServer(ServerHandler handler, boolean shutdown)
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " domain=" + this + " stopServer() on the server handler " +
- handler.getMonitorInstanceName());
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + " domain=" + this + " stopServer() on the server handler "
+ + handler.getMonitorInstanceName());
+ }
/*
* We must prevent deadlock on replication server domain lock, when for
* instance this code is called from dying ServerReader but also dying
@@ -1027,10 +1043,12 @@
if ( (directoryServers.size() + replicationServers.size() )== 1)
{
if (debugEnabled())
- TRACER.debugInfo("In " +
- replicationServer.getMonitorInstanceName() +
- " remote server " + handler.getMonitorInstanceName() + " is " +
- "the last RS/DS to be stopped: stopping monitoring publisher");
+ {
+ TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+ + " remote server " + handler.getMonitorInstanceName()
+ + " is the last RS/DS to be stopped:"
+ + " stopping monitoring publisher");
+ }
stopMonitoringPublisher();
}
@@ -1057,10 +1075,11 @@
if (directoryServers.size() == 1)
{
if (debugEnabled())
- TRACER.debugInfo("In " +
- replicationServer.getMonitorInstanceName() +
- " remote server " + handler.getMonitorInstanceName() +
- " is the last DS to be stopped: stopping status analyzer");
+ {
+ TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+ + " remote server " + handler.getMonitorInstanceName()
+ + " is the last DS to be stopped: stopping status analyzer");
+ }
stopStatusAnalyzer();
}
@@ -1106,10 +1125,11 @@
public void stopServer(MessageHandler handler)
{
if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName()
+ {
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ " domain=" + this + " stopServer() on the message handler "
+ handler.getMonitorInstanceName());
+ }
/*
* We must prevent deadlock on replication server domain lock, when for
* instance this code is called from dying ServerReader but also dying
@@ -1176,18 +1196,19 @@
* server currently connected in the whole topology on this domain and
* if the generationId has never been saved.
*
- * - test emtpyness of directoryServers list
+ * - test emptiness of directoryServers list
* - traverse replicationServers list and test for each if DS are connected
* So it strongly relies on the directoryServers list
*/
private void mayResetGenerationId()
{
if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " for " + baseDn + " " +
- " mayResetGenerationId generationIdSavedStatus=" +
- generationIdSavedStatus);
+ {
+ TRACER.debugInfo("In RS "
+ + this.replicationServer.getMonitorInstanceName() + " for " + baseDn
+ + " " + " mayResetGenerationId generationIdSavedStatus="
+ + generationIdSavedStatus);
+ }
// If there is no more any LDAP server connected to this domain in the
// topology and the generationId has never been saved, then we can reset
@@ -1200,35 +1221,39 @@
if (generationId != rsh.getGenerationId())
{
if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " for " + baseDn + " " +
- " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
- " that has different genId");
- } else
- {
- if (rsh.hasRemoteLDAPServers())
{
+ TRACER.debugInfo("In RS "
+ + this.replicationServer.getMonitorInstanceName() + " for "
+ + baseDn + " " + " mayResetGenerationId skip RS"
+ + rsh.getMonitorInstanceName() + " that has different genId");
+ }
+ } else if (rsh.hasRemoteLDAPServers())
+ {
lDAPServersConnectedInTheTopology = true;
if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " for " + baseDn + " " +
- " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
- " has servers connected to it - will not reset generationId");
+ {
+ TRACER.debugInfo("In RS "
+ + this.replicationServer.getMonitorInstanceName()
+ + " for "+ baseDn + " mayResetGenerationId RS"
+ + rsh.getMonitorInstanceName()
+ + " has servers connected to it"
+ + " - will not reset generationId");
+ }
break;
- }
}
}
} else
{
lDAPServersConnectedInTheTopology = true;
+
if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " for " + baseDn + " " +
- " has servers connected to it - will not reset generationId");
+ {
+ TRACER.debugInfo("In RS "
+ + this.replicationServer.getMonitorInstanceName() + " for "
+ + baseDn + " "
+ + " has servers connected to it - will not reset generationId");
+ }
}
if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus
@@ -1292,7 +1317,6 @@
* The next change to send is always the first one in the tree
* So this methods simply need to check that dependencies are OK
* and update this replicaId RUV
- *
*/
return handler.take();
}
@@ -1304,14 +1328,12 @@
*/
public Set<String> getChangelogs()
{
- LinkedHashSet<String> mySet = new LinkedHashSet<String>();
-
+ Set<String> results = new LinkedHashSet<String>();
for (ReplicationServerHandler handler : replicationServers.values())
{
- mySet.add(handler.getServerAddressURL());
+ results.add(handler.getServerAddressURL());
}
-
- return mySet;
+ return results;
}
/**
@@ -1333,13 +1355,12 @@
*/
public List<String> getConnectedLDAPservers()
{
- List<String> mySet = new ArrayList<String>(0);
-
+ List<String> results = new ArrayList<String>(0);
for (DataServerHandler handler : directoryServers.values())
{
- mySet.add(String.valueOf(handler.getServerId()));
+ results.add(String.valueOf(handler.getServerId()));
}
- return mySet;
+ return results;
}
/**
@@ -1358,7 +1379,9 @@
{
DbHandler handler = sourceDbHandlers.get(serverId);
if (handler == null)
+ {
return null;
+ }
ReplicationIterator it;
try
@@ -1386,16 +1409,15 @@
* @param from lower limit changenumber.
* @param to upper limit changenumber.
* @return the number of changes.
- *
*/
- public int getCount(int serverId,
- ChangeNumber from, ChangeNumber to)
+ public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
{
DbHandler handler = sourceDbHandlers.get(serverId);
- if (handler == null)
- return 0;
-
- return handler.getCount(from, to);
+ if (handler != null)
+ {
+ return handler.getCount(from, to);
+ }
+ return 0;
}
/**
@@ -1450,8 +1472,7 @@
private List<ServerHandler> getDestinationServers(RoutableMsg msg,
ServerHandler senderHandler)
{
- List<ServerHandler> servers =
- new ArrayList<ServerHandler>();
+ List<ServerHandler> servers = new ArrayList<ServerHandler>();
if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER)
{
@@ -1476,7 +1497,9 @@
{
// Don't loop on the sender
if (destinationHandler == senderHandler)
+ {
continue;
+ }
servers.add(destinationHandler);
}
} else
@@ -1775,7 +1798,7 @@
}
// Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerDbState(this.getDbServerState());
+ monitorMsg.setReplServerDbState(getDbServerState());
return monitorMsg;
}
finally
@@ -1849,7 +1872,7 @@
{
for (DataServerHandler handler : directoryServers.values())
{
- if ((notThisOne == null) || (handler != notThisOne))
+ if (notThisOne == null || handler != notThisOne)
// All except passed one
{
for (int i=1; i<=2; i++)
@@ -1934,9 +1957,7 @@
// Create info for the local RS
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
- RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- replicationServer.getServerURL(), generationId,
- replicationServer.getGroupId(), replicationServer.getWeight());
+ RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
rsInfos.add(localRSInfo);
return new TopologyMsg(dsInfos, rsInfos);
@@ -1961,14 +1982,14 @@
for (DataServerHandler serverHandler : directoryServers.values())
{
if (serverHandler.getServerId() == destDsId)
+ {
continue;
+ }
dsInfos.add(serverHandler.toDSInfo());
}
// Add our own info (local RS)
- RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
- replicationServer.getServerURL(), generationId,
- replicationServer.getGroupId(), replicationServer.getWeight());
+ RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
rsInfos.add(localRSInfo);
// Go through every peer RSs (and get their connected DSs), also add info
@@ -1984,6 +2005,12 @@
return new TopologyMsg(dsInfos, rsInfos);
}
+ private RSInfo toRSInfo(ReplicationServer rs, long generationId)
+ {
+ return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId,
+ rs.getGroupId(), rs.getWeight());
+ }
+
/**
* Get the generationId associated to this domain.
*
@@ -2059,10 +2086,11 @@
ResetGenerationIdMsg genIdMsg)
{
if (debugEnabled())
- TRACER.debugInfo(
- "In " + this +
- " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+
- " for baseDn " + baseDn + ":\n" + genIdMsg);
+ {
+ TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from "
+ + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n"
+ + genIdMsg);
+ }
try
{
@@ -2090,11 +2118,12 @@
{
// Order to take a gen id we already have, just ignore
if (debugEnabled())
- TRACER.debugInfo(
- "In " + this
+ {
+ TRACER.debugInfo("In " + this
+ " Reset generation id requested for baseDn " + baseDn
- + " but generation id was already " + this.generationId
- + ":\n" + genIdMsg);
+ + " but generation id was already " + this.generationId + ":\n"
+ + genIdMsg);
+ }
}
// If we are the first replication server warned,
@@ -2246,13 +2275,12 @@
// status of a DS has to be changed. See more comments in run method of
// StatusAnalyzer.
if (debugEnabled())
- TRACER
- .debugInfo("Status analyzer for domain "
- + baseDn
- + " has been interrupted when"
- + " trying to acquire domain lock for changing the status"
- + " of DS "
- + serverHandler.getServerId());
+ {
+ TRACER.debugInfo("Status analyzer for domain " + baseDn
+ + " has been interrupted when"
+ + " trying to acquire domain lock for changing the status of DS "
+ + serverHandler.getServerId());
+ }
return true;
}
@@ -2273,8 +2301,7 @@
e.getMessage()));
}
- if ((newStatus == ServerStatus.INVALID_STATUS)
- || (newStatus == oldStatus))
+ if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus)
{
// Change was impossible or already occurred (see StatusAnalyzer
// comments)
@@ -2347,11 +2374,11 @@
public boolean isDegradedDueToGenerationId(int serverId)
{
if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDN=" + baseDn +
- " isDegraded serverId=" + serverId +
- " given local generation Id=" + this.generationId);
+ {
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + " baseDN=" + baseDn + " isDegraded serverId=" + serverId
+ + " given local generation Id=" + this.generationId);
+ }
ServerHandler handler = replicationServers.get(serverId);
if (handler == null)
@@ -2364,12 +2391,12 @@
}
if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDN=" + baseDn +
- " Compute degradation of serverId=" + serverId +
- " LS server generation Id=" + handler.getGenerationId());
- return (handler.getGenerationId() != this.generationId);
+ {
+ TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ + " baseDN=" + baseDn + " Compute degradation of serverId="
+ + serverId + " LS server generation Id=" + handler.getGenerationId());
+ }
+ return handler.getGenerationId() != this.generationId;
}
/**
@@ -2432,10 +2459,12 @@
// Check if generation id has to be reseted
mayResetGenerationId();
if (generationId < 0)
+ {
generationId = handler.getGenerationId();
+ }
}
- if (generationId > 0 && (generationId != handler.getGenerationId()))
+ if (isDifferentGenerationId(handler.getGenerationId()))
{
Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(handler
.getServerId(), handler.session
@@ -2651,9 +2680,9 @@
ServerState dbServerState = getDbServerState();
pendingMonitorData.setRSState(replicationServer.getServerId(),
dbServerState);
- for (int sid : dbServerState) {
- ChangeNumber storedCN = dbServerState.getChangeNumber(sid);
- pendingMonitorData.setMaxCN(sid, storedCN);
+ for (int serverId : dbServerState) {
+ ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
+ pendingMonitorData.setMaxCN(serverId, storedCN);
}
}
@@ -2668,8 +2697,7 @@
* @param serverId
* server handler that is receiving the message.
*/
- private void receivesMonitorDataResponse(MonitorMsg msg,
- int serverId)
+ private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
{
synchronized (pendingMonitorDataLock)
{
@@ -2690,8 +2718,7 @@
pendingMonitorData.setMaxCNs(replServerState);
// store the remote RS states.
- pendingMonitorData.setRSState(msg.getSenderID(),
- replServerState);
+ pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
// Store the remote LDAP servers states
Iterator<Integer> lsidIterator = msg.ldapIterator();
@@ -2720,24 +2747,20 @@
{
int connectedlsid = connectedlsh.getServerId();
Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
- pendingMonitorData.setFirstMissingDate(connectedlsid,
- newfmd);
+ pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
}
}
else
{
// this is the latency of the remote RSi regarding another RSj
// let's update the latency of the LSes connected to RSj
- ReplicationServerHandler rsjHdr = replicationServers
- .get(rsid);
+ ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
if (rsjHdr != null)
{
- for (int remotelsid : rsjHdr
- .getConnectedDirectoryServerIds())
+ for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
{
Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
- pendingMonitorData.setFirstMissingDate(remotelsid,
- newfmd);
+ pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
}
}
}
@@ -2816,7 +2839,7 @@
private final ReentrantLock lock = new ReentrantLock();
/**
- * This lock is used to protect the generationid variable.
+ * This lock is used to protect the generationId variable.
*/
private final Object generationIDLock = new Object();
@@ -2984,28 +3007,21 @@
* {@inheritDoc}
*/
@Override
- public ArrayList<Attribute> getMonitorData()
+ public List<Attribute> getMonitorData()
{
- /*
- * publish the server id and the port number.
- */
- ArrayList<Attribute> attributes = new ArrayList<Attribute>();
+ // publish the server id and the port number.
+ List<Attribute> attributes = new ArrayList<Attribute>();
attributes.add(Attributes.create("replication-server-id",
String.valueOf(replicationServer.getServerId())));
attributes.add(Attributes.create("replication-server-port",
String.valueOf(replicationServer.getReplicationPort())));
- /*
- * Add all the base DNs that are known by this replication server.
- */
- AttributeBuilder builder = new AttributeBuilder("domain-name");
- builder.add(baseDn);
- attributes.add(builder.toAttribute());
+ // Add all the base DNs that are known by this replication server.
+ attributes.add(Attributes.create("domain-name", baseDn));
// Publish to monitor the generation ID by replicationServerDomain
- builder = new AttributeBuilder("generation-id");
- builder.add(baseDn + " " + generationId);
- attributes.add(builder.toAttribute());
+ attributes.add(Attributes.create("generation-id",
+ baseDn + " " + generationId));
MonitorData md = getDomainMonitorData();
@@ -3045,7 +3061,7 @@
{
if (ctHeartbeatState == null)
{
- ctHeartbeatState = this.getDbServerState().duplicate();
+ ctHeartbeatState = getDbServerState().duplicate();
}
return ctHeartbeatState;
}
@@ -3053,6 +3069,7 @@
/**
* Computes the eligible server state for the domain.
*
+ * <pre>
* s1 s2 s3
* -- -- --
* cn31
@@ -3062,6 +3079,7 @@
* cn14
* cn26
* cn13
+ * </pre>
*
* The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
*
@@ -3070,7 +3088,7 @@
*/
public ServerState getEligibleState(ChangeNumber eligibleCN)
{
- ServerState dbState = this.getDbServerState();
+ ServerState dbState = getDbServerState();
// The result is initialized from the dbState.
// From it, we don't want to keep the changes newer than eligibleCN.
@@ -3078,9 +3096,10 @@
if (eligibleCN != null)
{
- for (int sid : dbState) {
- DbHandler h = sourceDbHandlers.get(sid);
- ChangeNumber mostRecentDbCN = dbState.getChangeNumber(sid);
+ for (int serverId : dbState)
+ {
+ DbHandler h = sourceDbHandlers.get(serverId);
+ ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId);
try {
// Is the most recent change in the Db newer than eligible CN ?
// if yes (like cn15 in the example above, then we have to go back
@@ -3090,13 +3109,13 @@
ReplicationIterator ri = null;
try {
ri = h.generateIterator(eligibleCN);
- if ((ri != null) && (ri.getChange() != null)) {
+ if (ri != null && ri.getChange() != null) {
ChangeNumber newCN = ri.getChange().getChangeNumber();
result.update(newCN);
}
} catch (Exception e) {
// there's no change older than eligibleCN (case of s3/cn31)
- result.update(new ChangeNumber(0, 0, sid));
+ result.update(new ChangeNumber(0, 0, serverId));
} finally {
if (ri != null) {
ri.releaseCursor();
@@ -3117,8 +3136,10 @@
}
if (debugEnabled())
- TRACER.debugInfo("In " + this
- + " getEligibleState() result is " + result);
+ {
+ TRACER
+ .debugInfo("In " + this + " getEligibleState() result is " + result);
+ }
return result;
}
@@ -3154,11 +3175,11 @@
for (DbHandler db : sourceDbHandlers.values())
{
// Consider this producer (DS/db).
- int sid = db.getServerId();
+ int serverId = db.getServerId();
// Should it be considered for eligibility ?
ChangeNumber heartbeatLastCN =
- getChangeTimeHeartbeatState().getChangeNumber(sid);
+ getChangeTimeHeartbeatState().getChangeNumber(serverId);
// If the most recent UpdateMsg or CLHeartbeatMsg received is very old
// then the domain is considered down and not considered for eligibility
@@ -3174,31 +3195,32 @@
}
*/
- boolean sidConnected = false;
- if (directoryServers.containsKey(sid))
+ boolean serverIdConnected = false;
+ if (directoryServers.containsKey(serverId))
{
- sidConnected = true;
+ serverIdConnected = true;
}
else
{
// not directly connected
for (ReplicationServerHandler rsh : replicationServers.values())
{
- if (rsh.isRemoteLDAPServer(sid))
+ if (rsh.isRemoteLDAPServer(serverId))
{
- sidConnected = true;
+ serverIdConnected = true;
break;
}
}
}
- if (!sidConnected)
+ if (!serverIdConnected)
{
if (debugEnabled())
- TRACER.debugInfo("In " + "Replication Server " +
- replicationServer.getReplicationPort() + " " +
- baseDn + " " + replicationServer.getServerId() +
- " Server " + sid
- + " is not considered for eligibility ... potentially down");
+ {
+ TRACER.debugInfo("In " + "Replication Server "
+ + replicationServer.getReplicationPort() + " " + baseDn + " "
+ + replicationServer.getServerId() + " Server " + serverId
+ + " is not considered for eligibility ... potentially down");
+ }
continue;
}
@@ -3216,10 +3238,12 @@
}
if (debugEnabled())
- TRACER.debugInfo(
- "In " + "Replication Server " + replicationServer.getReplicationPort() +
- " " + baseDn + " " + replicationServer.getServerId() +
- " getEligibleCN() returns result =" + eligibleCN);
+ {
+ TRACER.debugInfo("In Replication Server "
+ + replicationServer.getReplicationPort() + " " + baseDn + " "
+ + replicationServer.getServerId()
+ + " getEligibleCN() returns result =" + eligibleCN);
+ }
return eligibleCN;
}
@@ -3254,8 +3278,7 @@
{
// If we are the first replication server warned,
// then forwards the message to the remote replication servers
- for (ReplicationServerHandler rsHandler : replicationServers
- .values())
+ for (ReplicationServerHandler rsHandler : replicationServers.values())
{
try
{
@@ -3292,19 +3315,6 @@
// TODO:May be we can spare processing by only storing CN (timestamp)
// instead of a server state.
getChangeTimeHeartbeatState().update(cn);
-
- /*
- if (debugEnabled())
- {
- Set<String> ss = ctHeartbeatState.toStringSet();
- String dss = "";
- for (String s : ss)
- {
- dss = dss + " \\ " + s;
- }
- TRACER.debugInfo("In " + this.getName() + " " + dss);
- }
- */
}
/**
@@ -3319,20 +3329,22 @@
{
long res = 0;
- // Parses the dbState of the domain , server by server
- ServerState dbState = this.getDbServerState();
- for (int sid : dbState) {
- // process one sid
+ for (int serverId : getDbServerState())
+ {
ChangeNumber startCN = null;
- if (startState.getChangeNumber(sid) != null)
- startCN = startState.getChangeNumber(sid);
- long sidRes = getCount(sid, startCN, endCN);
+ if (startState.getChangeNumber(serverId) != null)
+ {
+ startCN = startState.getChangeNumber(serverId);
+ }
+ long serverIdRes = getCount(serverId, startCN, endCN);
// The startPoint is excluded when counting the ECL eligible changes
- if ((startCN != null) && (sidRes > 0))
- sidRes--;
+ if (startCN != null && serverIdRes > 0)
+ {
+ serverIdRes--;
+ }
- res += sidRes;
+ res += serverIdRes;
}
return res;
}
@@ -3348,14 +3360,10 @@
public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN)
{
long res = 0;
-
- // Parses the dbState of the domain , server by server
- ServerState dbState = this.getDbServerState();
- for (int sid : dbState) {
- // process one sid
+ for (int serverId : getDbServerState()) {
ChangeNumber lStartCN =
- new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), sid);
- res += getCount(sid, lStartCN, endCN);
+ new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId);
+ res += getCount(serverId, lStartCN, endCN);
}
return res;
}
@@ -3370,7 +3378,7 @@
long latest = 0;
for (DbHandler db : sourceDbHandlers.values())
{
- if ((latest==0) || (latest<db.getLatestTrimDate()))
+ if (latest == 0 || latest < db.getLatestTrimDate())
{
latest = db.getLatestTrimDate();
}
--
Gitblit v1.10.0