From 4d90aff1b4e079be6e32e3f880e328883dd534ee Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 30 Mar 2011 19:21:16 +0000
Subject: [PATCH] Fix issue OpenDJ-96: Replication server monitor data computation takes too long / blocks rest of server when another RS cannot be reached
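
Previously the listen thread waited at startup on the connectedInTopology
flag, and callers of getReplicationServerDomain() with waitConnections=true
waited on the shared domainMonitor, so a replication server that could not
be reached could stall the rest of the server. This change removes the
cross-domain monitor data computation (and its 500ms cache) from
ReplicationServer, guards the draft change number state with a dedicated
draftCNLock, iterates domains over a snapshot taken under the baseDNs lock,
and replaces the old flag with a ticket handshake between
getReplicationServerDomain() and the connect thread.

A minimal sketch of that handshake follows; the names are hypothetical and
not part of this patch. A caller takes a ticket while the worker thread is
parked between iterations, wakes it, and then waits with a timeout (so
shutdown is never missed) until the worker has completed one full pass:

    final class PeriodicWorkerTicket
    {
      private final Object workerLock = new Object();
      private final Object ticketLock = new Object();
      private long ticket = 0L;
      // Set elsewhere (e.g. by a shutdown method, not shown here).
      private volatile boolean shutdown = false;

      /** Worker loop: perform one pass of work, publish a ticket, then park. */
      void runWorker()
      {
        synchronized (workerLock)
        {
          while (!shutdown)
          {
            doOnePass();

            // Publish completion of this pass to any waiting callers.
            synchronized (ticketLock)
            {
              ticket++;
              ticketLock.notifyAll();
            }

            try
            {
              // Park, releasing workerLock so that callers can take a ticket.
              workerLock.wait(1000 + (int) (Math.random() * 100));
            }
            catch (InterruptedException e)
            {
              return; // Signalled to shut down.
            }
          }
        }
      }

      /** Caller: block until the worker completes one full pass after this call. */
      void awaitNextPass()
      {
        final long myTicket;
        synchronized (workerLock)
        {
          // The worker can only be parked in wait() while we hold workerLock,
          // so the ticket taken here is covered by its very next pass.
          synchronized (ticketLock)
          {
            myTicket = ticket + 1;
          }
          workerLock.notify(); // Wake the worker early.
        }

        synchronized (ticketLock)
        {
          while (myTicket > ticket && !shutdown)
          {
            try
            {
              ticketLock.wait(500); // Timed wait so that shutdown is detected.
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      }

      private void doOnePass()
      {
        // For example: reconnect to any replication servers not yet connected.
      }
    }

In the patch itself the same roles are played by connectThreadLock,
domainTicketLock and domainTicket.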
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 758 +++++++++++++++++++++++++---------------------------------
 1 file changed, 325 insertions(+), 433 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 5e73527..4358278 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -41,15 +41,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -70,7 +62,6 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
import org.opends.server.core.networkgroups.NetworkGroup;
-import org.opends.server.loggers.LogLevel;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -100,7 +91,6 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
-import java.util.Collections;
/**
* ReplicationServer Listener.
@@ -113,7 +103,7 @@
* It is responsible for creating the replication server replicationServerDomain
* and managing it
*/
-public class ReplicationServer
+public final class ReplicationServer
implements ConfigurationChangeListener<ReplicationServerCfg>,
BackupTaskListener, RestoreTaskListener, ImportTaskListener,
ExportTaskListener
@@ -131,8 +121,8 @@
/* This table is used to store the list of dn for which we are currently
* handling servers.
*/
- private ConcurrentHashMap<String, ReplicationServerDomain> baseDNs =
- new ConcurrentHashMap<String, ReplicationServerDomain>();
+ private final Map<String, ReplicationServerDomain> baseDNs =
+ new HashMap<String, ReplicationServerDomain>();
private String localURL = "null";
private volatile boolean shutdown = false;
@@ -155,11 +145,6 @@
// ID of the backend
private static final String backendId = "replicationChanges";
- // At startup, the listen thread wait on this flag for the connect
- // thread to look for other servers in the topology.
- private boolean connectedInTopology = false;
- private final Object connectedInTopologyLock = new Object();
-
/*
* Assured mode properties
*/
@@ -180,10 +165,20 @@
// The handler of the draft change numbers database, the database used to
// store the relation between a draft change number ('seqnum') and the
// associated cookie.
+ //
+ // Guarded by draftCNLock
+ //
private DraftCNDbHandler draftCNDbHandler;
+
// The last value generated of the draft change number.
+ //
+ // Guarded by draftCNLock
+ //
private int lastGeneratedDraftCN = 0;
+ // Used for protecting draft CN related state.
+ private final Object draftCNLock = new Object();
+
/**
* The tracer object for the debug logger.
*/
@@ -191,13 +186,15 @@
private static String externalChangeLogWorkflowID =
"External Changelog Workflow ID";
- ECLWorkflowElement eclwe;
- WorkflowImpl externalChangeLogWorkflowImpl = null;
+ private ECLWorkflowElement eclwe;
+ private WorkflowImpl externalChangeLogWorkflowImpl = null;
private static HashSet<Integer> localPorts = new HashSet<Integer>();
- // used to synchronize the domain creation with the connect thread.
- final private Object domainMonitor = new Object();
+ // Monitors for synchronizing domain creation with the connect thread.
+ private final Object domainTicketLock = new Object();
+ private final Object connectThreadLock = new Object();
+ private long domainTicket = 0L;
// ServiceIDs excluded for ECL
private ArrayList<String> excludedServiceIDs = new ArrayList<String>();
@@ -314,22 +311,6 @@
void runListen()
{
- // wait for the connect thread to find other replication
- // servers in the topology before starting to accept connections
- // from the ldap servers.
- synchronized (connectedInTopologyLock)
- {
- if (connectedInTopology == false)
- {
- try
- {
- connectedInTopologyLock.wait(1000);
- } catch (InterruptedException e)
- {
- }
- }
- }
-
while ((shutdown == false) && (stopListen == false))
{
// Wait on the replicationServer port.
@@ -419,88 +400,91 @@
*/
void runConnect()
{
- while (shutdown == false)
+ synchronized (connectThreadLock)
{
- /*
- * periodically check that we are connected to all other
- * replication servers and if not establish the connection
- */
- for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
+ while (!shutdown)
{
- Set<String> connectedReplServers =
- replicationServerDomain.getChangelogs();
/*
- * check that all replication server in the config are in the connected
- * Set. If not create the connection
+       * Periodically check that we are connected to all other replication
+       * servers and, if not, establish the connection.
*/
- for (String serverURL : replicationServers)
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- int separator = serverURL.lastIndexOf(':');
- String port = serverURL.substring(separator + 1);
- String hostname = serverURL.substring(0, separator);
+ Set<String> connectedReplServers = domain.getChangelogs();
- try
+ /*
+         * check that all replication servers in the config are in the
+         * connected set. If not, create the connection.
+ */
+ for (String serverURL : replicationServers)
{
- InetAddress inetAddress = InetAddress.getByName(hostname);
- String serverAddress = inetAddress.getHostAddress() + ":" + port;
- String alternServerAddress = null;
- if (hostname.equalsIgnoreCase("localhost"))
+ int separator = serverURL.lastIndexOf(':');
+ String port = serverURL.substring(separator + 1);
+ String hostname = serverURL.substring(0, separator);
+ try
{
- // if "localhost" was used as the hostname in the configuration
- // also check is the connection is already opened with the
- // local address.
- alternServerAddress =
- InetAddress.getLocalHost().getHostAddress() + ":" + port;
- }
- if (inetAddress.equals(InetAddress.getLocalHost()))
- {
- // if the host address is the local one, also check
- // if the connection is already opened with the "localhost"
- // address
- alternServerAddress = "127.0.0.1" + ":" + port;
- }
+ InetAddress inetAddress = InetAddress
+ .getByName(hostname);
+ String serverAddress = inetAddress.getHostAddress()
+ + ":" + port;
+ String alternServerAddress = null;
- if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0)
- && (serverAddress.compareTo(this.localURL) != 0)
- && (!connectedReplServers.contains(serverAddress)
- && ((alternServerAddress == null)
- || !connectedReplServers.contains(alternServerAddress))))
+ if (hostname.equalsIgnoreCase("localhost"))
+ {
+            // If "localhost" was used as the hostname in the configuration,
+            // also check whether the connection is already opened with the
+            // local address.
+ alternServerAddress = InetAddress.getLocalHost()
+ .getHostAddress() + ":" + port;
+ }
+
+ if (inetAddress.equals(InetAddress.getLocalHost()))
+ {
+            // If the host address is the local one, also check
+            // whether the connection is already opened with the "localhost"
+            // address.
+ alternServerAddress = "127.0.0.1" + ":" + port;
+ }
+
+ if ((serverAddress.compareTo("127.0.0.1:"
+ + replicationPort) != 0)
+ && (serverAddress.compareTo(this.localURL) != 0)
+ && (!connectedReplServers.contains(serverAddress)
+ && ((alternServerAddress == null) || !connectedReplServers
+ .contains(alternServerAddress))))
+ {
+ connect(serverURL, domain.getBaseDn());
+ }
+ }
+ catch (IOException e)
{
- this.connect(serverURL, replicationServerDomain.getBaseDn());
+ Message message = ERR_COULD_NOT_SOLVE_HOSTNAME
+ .get(hostname);
+ logError(message);
}
}
- catch (IOException e)
- {
- Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
- logError(message);
- }
}
- }
- synchronized (connectedInTopologyLock)
- {
- // wake up the listen thread if necessary.
- if (connectedInTopology == false)
+
+      // After each iteration, notify any threads that are waiting on a domain ticket.
+ synchronized (domainTicketLock)
{
- connectedInTopologyLock.notify();
- connectedInTopology = true;
+ domainTicket++;
+ domainTicketLock.notifyAll();
}
- }
- try
- {
- synchronized(domainMonitor)
+
+ // Retry each second.
+ final int randomizer = (int) (Math.random() * 100);
+ try
{
- domainMonitor.notifyAll();
+        // Releases the lock, allowing waiting threads to acquire a domain ticket.
+ connectThreadLock.wait(1000 + randomizer);
}
- synchronized (this)
+ catch (InterruptedException e)
{
- /* check if we are connected every second */
- int randomizer = (int)(Math.random()*100);
- wait(1000 + randomizer);
+        // Signalled to shut down.
+ return;
}
- } catch (InterruptedException e)
- {
- // ignore error, will try to connect again or shutdown
}
}
}
@@ -721,40 +705,50 @@
private void shutdownECL()
{
- WorkflowImpl eclwf =
- (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID);
+ WorkflowImpl eclwf = (WorkflowImpl) WorkflowImpl
+ .getWorkflow(externalChangeLogWorkflowID);
// do it only if not already done by another RS (unit test case)
// if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID)
- if (eclwf!=null)
+ if (eclwf != null)
{
+ // FIXME:ECL should the ECL Workflow be registered in
+ // internalNetworkGroup?
+ NetworkGroup internalNetworkGroup = NetworkGroup
+ .getInternalNetworkGroup();
+ internalNetworkGroup
+ .deregisterWorkflow(externalChangeLogWorkflowID);
+ // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
+ NetworkGroup adminNetworkGroup = NetworkGroup
+ .getAdminNetworkGroup();
+ adminNetworkGroup
+ .deregisterWorkflow(externalChangeLogWorkflowID);
- // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
- NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
- internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+ NetworkGroup defaultNetworkGroup = NetworkGroup
+ .getDefaultNetworkGroup();
+ defaultNetworkGroup
+ .deregisterWorkflow(externalChangeLogWorkflowID);
- // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
- NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
- adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
-
- NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
- defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
-
- eclwf.deregister();
- eclwf.finalizeWorkflow();
+ eclwf.deregister();
+ eclwf.finalizeWorkflow();
}
- eclwe = (ECLWorkflowElement)
- DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
- if (eclwe!=null)
+ eclwe = (ECLWorkflowElement) DirectoryServer
+ .getWorkflowElement("EXTERNAL CHANGE LOG");
+ if (eclwe != null)
{
DirectoryServer.deregisterWorkflowElement(eclwe);
eclwe.finalizeWorkflowElement();
}
- if (draftCNDbHandler != null)
- draftCNDbHandler.shutdown();
+ synchronized (draftCNLock)
+ {
+ if (draftCNDbHandler != null)
+ {
+ draftCNDbHandler.shutdown();
+ }
+ }
}
/**
@@ -790,38 +784,60 @@
public ReplicationServerDomain getReplicationServerDomain(String baseDn,
boolean create, boolean waitConnections)
{
- ReplicationServerDomain replicationServerDomain;
+ ReplicationServerDomain domain;
synchronized (baseDNs)
{
- replicationServerDomain = baseDNs.get(baseDn);
- if ((replicationServerDomain == null) && (create))
+ domain = baseDNs.get(baseDn);
+
+      if (domain != null || !create) {
+ return domain;
+ }
+
+ domain = new ReplicationServerDomain(baseDn, this);
+ baseDNs.put(baseDn, domain);
+ }
+
+ if (waitConnections)
+ {
+ // Acquire a domain ticket and wait for a complete cycle of the connect
+ // thread.
+ final long myDomainTicket;
+ synchronized (connectThreadLock)
{
- replicationServerDomain = new ReplicationServerDomain(baseDn, this);
- baseDNs.put(baseDn, replicationServerDomain);
- synchronized (domainMonitor)
+        // The connect thread can only be parked in its wait() while we hold this lock.
+ synchronized (domainTicketLock)
{
- if (waitConnections)
+ // Determine the ticket which will be used in the next connect thread
+ // iteration.
+ myDomainTicket = domainTicket + 1;
+ }
+
+ // Wake up connect thread.
+ connectThreadLock.notify();
+ }
+
+      // Wait until the connect thread has processed the next connect phase.
+ synchronized (domainTicketLock)
+ {
+        // Loop until our ticket has been processed, guarding against spurious wake-ups.
+ while (myDomainTicket > domainTicket && !shutdown)
+ {
+ try
{
- synchronized (this)
- {
- // kick up the connect thread so that this new domain
- // gets connected to all the Replication Servers.
- this.notify();
- }
- try
- {
- // wait for the connect thread to signal that it finished its job
- domainMonitor.wait(500);
- } catch (InterruptedException e)
- {
- }
+ // Wait with timeout so that we detect shutdown.
+ domainTicketLock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ // Can't do anything with this.
+ Thread.currentThread().interrupt();
}
}
}
}
- return replicationServerDomain;
+ return domain;
}
/**
@@ -861,9 +877,9 @@
}
// shutdown all the ChangelogCaches
- for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- replicationServerDomain.shutdown();
+ domain.shutdown();
}
shutdownECL();
@@ -894,37 +910,60 @@
return new DbHandler(id, baseDn, this, dbEnv, queueSize);
}
+
+
/**
* Clears the generationId for the replicationServerDomain related to the
* provided baseDn.
- * @param baseDn The baseDn for which to delete the generationId.
- * @throws DatabaseException When it occurs.
+ *
+ * @param baseDn
+ * The baseDn for which to delete the generationId.
*/
public void clearGenerationId(String baseDn)
- throws DatabaseException
{
try
{
dbEnv.clearGenerationId(baseDn);
+ }
+ catch (Exception e)
+ {
+ // Ignore.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ }
+ }
- if (this.draftCNDbHandler != null)
+ synchronized (draftCNLock)
+ {
+ if (draftCNDbHandler != null)
{
try
{
- try
+ draftCNDbHandler.clear(baseDn);
+ }
+ catch (Exception e)
+ {
+ // Ignore.
+ if (debugEnabled())
{
- draftCNDbHandler.clear(baseDn);
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
}
- catch(Exception e){}
+ }
+
+ try
+ {
lastGeneratedDraftCN = draftCNDbHandler.getLastKey();
}
- catch(Exception e) {}
+ catch (Exception e)
+ {
+ // Ignore.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ }
+ }
}
-
- }
- catch(Exception e)
- {
- TRACER.debugCaught(LogLevel.ALL, e);
}
}
@@ -993,7 +1032,7 @@
{
purgeDelay = newPurgeDelay;
// propagate
- for (ReplicationServerDomain domain : baseDNs.values())
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
domain.setPurgeDelay(purgeDelay*1000);
}
@@ -1043,64 +1082,71 @@
// Update threshold value for status analyzers (stop them if requested
// value is 0)
- if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
+ if (degradedStatusThreshold != configuration
+ .getDegradedStatusThreshold())
{
int oldThresholdValue = degradedStatusThreshold;
- degradedStatusThreshold = configuration.getDegradedStatusThreshold();
- for(ReplicationServerDomain rsd : baseDNs.values())
+ degradedStatusThreshold = configuration
+ .getDegradedStatusThreshold();
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
if (degradedStatusThreshold == 0)
{
// Requested to stop analyzers
- rsd.stopStatusAnalyzer();
- } else if (rsd.isRunningStatusAnalyzer())
+ domain.stopStatusAnalyzer();
+ }
+ else if (domain.isRunningStatusAnalyzer())
{
// Update the threshold value for this running analyzer
- rsd.updateStatusAnalyzer(degradedStatusThreshold);
- } else if (oldThresholdValue == 0)
+ domain.updateStatusAnalyzer(degradedStatusThreshold);
+ }
+ else if (oldThresholdValue == 0)
{
// Requested to start analyzers with provided threshold value
- if (rsd.getConnectedDSs().size() > 0)
- rsd.startStatusAnalyzer();
+ if (domain.getConnectedDSs().size() > 0)
+ domain.startStatusAnalyzer();
}
}
}
// Update period value for monitoring publishers (stop them if requested
// value is 0)
- if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+ if (monitoringPublisherPeriod != configuration
+ .getMonitoringPeriod())
{
long oldMonitoringPeriod = monitoringPublisherPeriod;
monitoringPublisherPeriod = configuration.getMonitoringPeriod();
- for(ReplicationServerDomain rsd : baseDNs.values())
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
if (monitoringPublisherPeriod == 0L)
{
// Requested to stop monitoring publishers
- rsd.stopMonitoringPublisher();
- } else if (rsd.isRunningMonitoringPublisher())
+ domain.stopMonitoringPublisher();
+ }
+ else if (domain.isRunningMonitoringPublisher())
{
// Update the threshold value for this running monitoring publisher
- rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
- } else if (oldMonitoringPeriod == 0L)
+ domain.updateMonitoringPublisher(monitoringPublisherPeriod);
+ }
+ else if (oldMonitoringPeriod == 0L)
{
// Requested to start monitoring publishers with provided period value
- if ( (rsd.getConnectedDSs().size() > 0) ||
- (rsd.getConnectedRSs().size() > 0) )
- rsd.startMonitoringPublisher();
+ if ((domain.getConnectedDSs().size() > 0)
+ || (domain.getConnectedRSs().size() > 0))
+ domain.startMonitoringPublisher();
}
}
}
// Changed the group id ?
- byte newGroupId = (byte)configuration.getGroupId();
+ byte newGroupId = (byte) configuration.getGroupId();
if (newGroupId != groupId)
{
groupId = newGroupId;
// Have a new group id: Disconnect every servers.
- for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- replicationServerDomain.stopAllServers(true);
+ domain.stopAllServers(true);
}
}
@@ -1129,10 +1175,10 @@
*/
private void broadcastConfigChange()
{
- for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+ for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- replicationServerDomain.buildAndSendTopoInfoToDSs(null);
- replicationServerDomain.buildAndSendTopoInfoToRSs();
+ domain.buildAndSendTopoInfoToDSs(null);
+ domain.buildAndSendTopoInfoToRSs();
}
}
@@ -1364,10 +1410,15 @@
*/
public Iterator<ReplicationServerDomain> getDomainIterator()
{
- if (!baseDNs.isEmpty())
- return baseDNs.values().iterator();
+ Collection<ReplicationServerDomain> domains = getReplicationServerDomains();
+ if (!domains.isEmpty())
+ {
+ return domains.iterator();
+ }
else
+ {
return null;
+ }
}
/**
@@ -1384,16 +1435,40 @@
rsd.clearDbs();
}
}
- if (this.draftCNDbHandler != null)
+
+ synchronized (draftCNLock)
{
- try
+ if (draftCNDbHandler != null)
{
- try { draftCNDbHandler.clear(); } catch(Exception e){}
- draftCNDbHandler.shutdown();
+ try
+ {
+ draftCNDbHandler.clear();
+ }
+ catch (Exception e)
+ {
+ // Ignore.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ }
+ }
+
+ try
+ {
+ draftCNDbHandler.shutdown();
+ }
+ catch (Exception e)
+ {
+ // Ignore.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ }
+ }
+
lastGeneratedDraftCN = 0;
draftCNDbHandler = null;
}
- catch(Exception e) {}
}
}
@@ -1468,9 +1543,9 @@
if (serversToDisconnect.isEmpty())
return;
- for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
+ for (ReplicationServerDomain domain: getReplicationServerDomains())
{
- replicationServerDomain.stopReplicationServers(serversToDisconnect);
+ domain.stopReplicationServers(serversToDisconnect);
}
}
@@ -1494,221 +1569,6 @@
return replicationPort;
}
- // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
- private long monitorDataLifeTime = 500;
-
- /* The date of the last time they have been elaborated */
- private long monitorDataLastBuildDate = 0;
-
- /**
- * This uniquely identifies a server (handler) in the cross-domain topology.
- * Represents an identifier of a handler (in the whole RS) we have to wait a
- * monitoring message from before answering to a monitor request.
- */
- public static class GlobalServerId {
-
- private int serverId = -1;
- private String baseDn = null;
-
- /**
- * Constructor for a global server id.
- * @param baseDn The dn of the RSD owning the handler.
- * @param serverId The handler id in the matching RSD.
- */
- public GlobalServerId(String baseDn, int serverId) {
- this.baseDn = baseDn;
- this.serverId = serverId;
- }
-
- /**
- * Get the server handler id.
- * @return the serverId
- */
- public int getServerId()
- {
- return serverId;
- }
-
- /**
- * Get the base dn.
- * @return the baseDn
- */
- public String getBaseDn()
- {
- return baseDn;
- }
-
- /**
- * Get the hascode.
- * @return The hashcode.
- */
- @Override
- public int hashCode()
- {
- int hash = 7;
- hash = 43 * hash + this.serverId;
- hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
- return hash;
- }
-
- /**
- * Tests if the passed global server handler id represents the same server
- * handler as this one.
- * @param obj The object to test.
- * @return True if both identifiers are the same.
- */
- public boolean equals(Object obj) {
- if ( (obj == null) || (!(obj instanceof GlobalServerId)))
- return false;
-
- GlobalServerId globalServerId = (GlobalServerId)obj;
- return ( globalServerId.baseDn.equals(baseDn) &&
- (globalServerId.serverId == serverId) );
- }
- }
-
- /**
- * This gives the list of server handlers we are willing to wait monitoring
- * message from. Each time a monitoring message is received by a server
- * handler, the matching server handler id is retired from the list. When the
- * list is empty, we received all expected monitoring messages.
- */
- private List<GlobalServerId> expectedMonitoringMsg = null;
-
- /**
- * Trigger the computation of the Global Monitoring Data.
- * This should be called by all the MonitorProviders that need
- * the global monitoring data to be updated before they can
- * publish their information to cn=monitor.
- *
- * This method will trigger the update of all the global monitoring
- * information of all the base-DNs of this replication Server.
- *
- * @throws DirectoryException If the computation cannot be achieved.
- */
- public synchronized void computeMonitorData() throws DirectoryException
- {
- if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
- // The current data are still valid. No need to renew them.
- return;
- }
-
- // Initialize the list of server handlers we expect monitoring messages from
- expectedMonitoringMsg =
- Collections.synchronizedList(new ArrayList<GlobalServerId>());
-
- // Copy the list of domains as a new domain may arrive or disappear between
- // the initializeMonitorData and completeMonitorData calls
- List<ReplicationServerDomain> rsdList =
- new ArrayList<ReplicationServerDomain>(baseDNs.values());
-
- for (ReplicationServerDomain domain : rsdList)
- {
- domain.initializeMonitorData(expectedMonitoringMsg);
- }
-
- // Wait for responses
- waitMonitorDataResponses();
-
- for (ReplicationServerDomain domain : rsdList)
- {
- domain.completeMonitorData();
- }
- }
-
- /**
- * Wait for the expected received MonitorMsg.
- * @throws DirectoryException When an error occurs.
- */
- private void waitMonitorDataResponses()
- throws DirectoryException
- {
- try
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + getMonitorInstanceName() +
- " waiting for " + expectedMonitoringMsg.size() +
- " expected monitor messages");
-
- // Wait up to 5 seconds for every expected monitoring message to come
- // back.
- boolean allReceived = false;
- long startTime = TimeThread.getTime();
- long curTime = startTime;
- int maxTime = 5000;
- while ( (curTime - startTime) < maxTime )
- {
- // Have every expected monitoring messages arrived ?
- if (expectedMonitoringMsg.size() == 0)
- {
- // Ok break the loop
- allReceived = true;
- break;
- }
- Thread.sleep(100);
- curTime = TimeThread.getTime();
- }
-
- monitorDataLastBuildDate = TimeThread.getTime();
-
- if (!allReceived)
- {
- logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
- // let's go on in best effort even with limited data received.
- } else
- {
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + getMonitorInstanceName() +
- " Successfully received all expected monitor messages");
- }
- } catch (Exception e)
- {
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
- }
- }
-
-
- /**
- * This should be called by each ReplicationServerDomain that receives
- * a response to a monitor request message. This may also be called when a
- * monitoring message is coming from a RS whose monitoring publisher thread
- * sent it. As monitoring messages (sent because of monitoring request or
- * because of monitoring publisher) have the same content, this is also ok
- * to mark ok the server when the monitoring message coms from a monitoring
- * publisher thread.
- * @param globalServerId The server handler that is receiving the
- * monitoring message.
- */
- public void responseReceived(GlobalServerId globalServerId)
- {
- expectedMonitoringMsg.remove(globalServerId);
- }
-
-
- /**
- * This should be called when the Monitoring has failed and the
- * Worker thread that is waiting for the result should be awaken.
- */
- public void responseReceivedAll()
- {
- expectedMonitoringMsg.clear();
- }
-
- /**
- * Returns the number of domains managed by this replication server.
- * @return the number of domains managed.
- */
- public int getCacheSize()
- {
- return baseDNs.size();
- }
-
/**
* Create a new session to get the ECL.
* @param msg The message that specifies the ECL request.
@@ -1861,32 +1721,37 @@
return eligibleCN;
}
+
+
/**
* Get or create a handler on a Db on DraftCN for external changelog.
+ *
* @return the handler.
- * @throws DirectoryException when needed.
+ * @throws DirectoryException
+ * when needed.
*/
- public synchronized DraftCNDbHandler getDraftCNDbHandler()
- throws DirectoryException
+ public DraftCNDbHandler getDraftCNDbHandler()
+ throws DirectoryException
{
- try
+ synchronized (draftCNLock)
{
- if (draftCNDbHandler == null)
+ try
{
- draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
if (draftCNDbHandler == null)
- return null;
- this.lastGeneratedDraftCN = getLastDraftChangeNumber();
+ {
+ draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
+ lastGeneratedDraftCN = getLastDraftChangeNumber();
+ }
+ return draftCNDbHandler;
}
- return draftCNDbHandler;
- }
- catch (Exception e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
- throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
- mb.toMessage(), e);
+ catch (Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
+ throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
+ mb.toMessage(), e);
+ }
}
}
@@ -1896,10 +1761,17 @@
*/
public int getFirstDraftChangeNumber()
{
- int first=0;
- if (draftCNDbHandler != null)
- first = draftCNDbHandler.getFirstKey();
- return first;
+ synchronized (draftCNLock)
+ {
+ if (draftCNDbHandler != null)
+ {
+ return draftCNDbHandler.getFirstKey();
+ }
+ else
+ {
+ return 0;
+ }
+ }
}
/**
@@ -1908,19 +1780,29 @@
*/
public int getLastDraftChangeNumber()
{
- int last=0;
- if (draftCNDbHandler != null)
- last = draftCNDbHandler.getLastKey();
- return last;
+ synchronized (draftCNLock)
+ {
+ if (draftCNDbHandler != null)
+ {
+ return draftCNDbHandler.getLastKey();
+ }
+ else
+ {
+ return 0;
+ }
+ }
}
/**
* Generate a new Draft ChangeNumber.
* @return The generated Draft ChangeNUmber
*/
- synchronized public int getNewDraftCN()
+ public int getNewDraftCN()
{
- return ++lastGeneratedDraftCN;
+ synchronized (draftCNLock)
+ {
+ return ++lastGeneratedDraftCN;
+ }
}
/**
@@ -2092,4 +1974,14 @@
return weight;
}
+
+
+ private Collection<ReplicationServerDomain> getReplicationServerDomains()
+ {
+ synchronized (baseDNs)
+ {
+ return new ArrayList<ReplicationServerDomain>(baseDNs.values());
+ }
+ }
+
}
--
Gitblit v1.10.0