From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 672 ++++++++++++++++++++++++++++++++++++++++++++-----------
1 files changed, 539 insertions(+), 133 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a7c3dae..cf68944 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,7 +50,6 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -58,17 +57,21 @@
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
+import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
/**
* This class define an in-memory cache that will be used to store
@@ -88,9 +91,8 @@
*/
public class ReplicationServerDomain
{
-
private final Object flowControlLock = new Object();
- private final DN baseDn;
+ private final String baseDn;
// The Status analyzer that periodically verifis if the connected DSs are
// late or not
private StatusAnalyzer statusAnalyzer = null;
@@ -154,16 +156,38 @@
private MonitorData wrkMonitorData;
/**
+ * The needed info for each received assured update message we are waiting
+ * acks for.
+ * Key: a change number matching a received update message which requested
+ * assured mode usage (either safe read or safe data mode)
+ * Value: The object holding every info needed about the already received acks
+ * as well as the acks to be received.
+ * For more details, see ExpectedAcksInfo and its sub classes javadoc.
+ */
+ private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
+ new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
+
+ // The timer used to run the timeout code (timer tasks) for the assured update
+ // messages we are waiting acks for.
+ private Timer assuredTimeoutTimer = null;
+ // Counter used to purge the timer tasks referemces in assuredTimeoutTimer,
+ // every n number of treated assured messages
+ private int assuredTimeoutTimerPurgeCounter = 0;
+
+ /**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
*
* @param baseDn The baseDn associated to the ReplicationServerDomain.
* @param replicationServer the ReplicationServer that created this
* replicationServer cache.
*/
- public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
+ public ReplicationServerDomain(
+ String baseDn, ReplicationServer replicationServer)
{
this.baseDn = baseDn;
this.replicationServer = replicationServer;
+ this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
+ baseDn + " in RS " + replicationServer.getServerId(), true);
}
/**
@@ -179,32 +203,11 @@
public void put(UpdateMsg update, ServerHandler sourceHandler)
throws IOException
{
- /*
- * TODO : In case that the source server is a LDAP server this method
- * should check that change did get pushed to at least one
- * other replication server before pushing it to the LDAP servers
- */
-
- short id = update.getChangeNumber().getServerId();
+ ChangeNumber cn = update.getChangeNumber();
+ short id = cn.getServerId();
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
- if (update.isAssured())
- {
- int count = this.NumServers();
- if (count > 1)
- {
- if (sourceHandler.isReplicationServer())
- ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
- this, count - 1);
- else
- sourceHandler.addWaitingAck(update, count - 1);
- } else
- {
- sourceHandler.sendAck(update.getChangeNumber());
- }
- }
-
if (generationId < 0)
{
generationId = sourceHandler.getGenerationId();
@@ -244,11 +247,103 @@
// Publish the messages to the source handler
dbHandler.add(update);
+ /**
+ * If this is an assured message (a message requesting ack), we must
+ * construct the ExpectedAcksInfo object with the right number of expected
+ * acks before posting message to the writers. Otherwise some writers may
+ * have time to post, receive the ack and increment received ack counter
+ * (kept in ExpectedAcksInfo object) and we could think the acknowledgment
+ * is fully processed although it may be not (some other acks from other
+ * servers are not yet arrived). So for that purpose we do a pre-loop
+ * to determine to who we will post an assured message.
+ * Whether the assured mode is safe read or safe data, we anyway do not
+ * support the assured replication feature across topologies with different
+ * group ids. The assured feature insures assured replication based on the
+ * same locality (group id). For instance in double data center deployment
+ * (2 group id usage) with assured replication enabled, an assured message
+ * sent from data center 1 (group id = 1) will be sent to servers of both
+ * data centers, but one will request and wait acks only from servers of the
+ * data center 1.
+ */
+ boolean assuredMessage = update.isAssured();
+ PreparedAssuredInfo preparedAssuredInfo = null;
+ if (assuredMessage)
+ {
+ // Assured feature is supported starting from replication protocol V2
+ if (sourceHandler.getProtocolVersion() >=
+ ProtocolVersion.REPLICATION_PROTOCOL_V2)
+ {
+ // According to assured sub-mode, prepare structures to keep track of
+ // the acks we are interested in.
+ AssuredMode assuredMode = update.getAssuredMode();
+ if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+ {
+ preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
+ } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
+ {
+ preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
+ } else
+ {
+ // Unknown assured mode: should never happen
+ Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
+ Short.toString(replicationServer.getServerId()),
+ assuredMode.toString(), baseDn, update.toString());
+ logError(errorMsg);
+ assuredMessage = false;
+ }
+ } else
+ {
+ assuredMessage = false;
+ }
+ }
+
+ List<Short> expectedServers = null;
+ if (assuredMessage)
+ {
+ expectedServers = preparedAssuredInfo.expectedServers;
+ if (expectedServers != null)
+ {
+ // Store the expected acks info into the global map.
+ // The code for processing reception of acks for this update will update
+ // info kept in this object and if enough acks received, it will send
+ // back the final ack to the requester and remove the object from this
+ // map
+ // OR
+ // The following timer will time out and send an timeout ack to the
+ // requester if the acks are not received in time. The timer will also
+ // remove the object from this map.
+ waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo);
+
+ // Arm timer for this assured update message (wait for acks until it
+ // times out)
+ AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
+ assuredTimeoutTimer.schedule(assuredTimeoutTask,
+ replicationServer.getAssuredTimeout());
+ // Purge timer every 100 treated messages
+ assuredTimeoutTimerPurgeCounter++;
+ if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
+ assuredTimeoutTimer.purge();
+ }
+ }
+
+ /**
+ * The update message equivalent to the originally received update message,
+ * but with assured flag disabled. This message is the one that should be
+ * sent to non elligible servers for assured mode.
+ * We need a clone like of the original message with assured flag off, to be
+ * posted to servers we don't want to wait the ack from (not normal status
+ * servers or servers with different group id). This must be done because
+ * the posted message is a reference so each writer queue gets the same
+ * reference, thus, changing the assured flag of an object is done for every
+ * references posted on every writer queues. That is why we need a message
+ * version with assured flag on and another one with assured flag off.
+ */
+ NotAssuredUpdateMsg notAssuredUpdate = null;
/*
* Push the message to the replication servers
*/
- if (!sourceHandler.isReplicationServer())
+ if (sourceHandler.isLDAPserver())
{
for (ServerHandler handler : replicationServers.values())
{
@@ -261,7 +356,7 @@
if (debugEnabled())
TRACER.debugInfo("In RS " +
replicationServer.getServerId() +
- " for dn " + baseDn.toNormalizedString() + ", update " +
+ " for dn " + baseDn + ", update " +
update.getChangeNumber().toString() +
" will not be sent to replication server " +
Short.toString(handler.getServerId()) + " with generation id " +
@@ -272,7 +367,27 @@
continue;
}
- handler.add(update, sourceHandler);
+ if (assuredMessage)
+ {
+ // Assured mode: post an assured or not assured matching update
+ // message according to what has been computed for the destination
+ // server
+ if ((expectedServers != null) && expectedServers.contains(handler.
+ getServerId()))
+ {
+ handler.add(update, sourceHandler);
+ } else
+ {
+ if (notAssuredUpdate == null)
+ {
+ notAssuredUpdate = new NotAssuredUpdateMsg(update);
+ }
+ handler.add(notAssuredUpdate, sourceHandler);
+ }
+ } else
+ {
+ handler.add(update, sourceHandler);
+ }
}
}
@@ -281,7 +396,7 @@
*/
for (ServerHandler handler : directoryServers.values())
{
- // don't forward the change to the server that just sent it
+ // Don't forward the change to the server that just sent it
if (handler == sourceHandler)
{
continue;
@@ -307,7 +422,7 @@
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
TRACER.debugInfo("In RS " +
replicationServer.getServerId() +
- " for dn " + baseDn.toNormalizedString() + ", update " +
+ " for dn " + baseDn + ", update " +
update.getChangeNumber().toString() +
" will not be sent to directory server " +
Short.toString(handler.getServerId()) + " with generation id " +
@@ -317,7 +432,7 @@
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
TRACER.debugInfo("In RS " +
replicationServer.getServerId() +
- " for dn " + baseDn.toNormalizedString() + ", update " +
+ " for dn " + baseDn + ", update " +
update.getChangeNumber().toString() +
" will not be sent to directory server " +
Short.toString(handler.getServerId()) +
@@ -327,9 +442,389 @@
continue;
}
- handler.add(update, sourceHandler);
+ if (assuredMessage)
+ {
+ // Assured mode: post an assured or not assured matching update
+ // message according to what has been computed for the destination
+ // server
+ if ((expectedServers != null) && expectedServers.contains(handler.
+ getServerId()))
+ {
+ handler.add(update, sourceHandler);
+ } else
+ {
+ if (notAssuredUpdate == null)
+ {
+ notAssuredUpdate = new NotAssuredUpdateMsg(update);
+ }
+ handler.add(notAssuredUpdate, sourceHandler);
+ }
+ } else
+ {
+ handler.add(update, sourceHandler);
+ }
+ }
+ }
+
+ /**
+ * Helper class to be the return type of a method that processes a just
+ * received assured update message:
+ * - processSafeReadUpdateMsg
+ * - processSafeDataUpdateMsg
+ * This is a facility to pack many interesting returned object.
+ */
+ private class PreparedAssuredInfo
+ {
+ /**
+ * The list of servers identified as servers we are interested in
+ * receiving acks from. If this list is not null, then expectedAcksInfo
+ * should be not null.
+ * Servers that are not in this list are servers not elligible for an ack
+ * request.
+ *
+ */
+ public List<Short> expectedServers = null;
+
+ /**
+ * The constructed ExpectedAcksInfo object to be used when acks will be
+ * received. Null if expectedServers is null.
+ */
+ public ExpectedAcksInfo expectedAcksInfo = null;
+ }
+
+ /**
+ * Process a just received assured update message in Safe Read mode. If the
+ * ack can be sent immediately, it is done here. This will also determine to
+ * which suitable servers an ack should be requested from, and which ones are
+ * not elligible for an ack request.
+ * This method is an helper method for the put method. Have a look at the put
+ * method for a better understanding.
+ * @param update The just received assured update to process.
+ * @param sourceHandler The ServerHandler for the server from which the
+ * update was received
+ * @return A suitable PreparedAssuredInfo object that contains every needed
+ * info to proceed with post to server writers.
+ * @throws IOException When an IO exception happens during the update
+ * processing.
+ */
+ private PreparedAssuredInfo processSafeReadUpdateMsg(
+ UpdateMsg update, ServerHandler sourceHandler) throws IOException
+ {
+ ChangeNumber cn = update.getChangeNumber();
+ byte groupId = replicationServer.getGroupId();
+ byte sourceGroupId = sourceHandler.getGroupId();
+ List<Short> expectedServers = new ArrayList<Short>();
+ List<Short> wrongStatusServers = new ArrayList<Short>();
+
+ if (sourceGroupId != groupId)
+ // Assured feature does not cross different group ids
+ {
+ if (sourceHandler.isLDAPserver())
+ {
+ // Look for RS elligible for assured
+ for (ServerHandler handler : replicationServers.values())
+ {
+ if (handler.getGroupId() == groupId)
+ expectedServers.add(handler.getServerId());
+ }
+ }
+
+ // Look for DS elligible for assured
+ for (ServerHandler handler : directoryServers.values())
+ {
+ // Don't forward the change to the server that just sent it
+ if (handler == sourceHandler)
+ {
+ continue;
+ }
+ if (handler.getGroupId() == groupId)
+ {
+ if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
+ {
+ expectedServers.add(handler.getServerId());
+ } else
+ {
+ wrongStatusServers.add(handler.getServerId());
+ }
+ }
+ }
}
+ // Return computed structures
+ PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
+ if (expectedServers.size() > 0)
+ {
+ // Some other acks to wait for
+ preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn,
+ sourceHandler, expectedServers, wrongStatusServers);
+ preparedAssuredInfo.expectedServers = expectedServers;
+ }
+
+ if (preparedAssuredInfo.expectedServers == null)
+ {
+ // No elligible servers found, send the ack immediatly
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
+
+ return preparedAssuredInfo;
+ }
+
+ /**
+ * Process a just received assured update message in Safe Data mode. If the
+ * ack can be sent immediately, it is done here. This will also determine to
+ * which suitable servers an ack should be requested from, and which ones are
+ * not elligible for an ack request.
+ * This method is an helper method for the put method. Have a look at the put
+ * method for a better understanding.
+ * @param update The just received assured update to process.
+ * @param sourceHandler The ServerHandler for the server from which the
+ * update was received
+ * @return A suitable PreparedAssuredInfo object that contains every needed
+ * info to proceed with post to server writers.
+ * @throws IOException When an IO exception happens during the update
+ * processing.
+ */
+ private PreparedAssuredInfo processSafeDataUpdateMsg(
+ UpdateMsg update, ServerHandler sourceHandler) throws IOException
+ {
+ ChangeNumber cn = update.getChangeNumber();
+ boolean interestedInAcks = true;
+ byte safeDataLevel = update.getSafeDataLevel();
+ byte groupId = replicationServer.getGroupId();
+ byte sourceGroupId = sourceHandler.getGroupId();
+ if (safeDataLevel < (byte) 1)
+ {
+ // Should never happen
+ Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
+ Short.toString(replicationServer.getServerId()),
+ Byte.toString(safeDataLevel), baseDn, update.toString());
+ logError(errorMsg);
+ interestedInAcks = false;
+ } else if (sourceGroupId != groupId)
+ {
+ // Assured feature does not cross different group ids
+ interestedInAcks = false;
+ } else
+ {
+ if (sourceHandler.isLDAPserver())
+ {
+ if (safeDataLevel == (byte) 1)
+ {
+ // Immediatly return the ack for an assured message in safe data mode
+ // with safe data level 1, coming from a DS. No need to wait for more
+ // acks
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ interestedInAcks = false; // No further acks to obtain
+ } else
+ {
+ // level > 1 : We need further acks
+ // The message will be posted in assured mode to elligible servers.
+ // The embedded safe data level is not changed, and his value will be
+ // used by a remote RS to determine if he must send an ack (level > 1)
+ // or not (level = 1)
+ }
+ } else
+ { // A RS sent us the safe data message, for sure no futher acks to wait
+ interestedInAcks = false;
+ if (safeDataLevel == (byte) 1)
+ {
+ // The original level was 1 so the RS that sent us this message should
+ // have already sent his ack to the sender DS. Level 1 has already
+ // been reached so no further acks to wait
+ // This should not happen in theory as the sender RS server should
+ // have sent us a matching not assured message so we should not come
+ // to here.
+ } else
+ {
+ // level > 1, so Ack this message to originator RS
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
+ }
+ }
+
+ List<Short> expectedServers = new ArrayList<Short>();
+ if (interestedInAcks)
+ {
+ if (sourceHandler.isLDAPserver())
+ {
+ // Look for RS elligible for assured
+ for (ServerHandler handler : replicationServers.values())
+ {
+ if (handler.getGroupId() == groupId)
+ expectedServers.add(handler.getServerId());
+ }
+ }
+
+ // Look for DS elligible for assured
+ for (ServerHandler handler : directoryServers.values())
+ {
+ // Don't forward the change to the server that just sent it
+ if (handler == sourceHandler)
+ {
+ continue;
+ }
+ if (handler.getGroupId() == groupId)
+ expectedServers.add(handler.getServerId());
+ }
+ }
+
+ // Return computed structures
+ PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
+ if (interestedInAcks && (expectedServers.size() > 0))
+ {
+ // Some other acks to wait for
+ preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+ sourceHandler, update.getSafeDataLevel());
+ preparedAssuredInfo.expectedServers = expectedServers;
+ }
+
+ if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
+ {
+ // level > 1 and source is a DS but no elligible servers found, send the
+ // ack immediatly
+ AckMsg ack = new AckMsg(cn);
+ sourceHandler.sendAck(ack);
+ }
+
+ return preparedAssuredInfo;
+ }
+
+ /**
+ * Process an ack received from a given server.
+ *
+ * @param ack The ack message received.
+ * @param ackingServer The server handler of the server that sent the ack.
+ */
+ public void processAck(AckMsg ack, ServerHandler ackingServer)
+ {
+ // Retrieve the expected acks info for the update matching the original
+ // sent update.
+ ChangeNumber cn = ack.getChangeNumber();
+ ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+
+ if (expectedAcksInfo != null)
+ {
+ // Prevent concurrent access from processAck() or AssuredTimeoutTask.run()
+ synchronized (expectedAcksInfo)
+ {
+ if (expectedAcksInfo.isCompleted())
+ {
+ // Timeout code is sending a timeout ack, do nothing and let him
+ // remove object from the map
+ return;
+ }
+ // If this is the last ack we were waiting from, immediatly create and
+ // send the final ack to the original server
+ if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
+ {
+ // Remove the object from the map as no more needed
+ waitingAcks.remove(cn);
+ AckMsg finalAck = expectedAcksInfo.createAck(false);
+ ServerHandler origServer = expectedAcksInfo.getRequesterServer();
+ try
+ {
+ origServer.sendAck(finalAck);
+ } catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to the server.
+ * Log an error and close the connection to this server.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_RS_ERROR_SENDING_ACK.get(
+ Short.toString(replicationServer.getServerId()),
+ Short.toString(origServer.getServerId()), cn.toString(), baseDn));
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ stopServer(origServer);
+ }
+ // Mark the ack info object as completed to prevent potential timeout
+ // code parallel run
+ expectedAcksInfo.completed();
+ }
+ }
+ } else
+ {
+ // The timeout occured for the update matching this change number and the
+ // ack with timeout error has probably already been sent.
+ }
+ }
+
+ /**
+ * The code run when the timeout occurs while waiting for acks of the
+ * elligible servers. This basically sends a timeout ack (with any additional
+ * error info) to the original server that sent an assured update message.
+ */
+ private class AssuredTimeoutTask extends TimerTask
+ {
+ private ChangeNumber cn = null;
+
+ /**
+ * Constructor for the timer task.
+ * @param cn The changenumber of the assured update we are waiting acks for
+ */
+ public AssuredTimeoutTask(ChangeNumber cn)
+ {
+ this.cn = cn;
+ }
+
+ /**
+ * Run when the assured timeout for an assured update message we are waiting
+ * acks for occurs.
+ */
+ public void run()
+ {
+ ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+
+ if (expectedAcksInfo != null)
+ {
+ synchronized (expectedAcksInfo)
+ {
+ if (expectedAcksInfo.isCompleted())
+ {
+ // processAck() code is sending the ack, do nothing and let him
+ // remove object from the map
+ return;
+ }
+ // Remove the object from the map as no more needed
+ waitingAcks.remove(cn);
+ // Create the timeout ack and send him to the server the assured
+ // update message came from
+ AckMsg finalAck = expectedAcksInfo.createAck(true);
+ ServerHandler origServer = expectedAcksInfo.getRequesterServer();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + Short.toString(replicationServer.getServerId()) +
+ " for " + baseDn +
+ ", sending timeout for assured update with change " + " number " +
+ cn.toString() + " to server id " +
+ Short.toString(origServer.getServerId()));
+ try
+ {
+ origServer.sendAck(finalAck);
+ } catch (IOException e)
+ {
+ /*
+ * An error happened trying the send back an ack to the server.
+ * Log an error and close the connection to this server.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_RS_ERROR_SENDING_ACK.get(
+ Short.toString(replicationServer.getServerId()),
+ Short.toString(origServer.getServerId()), cn.toString(), baseDn));
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ stopServer(origServer);
+ }
+ // Mark the ack info object as completed to prevent potential
+ // processAck() code parallel run
+ expectedAcksInfo.completed();
+ }
+ }
+ }
}
/**
@@ -687,7 +1182,7 @@
* Get the baseDn.
* @return Returns the baseDn.
*/
- public DN getBaseDn()
+ public String getBaseDn()
{
return baseDn;
}
@@ -711,44 +1206,6 @@
}
/**
- * Get the number of currently connected servers.
- *
- * @return the number of currently connected servers.
- */
- private int NumServers()
- {
- return replicationServers.size() + directoryServers.size();
- }
-
- /**
- * Add an ack to the list of ack received for a given change.
- *
- * @param message The ack message received.
- * @param fromServerId The identifier of the server that sent the ack.
- */
- public void ack(AckMsg message, short fromServerId)
- {
- /*
- * there are 2 possible cases here :
- * - the message that was acked comes from a server to which
- * we are directly connected.
- * In this case, we can find the handler from the directoryServers map
- * - the message that was acked comes from a server to which we are not
- * connected.
- * In this case we need to find the replication server that forwarded
- * the change and send back the ack to this server.
- */
- ServerHandler handler = directoryServers.get(
- message.getChangeNumber().getServerId());
- if (handler != null)
- handler.ack(message, fromServerId);
- else
- {
- ServerHandler.ackChangelog(message, fromServerId);
- }
- }
-
- /**
* Retrieves the destination handlers for a routable message.
*
* @param msg The message to route.
@@ -975,56 +1432,6 @@
}
/**
- * Send back an ack to the server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a ReplicationServer.
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
- {
- short serverId = changeNumber.getServerId();
- sendAck(changeNumber, isLDAPserver, serverId);
- }
-
- /**
- *
- * Send back an ack to a server that sent the change.
- *
- * @param changeNumber The ChangeNumber of the change that must be acked.
- * @param isLDAPserver This boolean indicates if the server that sent the
- * change was an LDAP server or a ReplicationServer.
- * @param serverId The identifier of the server from which we
- * received the change..
- */
- public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
- short serverId)
- {
- ServerHandler handler;
- if (isLDAPserver)
- handler = directoryServers.get(serverId);
- else
- handler = replicationServers.get(serverId);
-
- // TODO : check for null handler and log error
- try
- {
- handler.sendAck(changeNumber);
- } catch (IOException e)
- {
- /*
- * An error happened trying the send back an ack to this server.
- * Log an error and close the connection to this server.
- */
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- stopServer(handler);
- }
- }
-
- /**
* Shutdown this ReplicationServerDomain.
*/
public void shutdown()
@@ -1228,13 +1635,8 @@
{
// Put RS info
rsInfos.add(serverHandler.toRSInfo());
- // Put his DSs info
- Map<Short, LightweightServerHandler> lsList =
- serverHandler.getConnectedDSs();
- for (LightweightServerHandler ls : lsList.values())
- {
- dsInfos.add(ls.toDSInfo());
- }
+
+ serverHandler.addDSInfos(dsInfos);
}
return new TopologyMsg(dsInfos, rsInfos);
@@ -1641,7 +2043,7 @@
if (generationId > 0 && (generationId != handler.getGenerationId()))
{
Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- baseDn.toNormalizedString(),
+ baseDn,
Short.toString(handler.getServerId()),
Long.toString(handler.getGenerationId()),
Long.toString(generationId));
@@ -1879,10 +2281,13 @@
synchronized (wrkMonitorData)
{
// Here is the RS state : list <serverID, lastChangeNumber>
- // For each LDAP Server, we keep the max CN accross the RSes
+ // For each LDAP Server, we keep the max CN across the RSes
ServerState replServerState = msg.getReplServerDbState();
wrkMonitorData.setMaxCNs(replServerState);
+ // store the remote RS states.
+ wrkMonitorData.setRSState(msg.getsenderID(), replServerState);
+
// Store the remote LDAP servers states
Iterator<Short> lsidIterator = msg.ldapIterator();
while (lsidIterator.hasNext())
@@ -2092,3 +2497,4 @@
}
}
}
+
--
Gitblit v1.10.0