From 2942eaa1b7264228c9ca7535aabd206e663581e9 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 03 Jan 2008 09:41:49 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 379 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 372 insertions(+), 7 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 7eadd3e..6382ca3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.MonitorMessage;
+import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
-
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
/**
@@ -118,6 +128,34 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /* Monitor data management */
+
+ // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
+ private long remoteMonitorDataLifeTime = 500;
+
+ /* Search op on monitor data is processed by a worker thread.
+ * Requests are sent to the other RS,and responses are received by the
+ * listener threads.
+ * The worker thread is awoke on this semaphore, or on timeout.
+ */
+ Semaphore remoteMonitorResponsesSemaphore;
+
+ /* The date of the last time they have been elaborated */
+ private long validityDate = 0;
+
+ // For each LDAP server, its server state
+ private HashMap<Short, ServerState> LDAPStates =
+ new HashMap<Short, ServerState>();
+
+ // For each LDAP server, the last CN it published
+ private HashMap<Short, ChangeNumber> maxCNs =
+ new HashMap<Short, ChangeNumber>();
+
+ // For each LDAP server, an approximation of the date of the first missing
+ // change
+ private HashMap<Short, Long> approxFirstMissingDate =
+ new HashMap<Short, Long>();
+
/**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
*
@@ -352,7 +390,7 @@
}
else
{
- if (!rsh.getRemoteLDAPServers().isEmpty())
+ if (rsh.hasRemoteLDAPServers())
{
lDAPServersConnectedInTheTopology = true;
@@ -636,7 +674,7 @@
// server connected
for (ServerHandler rsh : replicationServers.values())
{
- if (!rsh.getRemoteLDAPServers().isEmpty())
+ if (rsh.hasRemoteLDAPServers())
{
servers.add(rsh);
}
@@ -693,15 +731,58 @@
*/
public void process(RoutableMessage msg, ServerHandler senderHandler)
{
- // A replication server is not expected to be the destination
- // of a routable message except for an error message.
+ // Test the message for which a ReplicationServer is expected
+ // to be the destination
if (msg.getDestination() == this.replicationServer.getServerId())
{
if (msg instanceof ErrorMessage)
{
ErrorMessage errorMsg = (ErrorMessage)msg;
logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
+ errorMsg.getDetails()));
+ }
+ else if (msg instanceof MonitorRequestMessage)
+ {
+ MonitorRequestMessage replServerMonitorRequestMsg =
+ (MonitorRequestMessage) msg;
+
+ MonitorMessage monitorMsg =
+ new MonitorMessage(
+ replServerMonitorRequestMsg.getDestination(),
+ replServerMonitorRequestMsg.getsenderID());
+
+ // Populate the RS state in the msg from the DbState
+ monitorMsg.setReplServerState(this.getDbServerState());
+
+ // Populate for each connected LDAP Server
+ // from the states stored in the serverHandler.
+ // - the server state
+ // - the older missing change
+ for (ServerHandler lsh : this.connectedServers.values())
+ {
+ monitorMsg.setLDAPServerState(
+ lsh.getServerId(),
+ lsh.getServerState(),
+ lsh.getApproxFirstMissingDate());
+ }
+ try
+ {
+ senderHandler.send(monitorMsg);
+ }
+ catch(Exception e)
+ {
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+ Short.toString((msg.getDestination()))));
+ }
+ }
+ else if (msg instanceof MonitorMessage)
+ {
+ MonitorMessage monitorMsg =
+ (MonitorMessage) msg;
+
+ receivesMonitorDataResponse(monitorMsg);
}
else
{
@@ -1156,4 +1237,288 @@
{
return replicationServer;
}
+
+ /*
+ * Monitor Data generation
+ */
+
+ /**
+ * Retrieves the remote monitor data.
+ *
+ * @throws DirectoryException When an error occurs.
+ */
+ protected void retrievesRemoteMonitorData()
+ throws DirectoryException
+ {
+ if (validityDate > TimeThread.getTime())
+ {
+ // The current data are still valid. No need to renew them.
+ return;
+ }
+
+ // Clean
+ this.LDAPStates.clear();
+ this.maxCNs.clear();
+
+ // Init the maxCNs of our direct LDAP servers from our own dbstate
+ for (ServerHandler rs : connectedServers.values())
+ {
+ short serverID = rs.getServerId();
+ ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
+ if (cn == null)
+ {
+ // we have nothing in db for that server
+ cn = new ChangeNumber(0, 0 , serverID);
+ }
+ this.maxCNs.put(serverID, cn);
+ }
+
+ ServerState replServerState = this.getDbServerState();
+ Iterator<Short> it = replServerState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+ ChangeNumber maxCN = this.maxCNs.get(sid);
+ if ((maxCN != null) && (receivedCN.newer(maxCN)))
+ {
+ // We found a newer one
+ this.maxCNs.remove(sid);
+ this.maxCNs.put(sid, receivedCN);
+ }
+ }
+
+ // Send Request to the other Replication Servers
+ if (remoteMonitorResponsesSemaphore == null)
+ {
+ remoteMonitorResponsesSemaphore = new Semaphore(
+ replicationServers.size() -1);
+
+ sendMonitorDataRequest();
+
+ // Wait reponses from them or timeout
+ waitMonitorDataResponses(replicationServers.size());
+ }
+ else
+ {
+ // The processing of renewing the monitor cache is already running
+ // We'll make it sleeping until the end
+ while (remoteMonitorResponsesSemaphore!=null)
+ {
+ waitMonitorDataResponses(1);
+ }
+ }
+
+ // Now we have the expected answers of an error occured
+ validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+
+ if (debugEnabled())
+ {
+ debugMonitorData();
+ }
+ }
+
+ private void debugMonitorData()
+ {
+ String mds = " Monitor data=";
+ Iterator<Short> ite = LDAPStates.keySet().iterator();
+ while (ite.hasNext())
+ {
+ Short sid = ite.next();
+ ServerState ss = LDAPStates.get(sid);
+ mds += " LDAPState(" + sid + ")=" + ss.toString();
+ }
+ Iterator<Short> itc = maxCNs.keySet().iterator();
+ while (itc.hasNext())
+ {
+ Short sid = itc.next();
+ ChangeNumber cn = maxCNs.get(sid);
+ mds += " maxCNs(" + sid + ")=" + cn.toString();
+ }
+
+ mds += "--";
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ mds);
+ }
+
+ /**
+ * Sends a MonitorRequest message to all connected RS.
+ * @throws DirectoryException when a problem occurs.
+ */
+ protected void sendMonitorDataRequest()
+ throws DirectoryException
+ {
+ try
+ {
+ for (ServerHandler rs : replicationServers.values())
+ {
+ MonitorRequestMessage msg = new
+ MonitorRequestMessage(this.replicationServer.getServerId(),
+ rs.getServerId());
+ rs.send(msg);
+ }
+ }
+ catch(Exception e)
+ {
+ Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, e);
+ }
+ }
+
+ /**
+ * Wait for the expected count of received MonitorMessage.
+ * @param expectedResponses The number of expected answers.
+ * @throws DirectoryException When an error occurs.
+ */
+ protected void waitMonitorDataResponses(int expectedResponses)
+ throws DirectoryException
+ {
+ try
+ {
+ boolean allPermitsAcquired =
+ remoteMonitorResponsesSemaphore.tryAcquire(
+ expectedResponses,
+ (long) 500, TimeUnit.MILLISECONDS);
+
+ if (!allPermitsAcquired)
+ {
+ logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+ }
+ else
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "Successfully received all " + replicationServers.size()
+ + " expected monitor messages");
+ }
+ }
+ catch(Exception e)
+ {
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+ }
+ finally
+ {
+ remoteMonitorResponsesSemaphore = null;
+ }
+ }
+
+ /**
+ * Processes a Monitor message receives from a remote Replication Server
+ * and stores the data received.
+ *
+ * @param msg The message to be processed.
+ */
+ public void receivesMonitorDataResponse(MonitorMessage msg)
+ {
+ if (remoteMonitorResponsesSemaphore == null)
+ {
+ // Ignoring the remote monitor data because an error occured previously
+ return;
+ }
+
+ try
+ {
+ // Here is the RS state : list <serverID, lastChangeNumber>
+ // For each LDAP Server, we keep the max CN accross the RSes
+ ServerState replServerState = msg.getReplServerState();
+ Iterator<Short> it = replServerState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+ ChangeNumber maxCN = this.maxCNs.get(sid);
+ if (receivedCN.newer(maxCN))
+ {
+ // We found a newer one
+ this.maxCNs.remove(sid);
+ this.maxCNs.put(sid, receivedCN);
+ }
+ }
+
+ // Store the LDAP servers states
+ Iterator<Short> sidIterator = msg.iterator();
+ while (sidIterator.hasNext())
+ {
+ short sid = sidIterator.next();
+ ServerState ss = msg.getLDAPServerState(sid);
+ this.LDAPStates.put(sid, ss);
+ this.approxFirstMissingDate.put(sid,
+ msg.getApproxFirstMissingDate(sid));
+ }
+
+ // Decreases the number of expected responses and potentially
+ // wakes up the waiting requestor thread.
+ remoteMonitorResponsesSemaphore.release();
+ }
+ catch (Exception e)
+ {
+ // If an exception occurs while processing one of the expected message,
+ // the processing is aborted and the waiting thread is awoke.
+ remoteMonitorResponsesSemaphore.notifyAll();
+ }
+ }
+
+ /**
+ * Get the state of the LDAP server with the provided serverId.
+ * @param serverId The server ID.
+ * @return The server state.
+ */
+ public ServerState getServerState(short serverId)
+ {
+ return LDAPStates.get(serverId);
+ }
+
+ /**
+ * Get the highest know change number of the LDAP server with the provided
+ * serverId.
+ * @param serverId The server ID.
+ * @return The highest change number.
+ */
+ public ChangeNumber getMaxCN(short serverId)
+ {
+ return maxCNs.get(serverId);
+ }
+
+ /**
+ * Get an approximation of the date of the oldest missing changes.
+ * serverId.
+ * @param serverId The server ID.
+ * @return The approximation of the date of the oldest missing change.
+ */
+ public Long getApproxFirstMissingDate(short serverId)
+ {
+ return approxFirstMissingDate.get(serverId);
+ }
+
+ /**
+ * Get the number of missing change for the server with the provided state.
+ * @param state The provided server state.
+ * @return The number of missing changes.
+ */
+ public int getMissingChanges(ServerState state)
+ {
+ // Traverse the max Cn transmitted by each server
+ // For each server, get the highest CN know from the current server
+ // Sum the difference betwenn the max and the last
+ int missingChanges = 0;
+ Iterator<Short> itc = maxCNs.keySet().iterator();
+ while (itc.hasNext())
+ {
+ Short sid = itc.next();
+ ChangeNumber maxCN = maxCNs.get(sid);
+ ChangeNumber last = state.getMaxChangeNumber(sid);
+ if (last == null)
+ {
+ last = new ChangeNumber(0,0, sid);
+ }
+ int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
+ missingChanges += missingChangesFromSID;
+ }
+ return missingChanges;
+ }
}
--
Gitblit v1.10.0