From 2942eaa1b7264228c9ca7535aabd206e663581e9 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 03 Jan 2008 09:41:49 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 235 ++++++++++++++++++++++++++++++++++++++++------------------
1 files changed, 163 insertions(+), 72 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 2c4e3ba..cd499e3 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
- */
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -62,9 +36,9 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
+import java.util.Date;
import java.util.List;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -150,11 +124,12 @@
/**
- * When this Handler is connected to a changelog server this collection
- * will contain the list of LDAP servers connected to the remote changelog
- * server.
+ * When this Handler is connected to a remote replication server
+ * this collection will contain as many elements as there are
+ * LDAP servers connected to the remote replication server.
*/
- private List<String> remoteLDAPservers = new ArrayList<String>();
+ private List<LightweightServerHandler>
+ remoteLDAPservers = new ArrayList<LightweightServerHandler>();
/**
* The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
ServerState dbState = replicationServerDomain.getDbServerState();
for (short id : dbState)
{
- int max = dbState.getMaxChangeNumber(id).getSeqnum();
- ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
- if (currentChange != null)
- {
- int current = currentChange.getSeqnum();
- if (current == max)
- {
- }
- else if (current < max)
- {
- totalCount += max - current;
- }
- else
- {
- totalCount += Integer.MAX_VALUE - (current - max) + 1;
- }
- }
- else
- {
- totalCount += max;
- }
+ totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
+ serverState.getMaxChangeNumber(id));
}
return totalCount;
}
@@ -858,7 +814,7 @@
}
/**
- * Get an approximation of the delay by looking at the age of the odest
+ * Get an approximation of the delay by looking at the age of the oldest
* message that has not been sent to this server.
* This is an approximation because the age is calculated using the
* clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
* @return The age if the older change has not yet been replicated
* to the server handled by this ServerHandler.
*/
+ public Long getApproxFirstMissingDate()
+ {
+ // Get the older CN received
+ // From it, get the next sequence number
+ // Get the CN for the next sequence number
+ // If not present in the local RS db,
+ // then approximate with the older update time
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN == null)
+ return null;
+
+ ReplicationIterator ri =
+ replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
+ if (ri != null)
+ {
+ if (ri.next())
+ {
+ ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
+ return firstMissingChange.getTime();
+ }
+ }
+ return olderUpdateCN.getTime();
+ }
+
+ /**
+ * Get the older update time for that server.
+ * @return The older update time.
+ */
public long getOlderUpdateTime()
{
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN == null)
+ return 0;
+ return olderUpdateCN.getTime();
+ }
+
+ /**
+ * Get the older Change Number for that server.
+ * @return The older change number.
+ */
+ public ChangeNumber getOlderUpdateCN()
+ {
synchronized (msgQueue)
{
if (isFollowing())
{
if (msgQueue.isEmpty())
- return 0;
+ return null;
UpdateMessage msg = msgQueue.first();
- return msg.getChangeNumber().getTime();
+ return msg.getChangeNumber();
}
else
{
if (lateQueue.isEmpty())
- return 0;
+ return null;
UpdateMessage msg = lateQueue.first();
- return msg.getChangeNumber().getTime();
+ return msg.getChangeNumber();
}
}
}
@@ -1190,6 +1186,16 @@
}
/**
+ * Get the state of this server.
+ *
+ * @return ServerState the state for this server..
+ */
+ public ServerState getServerState()
+ {
+ return serverState;
+ }
+
+ /**
* Stop this server handler processing.
*/
public void stopHandler()
@@ -1397,7 +1403,7 @@
" " + serverURL + " " + String.valueOf(serverId);
if (serverIsLDAPserver)
- return "Remote LDAP Server " + str;
+ return "Direct LDAP Server " + str;
else
return "Remote Repl Server " + str;
}
@@ -1445,28 +1451,68 @@
{
ArrayList<Attribute> attributes = new ArrayList<Attribute>();
if (serverIsLDAPserver)
+ {
attributes.add(new Attribute("LDAP-Server", serverURL));
+ attributes.add(new Attribute("connected-to", this.replicationServerDomain.
+ getReplicationServer().getMonitorInstanceName()));
+
+ // Add the oldest missing update
+ Long olderUpdateTime = this.getApproxFirstMissingDate();
+ if (olderUpdateTime != null)
+ {
+ Date date = new Date(olderUpdateTime);
+ attributes.add(new Attribute("approx-older-change-not-synchronized",
+ date.toString()));
+ }
+ }
else
+ {
attributes.add(new Attribute("ReplicationServer-Server", serverURL));
+ }
attributes.add(new Attribute("server-id",
String.valueOf(serverId)));
attributes.add(new Attribute("base-dn",
baseDn.toString()));
- attributes.add(new Attribute("waiting-changes",
- String.valueOf(getRcvMsgQueueSize())));
- attributes.add(new Attribute("max-waiting-changes",
- String.valueOf(maxQueueSize)));
- attributes.add(new Attribute("update-waiting-acks",
- String.valueOf(getWaitingAckSize())));
+
+ // Update stats
+
+ // Retrieves the topology counters
+ if (serverIsLDAPserver)
+ {
+ try
+ {
+ replicationServerDomain.retrievesRemoteMonitorData();
+ }
+ catch(Exception e)
+ {
+ // FIXME: We failed retrieving the remote monitor data
+ }
+
+ // Compute the latency for the current SH
+ int missingChanges =
+ replicationServerDomain.getMissingChanges(serverState);
+
+ // add the latency attribute to our monitor data
+ attributes.add(new Attribute("missing-changes",
+ String.valueOf(missingChanges)));
+ }
+
+ // Deprecated
+ // attributes.add(new Attribute("max-waiting-changes",
+ // String.valueOf(maxQueueSize)));
attributes.add(new Attribute("update-sent",
String.valueOf(getOutCount())));
attributes.add(new Attribute("update-received",
String.valueOf(getInCount())));
+
+ // Deprecated as long as assured is not exposed
+ attributes.add(new Attribute("update-waiting-acks",
+ String.valueOf(getWaitingAckSize())));
attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
attributes.add(new Attribute("ack-received",
String.valueOf(getInAckCount())));
- attributes.add(new Attribute("approximate-delay",
- String.valueOf(getApproxDelay())));
+
+ // Window stats
attributes.add(new Attribute("max-send-window",
String.valueOf(sendWindowSize)));
attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
String.valueOf(maxRcvWindow)));
attributes.add(new Attribute("current-rcv-window",
String.valueOf(rcvWindow)));
+
+ /*
+ * FIXME:PGB DEPRECATED
+ *
+ // Missing changes
+ attributes.add(new Attribute("waiting-changes",
+ String.valueOf(getRcvMsgQueueSize())));
+ // Age of oldest missing change
+ attributes.add(new Attribute("approximate-delay",
+ String.valueOf(getApproxDelay())));
+
+ // Date of the oldest missing change
long olderUpdateTime = getOlderUpdateTime();
if (olderUpdateTime != 0)
{
@@ -1482,6 +1540,7 @@
attributes.add(new Attribute("older-change-not-synchronized",
String.valueOf(date.toString())));
}
+ */
/* get the Server State */
final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
attributes.add(attr);
+ // Encryption
attributes.add(new Attribute("ssl-encryption",
String.valueOf(session.isEncrypted())));
+ // Data generation
attributes.add(new Attribute("generation-id",
String.valueOf(generationId)));
@@ -1663,8 +1724,28 @@
getMonitorInstanceName() +
" SH for remote server " + this.getMonitorInstanceName() +
" sets replServerInfo " + "<" + infoMsg + ">");
- remoteLDAPservers = infoMsg.getConnectedServers();
+
+ List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
generationId = infoMsg.getGenerationId();
+
+ synchronized(remoteLDAPservers)
+ {
+ // Removes the existing structures
+ for (LightweightServerHandler lsh : remoteLDAPservers)
+ {
+ lsh.stopHandler();
+ }
+ remoteLDAPservers.clear();
+
+ // Creates the new structure according to the message received.
+ for (String newConnectedServer : newRemoteLDAPservers)
+ {
+ LightweightServerHandler lsh
+ = new LightweightServerHandler(newConnectedServer, this);
+ lsh.startHandler();
+ remoteLDAPservers.add(lsh);
+ }
+ }
}
/**
@@ -1678,9 +1759,9 @@
*/
public boolean isRemoteLDAPServer(short wantedServer)
{
- for (String server : remoteLDAPservers)
+ for (LightweightServerHandler server : remoteLDAPservers)
{
- if (wantedServer == Short.valueOf(server))
+ if (wantedServer == server.getServerId())
{
return true;
}
@@ -1695,9 +1776,9 @@
* @return boolean True is the replication server has remote LDAP servers
* connected to it.
*/
- public List<String> getRemoteLDAPServers()
+ public boolean hasRemoteLDAPServers()
{
- return remoteLDAPservers;
+ return !remoteLDAPservers.isEmpty();
}
/**
@@ -1802,4 +1883,14 @@
{
this.generationId = generationId;
}
+
+ /**
+ * Returns the Replication Server Domain to which belongs this server handler.
+ *
+ * @return The replication server domain.
+ */
+ public ReplicationServerDomain getDomain()
+ {
+ return this.replicationServerDomain;
+ }
}
--
Gitblit v1.10.0