From 8adb35fa24b145ab593ceba3a0602dfa43e28cb7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 03 Jan 2008 09:41:49 +0000
Subject: [PATCH]
---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 7
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 74 +
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java | 122 +++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java | 599 ++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 235 ++++-
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java | 397 ++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 379 ++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 15
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java | 14
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java | 27
opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java | 270 +++++++
opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java | 36
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java | 6
opendj-sdk/opends/src/messages/messages/replication.properties | 6
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java | 4
16 files changed, 2,101 insertions(+), 94 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index e41ddb7..d47fa34 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -251,4 +251,10 @@
export-ldif command must be run as a task
NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
failed: %s
+SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
+ are missing due to an error in the retrieval process
+SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
+ are missing due to a processing error : %s
+SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
+ sending request to get remote monitor data
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 998efb2..57128aa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -185,6 +185,40 @@
}
}
+
+ /**
+ * Computes the difference in number of changes between 2
+ * change numbers.
+ * @param op1 the first ChangeNumber
+ * @param op2 the second ChangeNumber
+ * @return the difference
+ */
+ public static int diffSeqNum(ChangeNumber op1, ChangeNumber op2)
+ {
+ int totalCount = 0;
+ int max = op1.getSeqnum();
+ if (op2 != null)
+ {
+ int current = op2.getSeqnum();
+ if (current == max)
+ {
+ }
+ else if (current < max)
+ {
+ totalCount += max - current;
+ }
+ else
+ {
+ totalCount += Integer.MAX_VALUE - (current - max) + 1;
+ }
+ }
+ else
+ {
+ totalCount += max;
+ }
+ return totalCount;
+ }
+
/**
* check if the current Object is strictly older than ChangeNumber
* given in parameter.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index 5234eda..adc4b74 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -258,7 +258,7 @@
@Override
public String toString()
{
- return ("ADD " + getDn() + " " + getChangeNumber());
+ return ("ADD DN=(" + getDn() + ") CN=(" + getChangeNumber() + ")");
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
new file mode 100644
index 0000000..51863da
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
@@ -0,0 +1,397 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.asn1.ASN1Sequence;
+import org.opends.server.protocols.asn1.ASN1Element;
+import org.opends.server.replication.common.ChangeNumber;
+
+/**
+ * This message is part of the replication protocol.
+ * This message is sent by a server to one or several other servers and
+ * contain one entry to be sent over the protocol in the context of
+ * an import/export over the protocol.
+ */
+public class MonitorMessage extends RoutableMessage implements
+ Serializable
+{
+
+ private static final long serialVersionUID = -1900670921496804942L;
+
+ /**
+ * FIXME.
+ *
+ */
+ class ServerData
+ {
+ ServerState state;
+ Long approxFirstMissingDate;
+ }
+
+ /**
+ * FIXME.
+ *
+ */
+ class SubTopoMonitorData
+ {
+ ServerState replServerState;
+ HashMap<Short, ServerData> ldapStates =
+ new HashMap<Short, ServerData>();
+ }
+
+ SubTopoMonitorData data = new SubTopoMonitorData();;
+
+ /**
+ * Creates a new EntryMessage.
+ *
+ * @param sender The sender of this message.
+ * @param destination The destination of this message.
+ */
+ public MonitorMessage(short sender, short destination)
+ {
+ super(sender, destination);
+ }
+
+ /**
+ * FIXME.
+ * @param state a.
+ */
+ public void setReplServerState(ServerState state)
+ {
+ data.replServerState = state;
+ }
+
+ /**
+ * FIXME.
+ * @param serverId a.
+ * @param state a.
+ * @param olderUpdateTime a.
+ *
+ */
+ public void setLDAPServerState(short serverId, ServerState state,
+ Long olderUpdateTime)
+ {
+ if (data.ldapStates == null)
+ {
+ data.ldapStates = new HashMap<Short, ServerData>();
+ }
+ ServerData sd = new ServerData();
+ sd.state = state;
+ sd.approxFirstMissingDate = olderUpdateTime;
+ data.ldapStates.put(serverId, sd);
+ }
+
+ /**
+ * FIXME.
+ * @param serverId a.
+ * @return a.
+ */
+ public ServerState getLDAPServerState(short serverId)
+ {
+ return data.ldapStates.get(serverId).state;
+ }
+
+ /**
+ * FIXME.
+ * @param serverId a.
+ * @return a.
+ */
+ public Long getApproxFirstMissingDate(short serverId)
+ {
+ return data.ldapStates.get(serverId).approxFirstMissingDate;
+ }
+
+
+ /**
+ * Creates a new EntryMessage from its encoded form.
+ *
+ * @param in The byte array containing the encoded form of the message.
+ * @throws DataFormatException If the byte array does not contain a valid
+ * encoded form of the ServerStartMessage.
+ */
+ public MonitorMessage(byte[] in) throws DataFormatException
+ {
+ try
+ {
+ /* first byte is the type */
+ if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
+ throw new DataFormatException("input is not a valid " +
+ this.getClass().getCanonicalName());
+ int pos = 1;
+
+ // sender
+ int length = getNextLength(in, pos);
+ String senderIDString = new String(in, pos, length, "UTF-8");
+ this.senderID = Short.valueOf(senderIDString);
+ pos += length +1;
+
+ // destination
+ length = getNextLength(in, pos);
+ String destinationString = new String(in, pos, length, "UTF-8");
+ this.destination = Short.valueOf(destinationString);
+ pos += length +1;
+
+ /* Read the states : all the remaining bytes but the terminating 0 */
+ byte[] encodedS = new byte[in.length-pos-1];
+ int i =0;
+ while (pos<in.length-1)
+ {
+ encodedS[i++] = in[pos++];
+ }
+
+
+ try
+ {
+ ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
+ for (ASN1Element el0 : s0.elements())
+ {
+ ServerState newState = new ServerState();
+ short serverId = 0;
+ Long outime = (long)0;
+ ASN1Sequence s1 = el0.decodeAsSequence();
+ for (ASN1Element el1 : s1.elements())
+ {
+ ASN1OctetString o = el1.decodeAsOctetString();
+ String s = o.stringValue();
+ ChangeNumber cn = new ChangeNumber(s);
+ if ((data.replServerState != null) && (serverId == 0))
+ {
+ serverId = cn.getServerId();
+ outime = cn.getTime();
+ }
+ else
+ {
+ newState.update(cn);
+ }
+ }
+
+ // the first state is the replication state
+ if (data.replServerState == null)
+ {
+ data.replServerState = newState;
+ }
+ else
+ {
+ ServerData sd = new ServerData();
+ sd.state = newState;
+ sd.approxFirstMissingDate = outime;
+ data.ldapStates.put(serverId, sd);
+ }
+ }
+ } catch(Exception e)
+ {
+
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try
+ {
+ byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+ int length = 1 + senderBytes.length +
+ 1 + destinationBytes.length;
+
+ ASN1Sequence stateElementSequence = new ASN1Sequence();
+ ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
+
+ // First loop computes the length
+ /* Put the serverStates ... */
+ stateElementSequence = new ASN1Sequence();
+ stateElementList = new ArrayList<ASN1Element>();
+
+ /* first put the Replication Server state */
+ ArrayList<ASN1OctetString> cnOctetList =
+ data.replServerState.toASN1ArrayList();
+ ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
+ for (ASN1OctetString soci : cnOctetList)
+ {
+ cnElementList.add(soci);
+ }
+ ASN1Sequence cnSequence = new ASN1Sequence(cnElementList);
+ stateElementList.add(cnSequence);
+
+ // then the LDAP server data
+ Set<Short> servers = data.ldapStates.keySet();
+ for (Short sid : servers)
+ {
+ // State
+ ServerState statei = data.ldapStates.get(sid).state;
+ // First missing date
+ Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
+
+ // retrieves the change numbers as an arrayList of ANSN1OctetString
+ cnOctetList = statei.toASN1ArrayList();
+ cnElementList = new ArrayList<ASN1Element>();
+
+ // a fake changenumber helps storing the LDAP server ID
+ // and the olderupdatetime
+ ChangeNumber cn = new ChangeNumber(outime,0,sid);
+ cnElementList.add(new ASN1OctetString(cn.toString()));
+
+ // the changenumbers
+ for (ASN1OctetString soci : cnOctetList)
+ {
+ cnElementList.add(soci);
+ }
+
+ cnSequence = new ASN1Sequence(cnElementList);
+ stateElementList.add(cnSequence);
+ }
+ stateElementSequence.setElements(stateElementList);
+ int seqLen = stateElementSequence.encode().length;
+
+ //
+ length += seqLen;
+ length += 2;
+
+ // Allocate the array sized from the computed length
+ byte[] resultByteArray = new byte[length];
+
+ // Second loop build the array
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
+ int pos = 1;
+
+ pos = addByteArray(senderBytes, resultByteArray, pos);
+ pos = addByteArray(destinationBytes, resultByteArray, pos);
+
+ /* Put the serverStates ... */
+ stateElementSequence = new ASN1Sequence();
+ stateElementList = new ArrayList<ASN1Element>();
+
+ /* first put the Replication Server state */
+ cnOctetList =
+ data.replServerState.toASN1ArrayList();
+ cnElementList = new ArrayList<ASN1Element>();
+ for (ASN1OctetString soci : cnOctetList)
+ {
+ cnElementList.add(soci);
+ }
+ cnSequence = new ASN1Sequence(cnElementList);
+ stateElementList.add(cnSequence);
+
+ // then the LDAP server state
+ servers = data.ldapStates.keySet();
+ for (Short sid : servers)
+ {
+ ServerState statei = data.ldapStates.get(sid).state;
+ Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
+
+ // retrieves the change numbers as an arrayList of ANSN1OctetString
+ cnOctetList = statei.toASN1ArrayList();
+ cnElementList = new ArrayList<ASN1Element>();
+
+ // a fake changenumber helps storing the LDAP server ID
+ ChangeNumber cn = new ChangeNumber(outime,0,sid);
+ cnElementList.add(new ASN1OctetString(cn.toString()));
+
+ // the changenumbers
+ for (ASN1OctetString soci : cnOctetList)
+ {
+ cnElementList.add(soci);
+ }
+
+ cnSequence = new ASN1Sequence(cnElementList);
+ stateElementList.add(cnSequence);
+ }
+ stateElementSequence.setElements(stateElementList);
+ pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * FIXME.
+ * @return FIXME.
+ */
+ public ServerState getReplServerState()
+ {
+ return data.replServerState;
+ }
+
+ /**
+ * FIXME.
+ * @return a.
+ */
+ public Iterator<Short> iterator()
+ {
+ return data.ldapStates.keySet().iterator();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ String stateS = " RState:";
+ stateS += "/" + data.replServerState.toString();
+ stateS += " LDAPStates:";
+ Iterator<ServerData> it = data.ldapStates.values().iterator();
+ while (it.hasNext())
+ {
+ ServerData sd = it.next();
+ stateS += "/ state=" + sd.state.toString()
+ + " afmd=" + sd.approxFirstMissingDate + "] ";
+ }
+
+ String me = this.getClass().getCanonicalName() +
+ " sender=" + this.senderID +
+ " destination=" + this.destination +
+ " states=" + stateS +
+ "]";
+ return me;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
new file mode 100644
index 0000000..c762d41
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
@@ -0,0 +1,122 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the replication protocol.
+ * FIXME: usage
+ */
+public class MonitorRequestMessage extends RoutableMessage implements
+ Serializable
+{
+
+ private static final long serialVersionUID = -2407640479423633234L;
+
+ /**
+ * Creates a message.
+ *
+ * @param sender The sender server of this message.
+ * @param destination The server or servers targetted by this message.
+ */
+ public MonitorRequestMessage(short sender, short destination)
+ {
+ super(sender, destination);
+ }
+
+ /**
+ * Creates a new message by decoding the provided byte array.
+ * @param in A byte array containing the encoded information for the message,
+ * @throws DataFormatException If the in does not contain a properly,
+ * encoded message.
+ */
+ public MonitorRequestMessage(byte[] in) throws DataFormatException
+ {
+ super();
+ try
+ {
+ // First byte is the type
+ if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
+ throw new DataFormatException("input is not a valid " +
+ this.getClass().getCanonicalName());
+ int pos = 1;
+
+ // sender
+ int length = getNextLength(in, pos);
+ String senderString = new String(in, pos, length, "UTF-8");
+ this.senderID = Short.valueOf(senderString);
+ pos += length +1;
+
+ // destination
+ length = getNextLength(in, pos);
+ String destinationString = new String(in, pos, length, "UTF-8");
+ this.destination = Short.valueOf(destinationString);
+ pos += length +1;
+
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ {
+ try
+ {
+ byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+ byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+ int length = 1 + senderBytes.length + 1
+ + destinationBytes.length + 1;
+
+ byte[] resultByteArray = new byte[length];
+
+ /* put the type of the operation */
+ resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
+ int pos = 1;
+
+ /* put the sender */
+ pos = addByteArray(senderBytes, resultByteArray, pos);
+
+ /* put the destination */
+ pos = addByteArray(destinationBytes, resultByteArray, pos);
+
+ return resultByteArray;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ return null;
+ }
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
index 22215c6..6109d02 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2007 Sun Microsystems, Inc.
+ * Portions Copyright 2007-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -67,7 +67,7 @@
/* first byte is the type */
if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
throw new DataFormatException(
- "Input is not a valid changelogInfo Message.");
+ "Input is not a valid " + this.getClass().getCanonicalName());
int pos = 1;
@@ -97,7 +97,7 @@
/**
- * Creates a new changelogInfo message from a list of the currently
+ * Creates a new ReplServerInfo message from a list of the currently
* connected servers.
*
* @param connectedServers The list of currently connected servers ID.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index ba72bf7..9ec9f41 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -55,6 +55,8 @@
static final byte MSG_TYPE_WINDOW_PROBE = 15;
static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
static final byte MSG_TYPE_RESET_GENERATION_ID = 17;
+ static final byte MSG_TYPE_REPL_SERVER_MONITOR_REQUEST = 18;
+ static final byte MSG_TYPE_REPL_SERVER_MONITOR = 19;
// Adding a new type of message here probably requires to
// change accordingly generateMsg method below
@@ -79,6 +81,8 @@
* MSG_TYPE_WINDOW_PROBE
* MSG_TYPE_REPL_SERVER_INFO
* MSG_TYPE_RESET_GENERATION_ID
+ * MSG_TYPE_REPL_SERVER_MONITOR_REQUEST
+ * MSG_TYPE_REPL_SERVER_MONITOR
*
* @return the byte[] representation of this message.
* @throws UnsupportedEncodingException When the encoding of the message
@@ -152,6 +156,12 @@
case MSG_TYPE_REPL_SERVER_INFO:
msg = new ReplServerInfoMessage(buffer);
break;
+ case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
+ msg = new MonitorRequestMessage(buffer);
+ break;
+ case MSG_TYPE_REPL_SERVER_MONITOR:
+ msg = new MonitorMessage(buffer);
+ break;
default:
throw new DataFormatException("received message with unknown type");
}
@@ -192,7 +202,7 @@
while (in[offset++] != 0)
{
if (offset >= in.length)
- throw new DataFormatException("byte[] is not a valid modify msg");
+ throw new DataFormatException("byte[] is not a valid msg");
length++;
}
return length;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
new file mode 100644
index 0000000..63d6616
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -0,0 +1,270 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashSet;
+
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.InitializationException;
+
+/**
+ * This class defines a server handler dedicated to the remote LDAP servers
+ * connected to a remote Replication Server.
+ * This class is necessary because we want to provide monitor entries for them
+ * and because the monitor API only allows one entry by MonitorProvider instance
+ * so that no other class can provide the monitor entry for these objects.
+ *
+ * One instance of this class is created for each instance of remote LDAP server
+ * connected to a remote Replication Server.
+ */
+public class LightweightServerHandler
+ extends MonitorProvider<MonitorProviderCfg>
+{
+ // The tracer object for the debug logger.
+ private static final DebugTracer TRACER = getTracer();
+
+ short serverId;
+ ServerHandler replServerHandler;
+ ReplicationServerDomain rsDomain;
+ DN baseDn;
+
+ /**
+ * Creates a new LighweightServerHandler with the provided serverid, connected
+ * to the remote Replication Server represented by replServerHandler.
+ *
+ * @param serverId The serverId of this remote LDAP server.
+ * @param replServerHandler The server handler of the Replication Server to
+ * which this LDAP server is remotely connected.
+ */
+ public LightweightServerHandler(String serverId,
+ ServerHandler replServerHandler)
+ {
+ super("Server Handler");
+ this.serverId = Short.valueOf(serverId);
+ this.replServerHandler = replServerHandler;
+ this.rsDomain = replServerHandler.getDomain();
+ this.baseDn = rsDomain.getBaseDn();
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " +
+ replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
+ " LWSH for remote server " + this.serverId +
+ " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+ " ()");
+}
+
+ /**
+ * Get the serverID associated with this LDAP server.
+ * @return The serverId.
+ */
+ public short getServerId()
+ {
+ return Short.valueOf(serverId);
+ }
+
+ /**
+ * Stop this server handler processing.
+ */
+ public void startHandler()
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " +
+replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
+ " LWSH for remote server " + this.serverId +
+ " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+ " start");
+ DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+ DirectoryServer.registerMonitorProvider(this);
+
+ }
+
+ /**
+ * Stop this server handler processing.
+ */
+ public void stopHandler()
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " +
+replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
+ " LWSH for remote server " + this.serverId +
+ " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+ " stop");
+ DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void initializeMonitorProvider(MonitorProviderCfg configuration)
+ throws ConfigException,InitializationException
+ {
+ // Nothing to do for now
+ }
+
+ /**
+ * Retrieves the name of this monitor provider. It should be unique among all
+ * monitor providers, including all instances of the same monitor provider.
+ *
+ * @return The name of this monitor provider.
+ */
+ @Override
+ public String getMonitorInstanceName()
+ {
+ String serverURL=""; // FIXME
+ String str = baseDn.toString() + " " + serverURL + " "
+ + String.valueOf(serverId);
+ return "Undirect LDAP Server " + str;
+ }
+
+ /**
+ * Retrieves the length of time in milliseconds that should elapse between
+ * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero
+ * return value indicates that the <CODE>updateMonitorData()</CODE> method
+ * should not be periodically invoked.
+ *
+ * @return The length of time in milliseconds that should elapse between
+ * calls to the <CODE>updateMonitorData()</CODE> method.
+ */
+ @Override
+ public long getUpdateInterval()
+ {
+ /* we don't wont to do polling on this monitor */
+ return 0;
+ }
+
+ /**
+ * Performs any processing periodic processing that may be desired to update
+ * the information associated with this monitor. Note that best-effort
+ * attempts will be made to ensure that calls to this method come
+ * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
+ * be made.
+ */
+ @Override
+ public void updateMonitorData()
+ {
+ // As long as getUpdateInterval() returns 0, this will never get called
+
+ }
+
+ /**
+ * Retrieves a set of attributes containing monitor data that should be
+ * returned to the client if the corresponding monitor entry is requested.
+ *
+ * @return A set of attributes containing monitor data that should be
+ * returned to the client if the corresponding monitor entry is
+ * requested.
+ */
+ @Override
+ public ArrayList<Attribute> getMonitorData()
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " +
+ this.replServerHandler.getDomain().getReplicationServer().
+ getMonitorInstanceName()+
+ " LWSH for remote server " + this.serverId +
+ " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+ " getMonitor data");
+
+
+ ArrayList<Attribute> attributes = new ArrayList<Attribute>();
+
+ attributes.add(new Attribute("server-id",
+ String.valueOf(serverId)));
+ attributes.add(new Attribute("base-dn",
+ rsDomain.getBaseDn().toNormalizedString()));
+ attributes.add(new Attribute("connected-to",
+ replServerHandler.getMonitorInstanceName()));
+
+ // Retrieves the topology counters
+ try
+ {
+ rsDomain.retrievesRemoteMonitorData();
+
+ // Compute the latency for the current SH
+ ServerState remoteState = rsDomain.getServerState(serverId);
+ if (remoteState == null)
+ {
+ remoteState = new ServerState();
+ }
+
+ /* get the Server State */
+ final String ATTR_SERVER_STATE = "server-state";
+ AttributeType type =
+ DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
+ LinkedHashSet<AttributeValue> values =
+ new LinkedHashSet<AttributeValue>();
+ for (String str : remoteState.toStringSet())
+ {
+ values.add(new AttributeValue(type,str));
+ }
+ Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
+ attributes.add(attr);
+
+ // add the latency attribute to our monitor data
+ // Compute the latency for the current SH
+ int missingChanges = rsDomain.getMissingChanges(remoteState);
+ attributes.add(new Attribute("missing-changes",
+ String.valueOf(missingChanges)));
+
+ // Add the oldest missing update
+ Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
+ if (olderUpdateTime != null)
+ {
+ Date date = new Date(olderUpdateTime);
+ attributes.add(new Attribute("approx-older-change-not-synchronized",
+ date.toString()));
+ }
+ }
+ catch(Exception e)
+ {
+ // We failed retrieving the remote monitor data.
+ attributes.add(new Attribute("error",
+ stackTraceToSingleLineString(e)));
+ }
+ return attributes;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 7eadd3e..6382ca3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
+
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.MonitorMessage;
+import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
-
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
/**
@@ -118,6 +128,34 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /* Monitor data management */
+
+ // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
+ private long remoteMonitorDataLifeTime = 500;
+
+ /* Search op on monitor data is processed by a worker thread.
+ * Requests are sent to the other RS,and responses are received by the
+ * listener threads.
+ * The worker thread is awoke on this semaphore, or on timeout.
+ */
+ Semaphore remoteMonitorResponsesSemaphore;
+
+ /* The date of the last time they have been elaborated */
+ private long validityDate = 0;
+
+ // For each LDAP server, its server state
+ private HashMap<Short, ServerState> LDAPStates =
+ new HashMap<Short, ServerState>();
+
+ // For each LDAP server, the last CN it published
+ private HashMap<Short, ChangeNumber> maxCNs =
+ new HashMap<Short, ChangeNumber>();
+
+ // For each LDAP server, an approximation of the date of the first missing
+ // change
+ private HashMap<Short, Long> approxFirstMissingDate =
+ new HashMap<Short, Long>();
+
/**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
*
@@ -352,7 +390,7 @@
}
else
{
- if (!rsh.getRemoteLDAPServers().isEmpty())
+ if (rsh.hasRemoteLDAPServers())
{
lDAPServersConnectedInTheTopology = true;
@@ -636,7 +674,7 @@
// server connected
for (ServerHandler rsh : replicationServers.values())
{
- if (!rsh.getRemoteLDAPServers().isEmpty())
+ if (rsh.hasRemoteLDAPServers())
{
servers.add(rsh);
}
@@ -693,15 +731,58 @@
*/
public void process(RoutableMessage msg, ServerHandler senderHandler)
{
- // A replication server is not expected to be the destination
- // of a routable message except for an error message.
+ // Test the message for which a ReplicationServer is expected
+ // to be the destination
if (msg.getDestination() == this.replicationServer.getServerId())
{
if (msg instanceof ErrorMessage)
{
ErrorMessage errorMsg = (ErrorMessage)msg;
logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
+ errorMsg.getDetails()));
+ }
+ else if (msg instanceof MonitorRequestMessage)
+ {
+ MonitorRequestMessage replServerMonitorRequestMsg =
+ (MonitorRequestMessage) msg;
+
+ MonitorMessage monitorMsg =
+ new MonitorMessage(
+ replServerMonitorRequestMsg.getDestination(),
+ replServerMonitorRequestMsg.getsenderID());
+
+ // Populate the RS state in the msg from the DbState
+ monitorMsg.setReplServerState(this.getDbServerState());
+
+ // Populate for each connected LDAP Server
+ // from the states stored in the serverHandler.
+ // - the server state
+ // - the older missing change
+ for (ServerHandler lsh : this.connectedServers.values())
+ {
+ monitorMsg.setLDAPServerState(
+ lsh.getServerId(),
+ lsh.getServerState(),
+ lsh.getApproxFirstMissingDate());
+ }
+ try
+ {
+ senderHandler.send(monitorMsg);
+ }
+ catch(Exception e)
+ {
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+ Short.toString((msg.getDestination()))));
+ }
+ }
+ else if (msg instanceof MonitorMessage)
+ {
+ MonitorMessage monitorMsg =
+ (MonitorMessage) msg;
+
+ receivesMonitorDataResponse(monitorMsg);
}
else
{
@@ -1156,4 +1237,288 @@
{
return replicationServer;
}
+
+ /*
+ * Monitor Data generation
+ */
+
+ /**
+ * Retrieves the remote monitor data.
+ *
+ * @throws DirectoryException When an error occurs.
+ */
+ protected void retrievesRemoteMonitorData()
+ throws DirectoryException
+ {
+ if (validityDate > TimeThread.getTime())
+ {
+ // The current data are still valid. No need to renew them.
+ return;
+ }
+
+ // Clean
+ this.LDAPStates.clear();
+ this.maxCNs.clear();
+
+ // Init the maxCNs of our direct LDAP servers from our own dbstate
+ for (ServerHandler rs : connectedServers.values())
+ {
+ short serverID = rs.getServerId();
+ ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
+ if (cn == null)
+ {
+ // we have nothing in db for that server
+ cn = new ChangeNumber(0, 0 , serverID);
+ }
+ this.maxCNs.put(serverID, cn);
+ }
+
+ ServerState replServerState = this.getDbServerState();
+ Iterator<Short> it = replServerState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+ ChangeNumber maxCN = this.maxCNs.get(sid);
+ if ((maxCN != null) && (receivedCN.newer(maxCN)))
+ {
+ // We found a newer one
+ this.maxCNs.remove(sid);
+ this.maxCNs.put(sid, receivedCN);
+ }
+ }
+
+ // Send Request to the other Replication Servers
+ if (remoteMonitorResponsesSemaphore == null)
+ {
+ remoteMonitorResponsesSemaphore = new Semaphore(
+ replicationServers.size() -1);
+
+ sendMonitorDataRequest();
+
+ // Wait reponses from them or timeout
+ waitMonitorDataResponses(replicationServers.size());
+ }
+ else
+ {
+ // The processing of renewing the monitor cache is already running
+ // We'll make it sleeping until the end
+ while (remoteMonitorResponsesSemaphore!=null)
+ {
+ waitMonitorDataResponses(1);
+ }
+ }
+
+ // Now we have the expected answers of an error occured
+ validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+
+ if (debugEnabled())
+ {
+ debugMonitorData();
+ }
+ }
+
+ private void debugMonitorData()
+ {
+ String mds = " Monitor data=";
+ Iterator<Short> ite = LDAPStates.keySet().iterator();
+ while (ite.hasNext())
+ {
+ Short sid = ite.next();
+ ServerState ss = LDAPStates.get(sid);
+ mds += " LDAPState(" + sid + ")=" + ss.toString();
+ }
+ Iterator<Short> itc = maxCNs.keySet().iterator();
+ while (itc.hasNext())
+ {
+ Short sid = itc.next();
+ ChangeNumber cn = maxCNs.get(sid);
+ mds += " maxCNs(" + sid + ")=" + cn.toString();
+ }
+
+ mds += "--";
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDN=" + baseDn +
+ mds);
+ }
+
+ /**
+ * Sends a MonitorRequest message to all connected RS.
+ * @throws DirectoryException when a problem occurs.
+ */
+ protected void sendMonitorDataRequest()
+ throws DirectoryException
+ {
+ try
+ {
+ for (ServerHandler rs : replicationServers.values())
+ {
+ MonitorRequestMessage msg = new
+ MonitorRequestMessage(this.replicationServer.getServerId(),
+ rs.getServerId());
+ rs.send(msg);
+ }
+ }
+ catch(Exception e)
+ {
+ Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, e);
+ }
+ }
+
+ /**
+ * Wait for the expected count of received MonitorMessage.
+ * @param expectedResponses The number of expected answers.
+ * @throws DirectoryException When an error occurs.
+ */
+ protected void waitMonitorDataResponses(int expectedResponses)
+ throws DirectoryException
+ {
+ try
+ {
+ boolean allPermitsAcquired =
+ remoteMonitorResponsesSemaphore.tryAcquire(
+ expectedResponses,
+ (long) 500, TimeUnit.MILLISECONDS);
+
+ if (!allPermitsAcquired)
+ {
+ logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+ }
+ else
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "Successfully received all " + replicationServers.size()
+ + " expected monitor messages");
+ }
+ }
+ catch(Exception e)
+ {
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+ }
+ finally
+ {
+ remoteMonitorResponsesSemaphore = null;
+ }
+ }
+
+ /**
+ * Processes a Monitor message receives from a remote Replication Server
+ * and stores the data received.
+ *
+ * @param msg The message to be processed.
+ */
+ public void receivesMonitorDataResponse(MonitorMessage msg)
+ {
+ if (remoteMonitorResponsesSemaphore == null)
+ {
+ // Ignoring the remote monitor data because an error occured previously
+ return;
+ }
+
+ try
+ {
+ // Here is the RS state : list <serverID, lastChangeNumber>
+ // For each LDAP Server, we keep the max CN accross the RSes
+ ServerState replServerState = msg.getReplServerState();
+ Iterator<Short> it = replServerState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+ ChangeNumber maxCN = this.maxCNs.get(sid);
+ if (receivedCN.newer(maxCN))
+ {
+ // We found a newer one
+ this.maxCNs.remove(sid);
+ this.maxCNs.put(sid, receivedCN);
+ }
+ }
+
+ // Store the LDAP servers states
+ Iterator<Short> sidIterator = msg.iterator();
+ while (sidIterator.hasNext())
+ {
+ short sid = sidIterator.next();
+ ServerState ss = msg.getLDAPServerState(sid);
+ this.LDAPStates.put(sid, ss);
+ this.approxFirstMissingDate.put(sid,
+ msg.getApproxFirstMissingDate(sid));
+ }
+
+ // Decreases the number of expected responses and potentially
+ // wakes up the waiting requestor thread.
+ remoteMonitorResponsesSemaphore.release();
+ }
+ catch (Exception e)
+ {
+ // If an exception occurs while processing one of the expected message,
+ // the processing is aborted and the waiting thread is awoke.
+ remoteMonitorResponsesSemaphore.notifyAll();
+ }
+ }
+
+ /**
+ * Get the state of the LDAP server with the provided serverId.
+ * @param serverId The server ID.
+ * @return The server state.
+ */
+ public ServerState getServerState(short serverId)
+ {
+ return LDAPStates.get(serverId);
+ }
+
+ /**
+ * Get the highest know change number of the LDAP server with the provided
+ * serverId.
+ * @param serverId The server ID.
+ * @return The highest change number.
+ */
+ public ChangeNumber getMaxCN(short serverId)
+ {
+ return maxCNs.get(serverId);
+ }
+
+ /**
+ * Get an approximation of the date of the oldest missing changes.
+ * serverId.
+ * @param serverId The server ID.
+ * @return The approximation of the date of the oldest missing change.
+ */
+ public Long getApproxFirstMissingDate(short serverId)
+ {
+ return approxFirstMissingDate.get(serverId);
+ }
+
+ /**
+ * Get the number of missing change for the server with the provided state.
+ * @param state The provided server state.
+ * @return The number of missing changes.
+ */
+ public int getMissingChanges(ServerState state)
+ {
+ // Traverse the max Cn transmitted by each server
+ // For each server, get the highest CN know from the current server
+ // Sum the difference betwenn the max and the last
+ int missingChanges = 0;
+ Iterator<Short> itc = maxCNs.keySet().iterator();
+ while (itc.hasNext())
+ {
+ Short sid = itc.next();
+ ChangeNumber maxCN = maxCNs.get(sid);
+ ChangeNumber last = state.getMaxChangeNumber(sid);
+ if (last == null)
+ {
+ last = new ChangeNumber(0,0, sid);
+ }
+ int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
+ missingChanges += missingChangesFromSID;
+ }
+ return missingChanges;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 2c4e3ba..cd499e3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
- */
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
@@ -62,9 +36,9 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
+import java.util.Date;
import java.util.List;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -150,11 +124,12 @@
/**
- * When this Handler is connected to a changelog server this collection
- * will contain the list of LDAP servers connected to the remote changelog
- * server.
+ * When this Handler is connected to a remote replication server
+ * this collection will contain as many elements as there are
+ * LDAP servers connected to the remote replication server.
*/
- private List<String> remoteLDAPservers = new ArrayList<String>();
+ private List<LightweightServerHandler>
+ remoteLDAPservers = new ArrayList<LightweightServerHandler>();
/**
* The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
ServerState dbState = replicationServerDomain.getDbServerState();
for (short id : dbState)
{
- int max = dbState.getMaxChangeNumber(id).getSeqnum();
- ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
- if (currentChange != null)
- {
- int current = currentChange.getSeqnum();
- if (current == max)
- {
- }
- else if (current < max)
- {
- totalCount += max - current;
- }
- else
- {
- totalCount += Integer.MAX_VALUE - (current - max) + 1;
- }
- }
- else
- {
- totalCount += max;
- }
+ totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
+ serverState.getMaxChangeNumber(id));
}
return totalCount;
}
@@ -858,7 +814,7 @@
}
/**
- * Get an approximation of the delay by looking at the age of the odest
+ * Get an approximation of the delay by looking at the age of the oldest
* message that has not been sent to this server.
* This is an approximation because the age is calculated using the
* clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
* @return The age if the older change has not yet been replicated
* to the server handled by this ServerHandler.
*/
+ public Long getApproxFirstMissingDate()
+ {
+ // Get the older CN received
+ // From it, get the next sequence number
+ // Get the CN for the next sequence number
+ // If not present in the local RS db,
+ // then approximate with the older update time
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN == null)
+ return null;
+
+ ReplicationIterator ri =
+ replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
+ if (ri != null)
+ {
+ if (ri.next())
+ {
+ ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
+ return firstMissingChange.getTime();
+ }
+ }
+ return olderUpdateCN.getTime();
+ }
+
+ /**
+ * Get the older update time for that server.
+ * @return The older update time.
+ */
public long getOlderUpdateTime()
{
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN == null)
+ return 0;
+ return olderUpdateCN.getTime();
+ }
+
+ /**
+ * Get the older Change Number for that server.
+ * @return The older change number.
+ */
+ public ChangeNumber getOlderUpdateCN()
+ {
synchronized (msgQueue)
{
if (isFollowing())
{
if (msgQueue.isEmpty())
- return 0;
+ return null;
UpdateMessage msg = msgQueue.first();
- return msg.getChangeNumber().getTime();
+ return msg.getChangeNumber();
}
else
{
if (lateQueue.isEmpty())
- return 0;
+ return null;
UpdateMessage msg = lateQueue.first();
- return msg.getChangeNumber().getTime();
+ return msg.getChangeNumber();
}
}
}
@@ -1190,6 +1186,16 @@
}
/**
+ * Get the state of this server.
+ *
+ * @return ServerState the state for this server..
+ */
+ public ServerState getServerState()
+ {
+ return serverState;
+ }
+
+ /**
* Stop this server handler processing.
*/
public void stopHandler()
@@ -1397,7 +1403,7 @@
" " + serverURL + " " + String.valueOf(serverId);
if (serverIsLDAPserver)
- return "Remote LDAP Server " + str;
+ return "Direct LDAP Server " + str;
else
return "Remote Repl Server " + str;
}
@@ -1445,28 +1451,68 @@
{
ArrayList<Attribute> attributes = new ArrayList<Attribute>();
if (serverIsLDAPserver)
+ {
attributes.add(new Attribute("LDAP-Server", serverURL));
+ attributes.add(new Attribute("connected-to", this.replicationServerDomain.
+ getReplicationServer().getMonitorInstanceName()));
+
+ // Add the oldest missing update
+ Long olderUpdateTime = this.getApproxFirstMissingDate();
+ if (olderUpdateTime != null)
+ {
+ Date date = new Date(olderUpdateTime);
+ attributes.add(new Attribute("approx-older-change-not-synchronized",
+ date.toString()));
+ }
+ }
else
+ {
attributes.add(new Attribute("ReplicationServer-Server", serverURL));
+ }
attributes.add(new Attribute("server-id",
String.valueOf(serverId)));
attributes.add(new Attribute("base-dn",
baseDn.toString()));
- attributes.add(new Attribute("waiting-changes",
- String.valueOf(getRcvMsgQueueSize())));
- attributes.add(new Attribute("max-waiting-changes",
- String.valueOf(maxQueueSize)));
- attributes.add(new Attribute("update-waiting-acks",
- String.valueOf(getWaitingAckSize())));
+
+ // Update stats
+
+ // Retrieves the topology counters
+ if (serverIsLDAPserver)
+ {
+ try
+ {
+ replicationServerDomain.retrievesRemoteMonitorData();
+ }
+ catch(Exception e)
+ {
+ // FIXME: We failed retrieving the remote monitor data
+ }
+
+ // Compute the latency for the current SH
+ int missingChanges =
+ replicationServerDomain.getMissingChanges(serverState);
+
+ // add the latency attribute to our monitor data
+ attributes.add(new Attribute("missing-changes",
+ String.valueOf(missingChanges)));
+ }
+
+ // Deprecated
+ // attributes.add(new Attribute("max-waiting-changes",
+ // String.valueOf(maxQueueSize)));
attributes.add(new Attribute("update-sent",
String.valueOf(getOutCount())));
attributes.add(new Attribute("update-received",
String.valueOf(getInCount())));
+
+ // Deprecated as long as assured is not exposed
+ attributes.add(new Attribute("update-waiting-acks",
+ String.valueOf(getWaitingAckSize())));
attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
attributes.add(new Attribute("ack-received",
String.valueOf(getInAckCount())));
- attributes.add(new Attribute("approximate-delay",
- String.valueOf(getApproxDelay())));
+
+ // Window stats
attributes.add(new Attribute("max-send-window",
String.valueOf(sendWindowSize)));
attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
String.valueOf(maxRcvWindow)));
attributes.add(new Attribute("current-rcv-window",
String.valueOf(rcvWindow)));
+
+ /*
+ * FIXME:PGB DEPRECATED
+ *
+ // Missing changes
+ attributes.add(new Attribute("waiting-changes",
+ String.valueOf(getRcvMsgQueueSize())));
+ // Age of oldest missing change
+ attributes.add(new Attribute("approximate-delay",
+ String.valueOf(getApproxDelay())));
+
+ // Date of the oldest missing change
long olderUpdateTime = getOlderUpdateTime();
if (olderUpdateTime != 0)
{
@@ -1482,6 +1540,7 @@
attributes.add(new Attribute("older-change-not-synchronized",
String.valueOf(date.toString())));
}
+ */
/* get the Server State */
final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
attributes.add(attr);
+ // Encryption
attributes.add(new Attribute("ssl-encryption",
String.valueOf(session.isEncrypted())));
+ // Data generation
attributes.add(new Attribute("generation-id",
String.valueOf(generationId)));
@@ -1663,8 +1724,28 @@
getMonitorInstanceName() +
" SH for remote server " + this.getMonitorInstanceName() +
" sets replServerInfo " + "<" + infoMsg + ">");
- remoteLDAPservers = infoMsg.getConnectedServers();
+
+ List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
generationId = infoMsg.getGenerationId();
+
+ synchronized(remoteLDAPservers)
+ {
+ // Removes the existing structures
+ for (LightweightServerHandler lsh : remoteLDAPservers)
+ {
+ lsh.stopHandler();
+ }
+ remoteLDAPservers.clear();
+
+ // Creates the new structure according to the message received.
+ for (String newConnectedServer : newRemoteLDAPservers)
+ {
+ LightweightServerHandler lsh
+ = new LightweightServerHandler(newConnectedServer, this);
+ lsh.startHandler();
+ remoteLDAPservers.add(lsh);
+ }
+ }
}
/**
@@ -1678,9 +1759,9 @@
*/
public boolean isRemoteLDAPServer(short wantedServer)
{
- for (String server : remoteLDAPservers)
+ for (LightweightServerHandler server : remoteLDAPservers)
{
- if (wantedServer == Short.valueOf(server))
+ if (wantedServer == server.getServerId())
{
return true;
}
@@ -1695,9 +1776,9 @@
* @return boolean True is the replication server has remote LDAP servers
* connected to it.
*/
- public List<String> getRemoteLDAPServers()
+ public boolean hasRemoteLDAPServers()
{
- return remoteLDAPservers;
+ return !remoteLDAPservers.isEmpty();
}
/**
@@ -1802,4 +1883,14 @@
{
this.generationId = generationId;
}
+
+ /**
+ * Returns the Replication Server Domain to which belongs this server handler.
+ *
+ * @return The replication server domain.
+ */
+ public ReplicationServerDomain getDomain()
+ {
+ return this.replicationServerDomain;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index b1071ff..9193c99 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -49,6 +49,8 @@
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.MonitorMessage;
+import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.loggers.debug.DebugTracer;
@@ -246,6 +248,17 @@
}
}
}
+ else if (msg instanceof MonitorRequestMessage)
+ {
+ MonitorRequestMessage replServerMonitorRequestMsg =
+ (MonitorRequestMessage) msg;
+ handler.process(replServerMonitorRequestMsg);
+ }
+ else if (msg instanceof MonitorMessage)
+ {
+ MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
+ handler.process(replServerMonitorMsg);
+ }
else if (msg == null)
{
/*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index fca9f9f..698ce5e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -126,7 +126,7 @@
"In " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
", writer to " + this.handler.getMonitorInstanceName() +
- " publishes msg=" + update.toString() +
+ " publishes msg=[" + update.toString() + "]"+
" refgenId=" + referenceGenerationId +
" server=" + handler.getServerId() +
" generationId=" + handler.getGenerationId());
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index c0b615b..e91164d 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication;
@@ -917,6 +917,11 @@
TRACER.debugInfo("Failed to add entry " + entry.getDN() +
"Result code = : " + addOp.getResultCode());
}
+ else
+ {
+ TRACER.debugInfo(entry.getDN() +
+ " added " + addOp.getResultCode());
+ }
// They will be removed at the end of the test
entryList.addLast(entry.getDN());
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
index fdc1bf7..2376e20 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -303,4 +303,29 @@
CN2 = cng.newChangeNumber();
assertTrue(CN1.compareTo(CN2) != 0 );
}
+
+ /**
+ * Test the difference in seq num between 2 change numbers.
+ */
+ @Test
+ public void changeNumberDiffSeqNum()
+ throws Exception
+ {
+ ChangeNumber CN1;
+ ChangeNumber CN2;
+
+ CN1 = new ChangeNumber((long)0, 3, (short)0);
+
+ // 3-1 = 2
+ CN2 = new ChangeNumber((long)0, 1, (short)0);
+ assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
+
+ // 3-3 = 0
+ CN2 = new ChangeNumber((long)0, 3, (short)0);
+ assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
+
+ // 3-4 == MAXINT (modulo)
+ CN2 = new ChangeNumber((long)0, 4, (short)0);
+ assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
+ }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 5d09572..592cc29 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -30,7 +30,9 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.Iterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -560,7 +562,7 @@
* an exception.
*/
@Test()
- public void WindowProbeTest() throws Exception
+ public void windowProbeTest() throws Exception
{
WindowProbe msg = new WindowProbe();
new WindowProbe(msg.getBytes());
@@ -583,6 +585,74 @@
}
/**
+ * Test MonitorMessage
+ */
+ @Test()
+ public void monitorMessageTest() throws Exception
+ {
+ short sender = 2;
+ short dest = 3;
+
+ // RS State
+ ServerState rsState = new ServerState();
+ ChangeNumber rscn1 = new ChangeNumber(1, (short) 1, (short) 1);
+ ChangeNumber rscn2 = new ChangeNumber(1, (short) 1, (short) 2);
+ rsState.update(rscn1);
+ rsState.update(rscn2);
+
+ // LS1 state
+ ServerState s1 = new ServerState();
+ short sid1 = 111;
+ ChangeNumber cn1 = new ChangeNumber(1, (short) 1, sid1);
+ s1.update(cn1);
+
+ // LS2 state
+ ServerState s2 = new ServerState();
+ short sid2 = 222;
+ Long now = TimeThread.getTime();
+ ChangeNumber cn2 = new ChangeNumber(now,
+ (short) 123, sid2);
+ s2.update(cn2);
+
+ MonitorMessage msg =
+ new MonitorMessage(sender, dest);
+ msg.setReplServerState(rsState);
+ msg.setLDAPServerState(sid1, s1, now+1);
+ msg.setLDAPServerState(sid2, s2, now+2);
+
+ byte[] b = msg.getBytes();
+ MonitorMessage newMsg = new MonitorMessage(b);
+
+ assertEquals(rsState, msg.getReplServerState());
+ assertEquals(newMsg.getReplServerState().toString(),
+ msg.getReplServerState().toString());
+
+ Iterator<Short> it = newMsg.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ServerState s = newMsg.getLDAPServerState(sid);
+ if (sid == sid1)
+ {
+ assertEquals(s.toString(), s1.toString(), "");
+ assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
+ }
+ else if (sid == sid2)
+ {
+ assertEquals(s.toString(), s2.toString());
+ assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
+ }
+ else
+ {
+ fail("Bad sid");
+ }
+ }
+
+ assertEquals(newMsg.getsenderID(), msg.getsenderID());
+ assertEquals(newMsg.getDestination(), msg.getDestination());
+ }
+
+ /**
* Test that EntryMessage encoding and decoding works
* by checking that : msg == new EntryMessageTest(msg.getBytes()).
*/
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
new file mode 100644
index 0000000..4c2f81d
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -0,0 +1,599 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.SocketSession;
+import org.opends.server.tools.LDAPSearch;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the replicationServer code.
+ */
+
+public class MonitorTest extends ReplicationTestCase
+{
+ // The tracer object for the debug logger
+ private static final DebugTracer TRACER = getTracer();
+
+ private static final String baseDnStr = "dc=example,dc=com";
+ private static final String baseSnStr = "genidcom";
+
+ private static final int WINDOW_SIZE = 10;
+ private static final int CHANGELOG_QUEUE_SIZE = 100;
+ private static final short server1ID = 1;
+ private static final short server2ID = 2;
+ private static final short server3ID = 3;
+ private static final short server4ID = 4;
+ private static final short changelog1ID = 11;
+ private static final short changelog2ID = 12;
+ private static final short changelog3ID = 13;
+
+ private DN baseDn;
+ private ReplicationBroker broker2 = null;
+ private ReplicationBroker broker3 = null;
+ private ReplicationBroker broker4 = null;
+ private ReplicationServer replServer1 = null;
+ private ReplicationServer replServer2 = null;
+ private ReplicationServer replServer3 = null;
+ private boolean emptyOldChanges = true;
+ ReplicationDomain replDomain = null;
+ SocketSession ssSession = null;
+ boolean ssShutdownRequested = false;
+ protected String[] updatedEntries;
+
+ private static int[] replServerPort = new int[20];
+
+ private void debugInfo(String s)
+ {
+ logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST **" + s);
+ }
+ }
+ protected void debugInfo(String message, Exception e)
+ {
+ debugInfo(message + stackTraceToSingleLineString(e));
+ }
+
+ /**
+ * Set up the environment for performing the tests in this Class.
+ *
+ * @throws Exception
+ * If the environment could not be set up.
+ */
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
+
+ // This test suite depends on having the schema available.
+ TestCaseUtils.startServer();
+
+ baseDn = DN.decode(baseDnStr);
+
+ updatedEntries = newLDIFEntries();
+
+ // Create an internal connection in order to provide operations
+ // to DS to populate the db -
+ connection = InternalClientConnection.getRootConnection();
+
+ // Synchro provider
+ String synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+ // Synchro multi-master
+ synchroPluginStringDN = "cn=Multimaster Synchronization, "
+ + synchroStringDN;
+
+ // Synchro suffix
+ synchroServerEntry = null;
+
+ // Add config entries to the current DS server based on :
+ // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
+ // Add synchroServerEntry
+ // Add replServerEntry
+ configureReplication();
+
+ // Change log
+ String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+ String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
+ + "ds-cfg-changelog-server-id: 1\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
+ + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
+ replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+ replServerEntry = null;
+
+ }
+
+ /*
+ * Creates entries necessary to the test.
+ */
+ private String[] newLDIFEntries()
+ {
+ String[] entries =
+ {
+ "dn: " + baseDn + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
+ + "\n",
+ "dn: ou=People," + baseDn + "\n"
+ + "objectClass: top\n"
+ + "objectClass: organizationalUnit\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
+ + "\n",
+ "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n"
+ + "objectclass: top\n"
+ + "objectclass: person\n"
+ + "objectclass: organizationalPerson\n"
+ + "objectclass: inetOrgPerson\n"
+ + "cn: Fiona Jensen\n"
+ + "sn: Jensen\n"
+ + "uid: fiona\n"
+ + "telephonenumber: +1 408 555 1212\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
+ + "\n",
+ "dn: cn=Robert Langman,ou=people," + baseDn + "\n"
+ + "objectclass: top\n"
+ + "objectclass: person\n"
+ + "objectclass: organizationalPerson\n"
+ + "objectclass: inetOrgPerson\n"
+ + "cn: Robert Langman\n"
+ + "sn: Langman\n"
+ + "uid: robert\n"
+ + "telephonenumber: +1 408 555 1213\n"
+ + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
+ + "\n"
+ };
+
+ return entries;
+ }
+
+ /**
+ * Creates a new replicationServer.
+ * @param changelogId The serverID of the replicationServer to create.
+ * @param all Specifies whether to coonect the created replication
+ * server to the other replication servers in the test.
+ * @return The new created replication server.
+ */
+ private ReplicationServer createReplicationServer(short changelogId,
+ boolean all, String suffix)
+ {
+ SortedSet<String> servers = null;
+ servers = new TreeSet<String>();
+ try
+ {
+ if (changelogId==changelog1ID)
+ {
+ if (replServer1!=null)
+ return replServer1;
+ }
+ else if (changelogId==changelog2ID)
+ {
+ if (replServer2!=null)
+ return replServer2;
+ }
+ else if (changelogId==changelog3ID)
+ {
+ if (replServer3!=null)
+ return replServer3;
+ }
+ if (all)
+ {
+ servers.add("localhost:" + getChangelogPort(changelog1ID));
+ servers.add("localhost:" + getChangelogPort(changelog2ID));
+ }
+ int chPort = getChangelogPort(changelogId);
+ String chDir = "genid"+changelogId+suffix+"Db";
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
+ servers);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ Thread.sleep(1000);
+
+ return replicationServer;
+
+ }
+ catch (Exception e)
+ {
+ fail("createChangelog" + stackTraceToSingleLineString(e));
+ }
+ return null;
+ }
+
+ /**
+ * Create a synchronized suffix in the current server providing the
+ * replication Server ID.
+ * @param changelogID
+ */
+ private void connectServer1ToChangelog(short changelogID)
+ {
+ // Connect DS to the replicationServer
+ try
+ {
+ // suffix synchronized
+ String synchroServerStringDN = synchroPluginStringDN;
+ String synchroServerLdif =
+ "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-replication-domain\n"
+ + "cn: " + baseSnStr + "\n"
+ + "ds-cfg-base-dn: " + baseDnStr + "\n"
+ + "ds-cfg-replication-server: localhost:"
+ + getChangelogPort(changelogID)+"\n"
+ + "ds-cfg-server-id: " + server1ID + "\n"
+ + "ds-cfg-receive-status: true\n"
+ + "ds-cfg-window-size: " + WINDOW_SIZE;
+
+ if (synchroServerEntry == null)
+ {
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ "Unable to add the synchronized server");
+ configEntryList.add(synchroServerEntry.getDN());
+
+ replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
+
+ }
+ if (replDomain != null)
+ {
+ debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
+ }
+ }
+ catch(Exception e)
+ {
+ debugInfo("connectToReplServer", e);
+ fail("connectToReplServer", e);
+ }
+ }
+
+ /*
+ * Disconnect DS from the replicationServer
+ */
+ private void disconnectFromReplServer(short changelogID)
+ {
+ try
+ {
+ // suffix synchronized
+ String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
+ synchroPluginStringDN;
+ if (synchroServerEntry != null)
+ {
+ DN synchroServerDN = DN.decode(synchroServerStringDN);
+ DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
+ assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
+ "Unable to delete the synchronized domain");
+ synchroServerEntry = null;
+
+ configEntryList.remove(configEntryList.indexOf(synchroServerDN));
+ }
+ }
+ catch(Exception e)
+ {
+ fail("disconnectFromReplServer", e);
+ }
+ }
+
+ private int getChangelogPort(short changelogID)
+ {
+ if (replServerPort[changelogID] == 0)
+ {
+ try
+ {
+ // Find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ replServerPort[changelogID] = socket.getLocalPort();
+ socket.close();
+ }
+ catch(Exception e)
+ {
+ fail("Cannot retrieve a free port for replication server."
+ + e.getMessage());
+ }
+ }
+ return replServerPort[changelogID];
+ }
+
+ private String createEntry(UUID uid)
+ {
+ String user2dn = "uid=user"+uid+",ou=People," + baseDnStr;
+ return new String(
+ "dn: "+ user2dn + "\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n"
+ + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n");
+ }
+
+ static protected ReplicationMessage createAddMsg(short serverId)
+ {
+ Entry personWithUUIDEntry = null;
+ String user1entryUUID;
+ String baseUUID = null;
+ String user1dn;
+
+ /*
+ * Create a Change number generator to generate new changenumbers
+ * when we need to send operation messages to the replicationServer.
+ */
+ ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, 0);
+
+ user1entryUUID = "33333333-3333-3333-3333-333333333333";
+ user1dn = "uid=user1,ou=People," + baseDnStr;
+ String entryWithUUIDldif = "dn: "+ user1dn + "\n"
+ + "objectClass: top\n" + "objectClass: person\n"
+ + "objectClass: organizationalPerson\n"
+ + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+ + "homePhone: 951-245-7634\n"
+ + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+ + "mobile: 027-085-0537\n"
+ + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+ + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n"
+ + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+ + "street: 17984 Thirteenth Street\n"
+ + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+ + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+ + "userPassword: password\n" + "initials: AA\n"
+ + "entryUUID: " + user1entryUUID + "\n";
+
+ try
+ {
+ personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
+ }
+ catch(Exception e)
+ {
+ fail(e.getMessage());
+ }
+
+ // Create and publish an update message to add an entry.
+ AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
+ personWithUUIDEntry.getDN().toString(),
+ user1entryUUID,
+ baseUUID,
+ personWithUUIDEntry.getObjectClassAttribute(),
+ personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+
+ return addMsg;
+ }
+
+ @Test(enabled=false)
+ public void testMultiRS() throws Exception
+ {
+ String testCase = "testMultiRS";
+ debugInfo("Starting " + testCase);
+
+ ReplicationDomain.clearJEBackend(false,
+ "userRoot",
+ baseDn.toNormalizedString());
+
+ debugInfo ("Creating 2 RS");
+ replServer1 = createReplicationServer(changelog1ID, true, testCase);
+ replServer2 = createReplicationServer(changelog2ID, true, testCase);
+ replServer3 = createReplicationServer(changelog3ID, true, testCase);
+ Thread.sleep(500);
+
+ debugInfo("Connecting DS to replServer1");
+ connectServer1ToChangelog(changelog1ID);
+ Thread.sleep(1500);
+
+ try
+ {
+ debugInfo("Connecting broker2 to replServer1");
+ broker2 = openReplicationSession(baseDn,
+ server2ID, 100, getChangelogPort(changelog1ID),
+ 1000, !emptyOldChanges);
+ Thread.sleep(1000);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ try
+ {
+ debugInfo("Connecting broker3 to replServer2");
+ broker3 = openReplicationSession(baseDn,
+ server3ID, 100, getChangelogPort(changelog2ID),
+ 1000, !emptyOldChanges);
+ Thread.sleep(1000);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ try
+ {
+ debugInfo("Connecting broker4 to replServer2");
+ broker4 = openReplicationSession(baseDn,
+ server4ID, 100, getChangelogPort(changelog2ID),
+ 1000, !emptyOldChanges);
+ Thread.sleep(1000);
+ }
+ catch(SocketException se)
+ {
+ fail("Broker connection is expected to be accepted.");
+ }
+
+ // Do a bunch of change
+ updatedEntries = newLDIFEntries();
+ this.addTestEntriesToDB(updatedEntries);
+
+ for (int i = 0; i<200; i++)
+ {
+ String ent1[] = { createEntry(UUID.randomUUID()) };
+ this.addTestEntriesToDB(ent1);
+ }
+
+ for (int i=0; i<10; i++)
+ {
+ broker3.publish(createAddMsg(server3ID));
+ }
+
+ searchMonitor();
+
+ debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
+ disconnectFromReplServer(changelog1ID);
+
+ debugInfo("Cleaning entries");
+ postTest();
+
+ debugInfo("Successfully ending " + testCase);
+ }
+
+ /**
+ * Disconnect broker and remove entries from the local DB
+ * @throws Exception
+ */
+ protected void postTest()
+ {
+ debugInfo("Post test cleaning.");
+
+ // Clean brokers
+ if (broker2 != null)
+ broker2.stop();
+ broker2 = null;
+ if (broker3 != null)
+ broker3.stop();
+ broker3 = null;
+ if (broker4 != null)
+ broker4.stop();
+ broker4 = null;
+
+ if (replServer1 != null)
+ replServer1.remove();
+ if (replServer2 != null)
+ replServer2.remove();
+ if (replServer3 != null)
+ replServer3.remove();
+ replServer1 = null;
+ replServer2 = null;
+ replServer3 = null;
+
+ super.cleanRealEntries();
+
+ try
+ {
+ ReplicationDomain.clearJEBackend(false,
+ replDomain.getBackend().getBackendID(),
+ baseDn.toNormalizedString());
+ }
+ catch (Exception e) {}
+ }
+
+ @Test(enabled=true)
+ public void MonitorTest() throws Exception
+ {
+ testMultiRS();
+ }
+
+ private static final ByteArrayOutputStream oStream =
+ new ByteArrayOutputStream();
+ private static final ByteArrayOutputStream eStream =
+ new ByteArrayOutputStream();
+
+ private void searchMonitor()
+ {
+ // test search as directory manager returns content
+ String[] args3 =
+ {
+ "-h", "127.0.0.1",
+ "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
+ "-D", "cn=Directory Manager",
+ "-w", "password",
+ "-b", "cn=monitor",
+ "-s", "sub",
+ "(base-dn=*)"
+ };
+
+ oStream.reset();
+ eStream.reset();
+ int retVal =
+ LDAPSearch.mainSearch(args3, false, oStream, eStream);
+ String entries = oStream.toString();
+ debugInfo("Entries:" + entries);
+ try
+ {
+ assertEquals(retVal, 0, "Returned error: " + eStream);
+ assertTrue(!entries.equalsIgnoreCase(""), "Returned entries: " + entries);
+ }
+ catch(Exception e)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ stackTraceToSingleLineString(new Exception()));
+ fail(e.getMessage());
+ }
+ }
+}
\ No newline at end of file
--
Gitblit v1.10.0