From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495, 519
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 542 ++++++++++++++++++++++++++++++++++++++---------------
1 files changed, 384 insertions(+), 158 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index ea99e4b..76aa95b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -26,56 +26,59 @@
*/
package org.opends.server.replication.server;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
-
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
-import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.MonitorProvider;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
-import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantLock;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.locks.ReentrantLock;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.ProtocolVersion;
/**
* This class define an in-memory cache that will be used to store
@@ -111,8 +114,8 @@
* to this replication server.
*
*/
- private final Map<Short, ServerHandler> directoryServers =
- new ConcurrentHashMap<Short, ServerHandler>();
+ private final Map<Short, DataServerHandler> directoryServers =
+ new ConcurrentHashMap<Short, DataServerHandler>();
/*
* This map contains one ServerHandler for each replication servers
@@ -123,8 +126,11 @@
* We add new TreeSet in the HashMap when a new replication server register
* to this replication server.
*/
- private final Map<Short, ServerHandler> replicationServers =
- new ConcurrentHashMap<Short, ServerHandler>();
+ private final Map<Short, ReplicationServerHandler> replicationServers =
+ new ConcurrentHashMap<Short, ReplicationServerHandler>();
+
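+  /*
+   * This queue contains the handlers of the components (for instance the
+   * External Changelog this patch series introduces) that registered with
+   * registerHandler() in order to receive every update pushed through this
+   * domain, in addition to the connected DSs and RSs.
+   */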
+ private final ConcurrentLinkedQueue<MessageHandler> otherHandlers =
+ new ConcurrentLinkedQueue<MessageHandler>();
/*
* This map contains the List of updates received from each
@@ -134,16 +140,14 @@
new ConcurrentHashMap<Short, DbHandler>();
private ReplicationServer replicationServer;
- /* GenerationId management */
+ // GenerationId management
private long generationId = -1;
private boolean generationIdSavedStatus = false;
- /**
- * The tracer object for the debug logger.
- */
+
+ // The tracer object for the debug logger.
private static final DebugTracer TRACER = getTracer();
- /* Monitor data management */
-
+ // Monitor data management
/**
* The monitor data consolidated over the topology.
*/
@@ -346,9 +350,9 @@
/*
* Push the message to the replication servers
*/
- if (sourceHandler.isLDAPserver())
+ if (sourceHandler.isDataServer())
{
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
/**
* Ignore updates to RS with bad gen id
@@ -397,7 +401,7 @@
/*
* Push the message to the LDAP servers
*/
- for (ServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : directoryServers.values())
{
// Don't forward the change to the server that just sent it
if (handler == sourceHandler)
@@ -467,6 +471,14 @@
handler.add(update, sourceHandler);
}
}
+
+ // Push the message to the other subscribing handlers
+ Iterator<MessageHandler> otherIter = otherHandlers.iterator();
+ while (otherIter.hasNext())
+ {
+ MessageHandler handler = otherIter.next();
+ handler.add(update, sourceHandler);
+ }
}
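+
+  /* A minimal usage sketch of the new subscription API (the subscriber
+   * below is hypothetical, e.g. a future External Changelog handler):
+   *
+   *   MessageHandler eclHandler = ...;      // some MessageHandler subclass
+   *   domain.registerHandler(eclHandler);   // start receiving updates
+   *   // updates now arrive through eclHandler.add(update, sourceHandler)
+   *   domain.stopServer(eclHandler);        // unregister and shut down
+   */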
/**
@@ -522,10 +534,10 @@
if (sourceGroupId == groupId)
// Assured feature does not cross different group ids
{
- if (sourceHandler.isLDAPserver())
+ if (sourceHandler.isDataServer())
{
// Look for RS eligible for assured
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
// No ack expected from a RS with different group id
@@ -541,7 +553,7 @@
}
// Look for DS eligible for assured
- for (ServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : directoryServers.values())
{
// Don't forward the change to the server that just sent it
if (handler == sourceHandler)
@@ -636,7 +648,7 @@
(generationId == sourceHandler.getGenerationId()))
// Ignore assured updates from wrong generationId servers
{
- if (sourceHandler.isLDAPserver())
+ if (sourceHandler.isDataServer())
{
if (safeDataLevel == (byte) 1)
{
@@ -689,10 +701,10 @@
List<Short> expectedServers = new ArrayList<Short>();
if (interestedInAcks)
{
- if (sourceHandler.isLDAPserver())
+ if (sourceHandler.isDataServer())
{
// Look for RS eligible for assured
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
if (handler.getGroupId() == groupId)
// No ack expected from a RS with different group id
@@ -879,7 +891,7 @@
origServer.incrementAssuredSrReceivedUpdatesTimeout();
} else
{
- if (origServer.isLDAPserver())
+ if (origServer.isDataServer())
{
origServer.incrementAssuredSdReceivedUpdatesTimeout();
}
@@ -957,7 +969,7 @@
*/
public void stopReplicationServers(Collection<String> replServers)
{
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
if (replServers.contains(handler.getServerAddressURL()))
stopServer(handler);
@@ -970,13 +982,13 @@
public void stopAllServers()
{
// Close session with other replication servers
- for (ServerHandler serverHandler : replicationServers.values())
+ for (ReplicationServerHandler serverHandler : replicationServers.values())
{
stopServer(serverHandler);
}
// Close session with other LDAP servers
- for (ServerHandler serverHandler : directoryServers.values())
+ for (DataServerHandler serverHandler : directoryServers.values())
{
stopServer(serverHandler);
}
@@ -988,14 +1000,13 @@
* @param handler the DS we want to check
* @return true if this is not a duplicate server
*/
- public boolean checkForDuplicateDS(ServerHandler handler)
+ public boolean checkForDuplicateDS(DataServerHandler handler)
{
- ServerHandler oldHandler = directoryServers.get(handler.getServerId());
+ DataServerHandler oldHandler = directoryServers.get(handler.getServerId());
if (directoryServers.containsKey(handler.getServerId()))
{
// looks like two LDAP servers have the same serverId
- // log an error message and drop this connection.
Message message = ERR_DUPLICATE_SERVER_ID.get(
replicationServer.getMonitorInstanceName(), oldHandler.toString(),
handler.toString(), handler.getServerId());
@@ -1012,6 +1023,12 @@
*/
public void stopServer(ServerHandler handler)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " domain=" + this +
+        " stopServer(SH) " + handler.getMonitorInstanceName() +
+ " " + stackTraceToSingleLineString(new Exception()));
/*
* We must prevent deadlock on replication server domain lock, when for
* instance this code is called from dying ServerReader but also dying
@@ -1020,14 +1037,103 @@
* or not (is already being processed or not).
*/
if (!handler.engageShutdown())
- // Only do this once (prevent other thread to enter here again)
+    // Only do this once (prevent other threads from entering here again)
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In RS " + this.replicationServer.getMonitorInstanceName() +
- " for " + baseDn + " " +
- " stopServer " + handler.getMonitorInstanceName());
+ try
+ {
+ try
+ {
+ // Acquire lock on domain (see more details in comment of start()
+ // method of ServerHandler)
+ lock();
+ } catch (InterruptedException ex)
+ {
+ // Try doing job anyway...
+ }
+ if (handler.isReplicationServer())
+ {
+ if (replicationServers.containsValue(handler))
+ {
+ unregisterServerHandler(handler);
+ handler.shutdown();
+
+          // Check if generation id has to be reset
+ mayResetGenerationId();
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
+ }
+ } else
+ {
+ if (directoryServers.containsValue(handler))
+ {
+ // If this is the last DS for the domain,
+ // shutdown the status analyzer
+ if (directoryServers.size() == 1)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In " +
+ replicationServer.getMonitorInstanceName() +
+ " remote server " + handler.getMonitorInstanceName() +
+ " is the last DS to be stopped: stopping status analyzer");
+ stopStatusAnalyzer();
+ }
+
+ unregisterServerHandler(handler);
+ handler.shutdown();
+
+          // Check if generation id has to be reset
+ mayResetGenerationId();
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ buildAndSendTopoInfoToRSs();
+ // Warn our DSs that a RS or DS has quit (does not use this
+ // handler as already removed from list)
+ buildAndSendTopoInfoToDSs(null);
+ }
+ else if (otherHandlers.contains(handler))
+ {
+ unRegisterHandler(handler);
+ handler.shutdown();
+ }
+ }
+
+ }
+ catch(Exception e)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE,
+ stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ release();
+ }
+ }
+ }
+
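+  /* The engageShutdown() guard used above is a thread-safe "only once"
+   * flag. A minimal sketch of such a flag (an assumed implementation; the
+   * real one lives in the handler classes):
+   *
+   *   private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
+   *
+   *   public boolean engageShutdown()
+   *   {
+   *     // true means another thread already engaged the shutdown
+   *     return !shuttingDown.compareAndSet(false, true);
+   *   }
+   */
+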
+ /**
+ * Stop the handler.
+ * @param handler The handler to stop.
+ */
+ public void stopServer(MessageHandler handler)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In RS " + this.replicationServer.getMonitorInstanceName() +
+ " domain=" + this +
+        " stopServer(MH) " + handler.getMonitorInstanceName() +
+ " " + stackTraceToSingleLineString(new Exception()));
+ /*
+ * We must prevent deadlock on replication server domain lock, when for
+ * instance this code is called from dying ServerReader but also dying
+ * ServerWriter at the same time, or from a thread that wants to shut down
+ * the handler. So use a thread safe flag to know if the job must be done
+ * or not (is already being processed or not).
+ */
+ if (!handler.engageShutdown())
+    // Only do this once (prevent other threads from entering here again)
+ {
try
{
// Acquire lock on domain (see more details in comment of start()
@@ -1037,57 +1143,40 @@
{
// Try doing job anyway...
}
-
- if (handler.isReplicationServer())
+ if (otherHandlers.contains(handler))
{
- if (replicationServers.containsValue(handler))
- {
- replicationServers.remove(handler.getServerId());
- handler.shutdown();
-
- // Check if generation id has to be resetted
- mayResetGenerationId();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- sendTopoInfoToDSs(null);
- }
- } else
- {
- if (directoryServers.containsValue(handler))
- {
- // If this is the last DS for the domain, shutdown the status analyzer
- if (directoryServers.size() == 1)
- {
- if (debugEnabled())
- TRACER.debugInfo("In " +
- replicationServer.getMonitorInstanceName() +
- " remote server " + handler.getMonitorInstanceName() +
- " is the last DS to be stopped: stopping status analyzer");
- stopStatusAnalyzer();
- }
-
- directoryServers.remove(handler.getServerId());
- handler.shutdown();
-
- // Check if generation id has to be resetted
- mayResetGenerationId();
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendTopoInfoToRSs();
- // Warn our DSs that a RS or DS has quit (does not use this
- // handler as already removed from list)
- sendTopoInfoToDSs(null);
- }
+ unRegisterHandler(handler);
+ handler.shutdown();
}
+      release();
+    }
+  }
- release();
+ /**
+   * Unregister the provided handler from the list of handlers registered
+   * to this domain.
+   * @param handler the handler to unregister.
+ */
+ protected void unregisterServerHandler(ServerHandler handler)
+ {
+ if (handler.isReplicationServer())
+ {
+ replicationServers.remove(handler.getServerId());
+ }
+ else
+ {
+ directoryServers.remove(handler.getServerId());
}
}
/**
- * Resets the generationId for this domain if there is no LDAP
- * server currently connected and if the generationId has never
- * been saved.
+   * This method resets the generationId for this domain if there is no LDAP
+   * server currently connected anywhere in the topology for this domain and
+   * if the generationId has never been saved. To determine this, it:
+   *
+   * - tests the emptiness of the directoryServers list
+   * - traverses the replicationServers list and tests, for each remote RS,
+   *   whether any DS is connected to it
+   * So it strongly relies on the directoryServers list.
*/
protected void mayResetGenerationId()
{
@@ -1104,7 +1193,7 @@
boolean lDAPServersConnectedInTheTopology = false;
if (directoryServers.isEmpty())
{
- for (ServerHandler rsh : replicationServers.values())
+ for (ReplicationServerHandler rsh : replicationServers.values())
{
if (generationId != rsh.getGenerationId())
{
@@ -1149,14 +1238,16 @@
}
/**
- * Checks that a RS is not already connected.
- *
- * @param handler the RS we want to check
- * @return true if this is not a duplicate server
+ * Checks that a remote RS is not already connected to this hosting RS.
+ * @param handler The handler for the remote RS.
+   * @return true if the remote RS is not already connected (the connection
+   * may proceed), false if this same RS raced a second connection that is
+   * silently dropped.
+   * @throws DirectoryException when a different RS is already connected
+   * with the same serverId.
*/
- public boolean checkForDuplicateRS(ServerHandler handler)
+ public boolean checkForDuplicateRS(ReplicationServerHandler handler)
+ throws DirectoryException
{
- ServerHandler oldHandler = replicationServers.get(handler.getServerId());
+ ReplicationServerHandler oldHandler =
+ replicationServers.get(handler.getServerId());
if ((oldHandler != null))
{
if (oldHandler.getServerAddressURL().equals(
@@ -1166,7 +1257,9 @@
// have been sent at about the same time and 2 connections
// have been established.
// Silently drop this connection.
- } else
+ return false;
+ }
+ else
{
// looks like two replication servers have the same serverId
// log an error message and drop this connection.
@@ -1174,9 +1267,8 @@
replicationServer.getMonitorInstanceName(), oldHandler.
getServerAddressURL(), handler.getServerAddressURL(),
handler.getServerId());
- logError(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
}
- return false;
}
return true;
}
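+
+  /* A hypothetical caller sketch for the new checkForDuplicateRS contract
+   * (proceedWithHandshake() is an assumed helper, not part of this patch):
+   *
+   *   try
+   *   {
+   *     if (domain.checkForDuplicateRS(rsHandler))
+   *       proceedWithHandshake();
+   *     // false: both RSs connected at the same time, silently drop
+   *   }
+   *   catch (DirectoryException e)
+   *   {
+   *     // another RS already uses this serverId: abort this connection
+   *   }
+   */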
@@ -1223,7 +1315,7 @@
{
LinkedHashSet<String> mySet = new LinkedHashSet<String>();
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
mySet.add(handler.getServerAddressURL());
}
@@ -1232,7 +1324,9 @@
}
/**
- * Return a Set containing the servers known by this replicationServer.
+   * Return a set containing the serverIds of the servers that produced
+   * updates and are known by this replicationServer, across the whole
+   * topology, whether directly connected or connected through another RS.
* @return a set containing the servers known by this replicationServer.
*/
public Set<Short> getServers()
@@ -1250,7 +1344,7 @@
{
List<String> mySet = new ArrayList<String>(0);
- for (ServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : directoryServers.values())
{
mySet.add(String.valueOf(handler.getServerId()));
}
@@ -1348,7 +1442,7 @@
{
// Send to all replication servers with a least one remote
// server connected
- for (ServerHandler rsh : replicationServers.values())
+ for (ReplicationServerHandler rsh : replicationServers.values())
{
if (rsh.hasRemoteLDAPServers())
{
@@ -1358,7 +1452,7 @@
}
// Sends to all connected LDAP servers
- for (ServerHandler destinationHandler : directoryServers.values())
+ for (DataServerHandler destinationHandler : directoryServers.values())
{
// Don't loop on the sender
if (destinationHandler == senderHandler)
@@ -1368,7 +1462,7 @@
} else
{
// Destination is one server
- ServerHandler destinationHandler =
+ DataServerHandler destinationHandler =
directoryServers.get(msg.getDestination());
if (destinationHandler != null)
{
@@ -1378,9 +1472,9 @@
// the targeted server is NOT connected
// Let's search for THE changelog server that MAY
// have the targeted server connected.
- if (senderHandler.isLDAPserver())
+ if (senderHandler.isDataServer())
{
- for (ServerHandler h : replicationServers.values())
+ for (ReplicationServerHandler h : replicationServers.values())
{
// Send to all replication servers with a least one remote
// server connected
@@ -1421,7 +1515,7 @@
// build the full list of all servers in the topology
// and send back a MonitorMsg with the full list of all the servers
// in the topology.
- if (senderHandler.isLDAPserver())
+ if (senderHandler.isDataServer())
{
MonitorMsg returnMsg =
new MonitorMsg(msg.getDestination(), msg.getsenderID());
@@ -1481,7 +1575,7 @@
// from the states stored in the serverHandler.
// - the server state
// - the older missing change
- for (ServerHandler lsh : this.directoryServers.values())
+ for (DataServerHandler lsh : this.directoryServers.values())
{
monitorMsg.setServerState(
lsh.getServerId(),
@@ -1491,7 +1585,7 @@
}
// Same for the connected RS
- for (ServerHandler rsh : this.replicationServers.values())
+ for (ReplicationServerHandler rsh : this.replicationServers.values())
{
monitorMsg.setServerState(
rsh.getServerId(),
@@ -1657,12 +1751,12 @@
*/
public void checkAllSaturation() throws IOException
{
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
handler.checkWindow();
}
- for (ServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : directoryServers.values())
{
handler.checkWindow();
}
@@ -1675,15 +1769,15 @@
* @return true if the server can restart sending changes.
* false if the server can't restart sending changes.
*/
- public boolean restartAfterSaturation(ServerHandler sourceHandler)
+ public boolean restartAfterSaturation(MessageHandler sourceHandler)
{
- for (ServerHandler handler : replicationServers.values())
+ for (MessageHandler handler : replicationServers.values())
{
if (!handler.restartAfterSaturation(sourceHandler))
return false;
}
- for (ServerHandler handler : directoryServers.values())
+ for (MessageHandler handler : directoryServers.values())
{
if (!handler.restartAfterSaturation(sourceHandler))
return false;
@@ -1698,9 +1792,9 @@
* @param notThisOne If not null, the topology message will not be sent to
* this passed server.
*/
- public void sendTopoInfoToDSs(ServerHandler notThisOne)
+ public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
{
- for (ServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : directoryServers.values())
{
if ((notThisOne == null) || // All DSs requested
((notThisOne != null) && (handler != notThisOne)))
@@ -1725,10 +1819,10 @@
* Send a TopologyMsg to all the connected replication servers
* in order to let them know our connected LDAP servers.
*/
- public void sendTopoInfoToRSs()
+ public void buildAndSendTopoInfoToRSs()
{
TopologyMsg topoMsg = createTopologyMsgForRS();
- for (ServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : replicationServers.values())
{
try
{
@@ -1755,7 +1849,7 @@
List<DSInfo> dsInfos = new ArrayList<DSInfo>();
// Go through every DSs
- for (ServerHandler serverHandler : directoryServers.values())
+ for (DataServerHandler serverHandler : directoryServers.values())
{
dsInfos.add(serverHandler.toDSInfo());
}
@@ -1785,7 +1879,7 @@
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
// Go through every DSs (except recipient of msg)
- for (ServerHandler serverHandler : directoryServers.values())
+ for (DataServerHandler serverHandler : directoryServers.values())
{
if (serverHandler.getServerId() == destDsId)
continue;
@@ -1799,7 +1893,7 @@
// Go through every peer RSs (and get their connected DSs), also add info
// for RSs
- for (ServerHandler serverHandler : replicationServers.values())
+ for (ReplicationServerHandler serverHandler : replicationServers.values())
{
// Put RS info
rsInfos.add(serverHandler.toRSInfo());
@@ -1840,12 +1934,6 @@
synchronized public long setGenerationId(long generationId,
boolean savedStatus)
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " baseDN=" + baseDn +
- " RCache.set GenerationId=" + generationId);
-
long oldGenerationId = this.generationId;
if (this.generationId != generationId)
@@ -1916,7 +2004,7 @@
// After we'll have sent the message , the remote RS will adopt
// the new genId
rsHandler.setGenerationId(newGenId);
- if (senderHandler.isLDAPserver())
+ if (senderHandler.isDataServer())
{
rsHandler.forwardGenerationIdToRS(genIdMsg);
}
@@ -1929,7 +2017,7 @@
// Change status of the connected DSs according to the requested new
// reference generation id
- for (ServerHandler dsHandler : directoryServers.values())
+ for (DataServerHandler dsHandler : directoryServers.values())
{
try
{
@@ -1948,8 +2036,8 @@
// (consecutive to reset gen id message), we prefer advertising once for
// all after changes (less packet sent), here at the end of the reset msg
// treatment.
- sendTopoInfoToDSs(null);
- sendTopoInfoToRSs();
+ buildAndSendTopoInfoToDSs(null);
+ buildAndSendTopoInfoToRSs();
Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
Long.toString(newGenId));
@@ -1964,7 +2052,7 @@
* that changed his status.
* @param csMsg The message containing the new status
*/
- public void processNewStatus(ServerHandler senderHandler,
+ public void processNewStatus(DataServerHandler senderHandler,
ChangeStatusMsg csMsg)
{
if (debugEnabled())
@@ -1995,8 +2083,8 @@
}
// Update every peers (RS/DS) with topology changes
- sendTopoInfoToDSs(senderHandler);
- sendTopoInfoToRSs();
+ buildAndSendTopoInfoToDSs(senderHandler);
+ buildAndSendTopoInfoToRSs();
Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
Short.toString(senderHandler.getServerId()),
@@ -2014,7 +2102,7 @@
* @param event The event to be used for new status computation
* @return True if we have been interrupted (must stop), false otherwise
*/
- public boolean changeStatusFromStatusAnalyzer(ServerHandler serverHandler,
+ public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
StatusMachineEvent event)
{
try
@@ -2066,8 +2154,8 @@
}
// Update every peers (RS/DS) with topology changes
- sendTopoInfoToDSs(serverHandler);
- sendTopoInfoToRSs();
+ buildAndSendTopoInfoToDSs(serverHandler);
+ buildAndSendTopoInfoToRSs();
release();
return false;
@@ -2169,10 +2257,12 @@
* @param allowResetGenId True for allowing to reset the generation id (
* when called after initial handshake)
* @throws IOException If an error occurred.
+ * @throws DirectoryException If an error occurred.
*/
- public void receiveTopoInfoFromRS(TopologyMsg topoMsg, ServerHandler handler,
+ public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
+ ReplicationServerHandler handler,
boolean allowResetGenId)
- throws IOException
+ throws IOException, DirectoryException
{
if (debugEnabled())
{
@@ -2186,7 +2276,8 @@
{
// Acquire lock on domain (see more details in comment of start() method
// of ServerHandler)
- lock();
+ if (!hasLock())
+ lock();
} catch (InterruptedException ex)
{
// Try doing job anyway...
@@ -2228,7 +2319,7 @@
* Sends the currently known topology information to every connected
* DS we have.
*/
- sendTopoInfoToDSs(null);
+ buildAndSendTopoInfoToDSs(null);
release();
}
@@ -2441,7 +2532,7 @@
{
// this is the latency of the remote RSi regarding another RSj
// let's update the latency of the LSes connected to RSj
- ServerHandler rsjHdr = replicationServers.get(rsid);
+ ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
if (rsjHdr != null)
{
for (short remotelsid : rsjHdr.getConnectedDirectoryServerIds())
@@ -2496,7 +2587,7 @@
* Get the map of connected DSs.
* @return The map of connected DSs
*/
- public Map<Short, ServerHandler> getConnectedDSs()
+ public Map<Short, DataServerHandler> getConnectedDSs()
{
return directoryServers;
}
@@ -2505,7 +2596,7 @@
* Get the map of connected RSs.
* @return The map of connected RSs
*/
- public Map<Short, ServerHandler> getConnectedRSs()
+ public Map<Short, ReplicationServerHandler> getConnectedRSs()
{
return replicationServers;
}
@@ -2708,5 +2799,140 @@
return attributes;
}
-}
+ /**
+   * Register with this domain a handler that subscribes to changes.
+   * @param handler the subscribing handler to register.
+ */
+ public void registerHandler(MessageHandler handler)
+ {
+ this.otherHandlers.add(handler);
+ }
+
+ /**
+   * Unregister a handler from the domain.
+   * @param handler the handler to unregister.
+   * @return Whether the handler was successfully unregistered.
+ */
+ public boolean unRegisterHandler(MessageHandler handler)
+ {
+ return this.otherHandlers.remove(handler);
+ }
+
+ /**
+   * Return the state that contains, for each server, the time of eligibility.
+ * @return the state.
+ */
+ public ServerState getHeartbeatState()
+ {
+    // TODO:ECL Eligibility must be supported
+ return this.getDbServerState();
+ }
+ /**
+   * Computes the change number eligible for the ECL.
+   * @return The eligible change number, or null if the domain does not
+   * participate in eligibility.
+ */
+ public ChangeNumber computeEligibleCN()
+ {
+    ChangeNumber eligibleCN = null;
+ ServerState heartbeatState = getHeartbeatState();
+
+    if (heartbeatState == null)
+ return null;
+
+    // compute the eligible CN
+ ServerState hbState = heartbeatState.duplicate();
+
+ Iterator<Short> it = hbState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
+
+ // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
+ // then the server is considered down and not considered for eligibility
+      if ((TimeThread.getTime() - storedCN.getTime()) > 2000)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "For RSD." + this.baseDn + " Server " + sid
+ + " is not considered for eligibility ... potentially down");
+ continue;
+ }
+
+      if ((eligibleCN == null) || (storedCN.older(eligibleCN)))
+      {
+        eligibleCN = storedCN;
+ }
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+        "For RSD." + this.baseDn + " eligibleCN=" + eligibleCN);
+    return eligibleCN;
+ }
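+
+  /* Worked illustration of computeEligibleCN() (times are made up): with
+   * newest CNs of t=1000 for server 1, t=1500 for server 2 and t=400 for
+   * server 3, where server 3's newest CN is more than 2000 ms in the past,
+   * server 3 is skipped as potentially down and the eligible CN is the
+   * t=1000 one, i.e. the oldest of the newest CNs of the live servers.
+   */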
+
+ /**
+   * Computes the eligible server state by taking, for each serverId, the
+   * minimum of the dbServerState and the eligible CN.
+ * @return The computed eligible server state.
+ */
+ public ServerState getCLElligibleState()
+ {
+    // ChangeNumber eligibleCN = computeEligibleCN();
+    ServerState dbState = this.getDbServerState();
+    ServerState res = dbState;
+
+ /* TODO:ECL Eligibility is not yet implemented
+ Iterator<Short> it = dbState.iterator();
+ while (it.hasNext())
+ {
+ Short sid = it.next();
+ DbHandler h = sourceDbHandlers.get(sid);
+ ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
+ try
+ {
+      if ((eligibleCN != null) && (eligibleCN.older(dbCN)))
+      {
+        // some CNs exist in the db that are newer than the eligible CN
+        ReplicationIterator ri = h.generateIterator(eligibleCN);
+ ChangeNumber newCN = ri.getCurrentCN();
+ res.update(newCN);
+ ri.releaseCursor();
+ }
+ else
+ {
+        // no CNs exist in the db newer than the eligible CN
+ res.update(dbCN);
+ }
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ */
+
+ if (debugEnabled())
+ TRACER.debugInfo("In " + this.getName()
+ + " getCLElligibleState returns:" + res);
+ return res;
+ }
+
+ /**
+ * Returns the start state of the domain, made of the first (oldest)
+ * change stored for each serverId.
+   * @return The start state of the domain.
+ */
+ public ServerState getStartState()
+ {
+ ServerState domainStartState = new ServerState();
+ for (DbHandler dbHandler : sourceDbHandlers.values())
+ {
+ domainStartState.update(dbHandler.getFirstChange());
+ }
+ return domainStartState;
+ }
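+
+  /* Hypothetical example for getStartState(): if serverId 1 stores changes
+   * from t=100 to t=900 and serverId 2 from t=250 to t=800, the returned
+   * start state holds the t=100 and t=250 change numbers, i.e. the oldest
+   * change kept per serverId.
+   */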
+}
--
Gitblit v1.10.0