From 05d24dcca61eed7921987a98bb94d94a4aa030cd Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 01 Feb 2008 13:21:19 +0000
Subject: [PATCH] Fix 2598 - fixes for global replication monitoring
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java | 35 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 47 +
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 294 ++++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java | 199 ++++++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 400 ++++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java | 21
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java | 14
opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java | 50 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java | 329 +++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java | 2
opendj-sdk/opends/src/messages/messages/replication.properties | 3
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 4
14 files changed, 1,031 insertions(+), 371 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 39ddbcf..bdd615b 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -252,7 +252,8 @@
NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
failed: %s
SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
- are missing due to an error in the retrieval process
+ are missing due to an error in the retrieval process. Potentially a server \
+ is too slow to provide its monitoring data over the protocol
SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
are missing due to a processing error : %s
SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 1f8b711..04a11ef 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
@@ -339,4 +339,23 @@
{
return list.isEmpty();
}
+
+ /**
+ * Make a duplicate of this state.
+ * @return The duplicate of this state.
+ */
+ public ServerState duplicate()
+ {
+ ServerState newState = new ServerState();
+ synchronized (this)
+ {
+ for (Short key : list.keySet())
+ {
+ ChangeNumber change = list.get(key);
+ Short id = change.getServerId();
+ newState.list.put(id,change);
+ }
+ }
+ return newState;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a8eb808..a21ce43 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -433,7 +433,7 @@
ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
halfRcvWindow * 2, heartbeatInterval, state,
- protocolVersion, generationId, isSslEncryption);
+ protocolVersion, generationId, isSslEncryption, !keepConnection);
session.publish(msg);
/*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
index 2e50e35..89ef59a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
@@ -65,15 +65,20 @@
}
/**
- * Data structure to manage the state of the replication server
- * and the state informations for the LDAP servers connected.
+ * Data structure to manage the state of this replication server
+ * and the state informations for the servers connected to it.
*
*/
class SubTopoMonitorData
{
- ServerState replServerState;
+ // This replication server DbState
+ ServerState replServerDbState;
+ // The data related to the LDAP servers connected to this RS
HashMap<Short, ServerData> ldapStates =
new HashMap<Short, ServerData>();
+ // The data related to the RS servers connected to this RS
+ HashMap<Short, ServerData> rsStates =
+ new HashMap<Short, ServerData>();
}
SubTopoMonitorData data = new SubTopoMonitorData();;
@@ -93,9 +98,9 @@
* Sets the state of the replication server.
* @param state The state.
*/
- public void setReplServerState(ServerState state)
+ public void setReplServerDbState(ServerState state)
{
- data.replServerState = state;
+ data.replServerDbState = state;
}
/**
@@ -103,20 +108,27 @@
* @param serverId The serverID.
* @param state The server state.
* @param approxFirstMissingDate The approximation of the date
- * of the older missing change.
- *
+ * of the older missing change. null when none.
+ * @param isLDAP Specifies whether the server is a LS or a RS
*/
- public void setLDAPServerState(short serverId, ServerState state,
- Long approxFirstMissingDate)
+ public void setServerState(short serverId, ServerState state,
+ Long approxFirstMissingDate, boolean isLDAP)
{
if (data.ldapStates == null)
{
data.ldapStates = new HashMap<Short, ServerData>();
}
+ if (data.rsStates == null)
+ {
+ data.rsStates = new HashMap<Short, ServerData>();
+ }
ServerData sd = new ServerData();
sd.state = state;
sd.approxFirstMissingDate = approxFirstMissingDate;
- data.ldapStates.put(serverId, sd);
+ if (isLDAP)
+ data.ldapStates.put(serverId, sd);
+ else
+ data.rsStates.put(serverId, sd);
}
/**
@@ -130,16 +142,37 @@
}
/**
+ * Get the server state for the RS server with the provided serverId.
+ * @param serverId The provided serverId.
+ * @return The state.
+ */
+ public ServerState getRSServerState(short serverId)
+ {
+ return data.rsStates.get(serverId).state;
+ }
+
+
+ /**
* Get the approximation of the date of the older missing change for the
* LDAP Server with the provided server Id.
* @param serverId The provided serverId.
* @return The approximated state.
*/
- public Long getApproxFirstMissingDate(short serverId)
+ public Long getLDAPApproxFirstMissingDate(short serverId)
{
return data.ldapStates.get(serverId).approxFirstMissingDate;
}
+ /**
+ * Get the approximation of the date of the older missing change for the
+ * RS Server with the provided server Id.
+ * @param serverId The provided serverId.
+ * @return The approximated state.
+ */
+ public Long getRSApproxFirstMissingDate(short serverId)
+ {
+ return data.rsStates.get(serverId).approxFirstMissingDate;
+ }
/**
* Creates a new EntryMessage from its encoded form.
@@ -182,39 +215,51 @@
try
{
ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
+ // loop on the servers
for (ASN1Element el0 : s0.elements())
{
ServerState newState = new ServerState();
short serverId = 0;
Long outime = (long)0;
+ boolean isLDAPServer = false;
ASN1Sequence s1 = el0.decodeAsSequence();
+
+ // loop on the list of CN of the state
for (ASN1Element el1 : s1.elements())
{
ASN1OctetString o = el1.decodeAsOctetString();
String s = o.stringValue();
ChangeNumber cn = new ChangeNumber(s);
- if ((data.replServerState != null) && (serverId == 0))
+ if ((data.replServerDbState != null) && (serverId == 0))
{
+ // we are on the first CN that is a fake CN to store the serverId
+ // and the older update time
serverId = cn.getServerId();
outime = cn.getTime();
+ isLDAPServer = (cn.getSeqnum()>0);
}
else
{
+ // we are on a normal CN
newState.update(cn);
}
}
- // the first state is the replication state
- if (data.replServerState == null)
+ if (data.replServerDbState == null)
{
- data.replServerState = newState;
+ // the first state is the replication state
+ data.replServerDbState = newState;
}
else
{
+ // the next states are the server states
ServerData sd = new ServerData();
sd.state = newState;
sd.approxFirstMissingDate = outime;
- data.ldapStates.put(serverId, sd);
+ if (isLDAPServer)
+ data.ldapStates.put(serverId, sd);
+ else
+ data.rsStates.put(serverId, sd);
}
}
} catch(Exception e)
@@ -245,14 +290,17 @@
ASN1Sequence stateElementSequence = new ASN1Sequence();
ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
- // First loop computes the length
+ /**
+ * First loop computes the length
+ */
+
/* Put the serverStates ... */
stateElementSequence = new ASN1Sequence();
stateElementList = new ArrayList<ASN1Element>();
/* first put the Replication Server state */
ArrayList<ASN1OctetString> cnOctetList =
- data.replServerState.toASN1ArrayList();
+ data.replServerDbState.toASN1ArrayList();
ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
for (ASN1OctetString soci : cnOctetList)
{
@@ -288,6 +336,35 @@
cnSequence = new ASN1Sequence(cnElementList);
stateElementList.add(cnSequence);
}
+
+ // then the rs server data
+ servers = data.rsStates.keySet();
+ for (Short sid : servers)
+ {
+ // State
+ ServerState statei = data.rsStates.get(sid).state;
+ // First missing date
+ Long outime = data.rsStates.get(sid).approxFirstMissingDate;
+
+ // retrieves the change numbers as an arrayList of ANSN1OctetString
+ cnOctetList = statei.toASN1ArrayList();
+ cnElementList = new ArrayList<ASN1Element>();
+
+ // a fake changenumber helps storing the LDAP server ID
+ // and the olderupdatetime
+ ChangeNumber cn = new ChangeNumber(outime,0,sid);
+ cnElementList.add(new ASN1OctetString(cn.toString()));
+
+ // the changenumbers
+ for (ASN1OctetString soci : cnOctetList)
+ {
+ cnElementList.add(soci);
+ }
+
+ cnSequence = new ASN1Sequence(cnElementList);
+ stateElementList.add(cnSequence);
+ }
+
stateElementSequence.setElements(stateElementList);
int seqLen = stateElementSequence.encode().length;
@@ -298,7 +375,9 @@
// Allocate the array sized from the computed length
byte[] resultByteArray = new byte[length];
- // Second loop build the array
+ /**
+ * Second loop really builds the array
+ */
/* put the type of the operation */
resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
@@ -313,7 +392,7 @@
/* first put the Replication Server state */
cnOctetList =
- data.replServerState.toASN1ArrayList();
+ data.replServerDbState.toASN1ArrayList();
cnElementList = new ArrayList<ASN1Element>();
for (ASN1OctetString soci : cnOctetList)
{
@@ -322,7 +401,7 @@
cnSequence = new ASN1Sequence(cnElementList);
stateElementList.add(cnSequence);
- // then the LDAP server state
+ // then the LDAP server datas
servers = data.ldapStates.keySet();
for (Short sid : servers)
{
@@ -334,10 +413,10 @@
cnElementList = new ArrayList<ASN1Element>();
// a fake changenumber helps storing the LDAP server ID
- ChangeNumber cn = new ChangeNumber(outime,0,sid);
+ ChangeNumber cn = new ChangeNumber(outime,1,sid);
cnElementList.add(new ASN1OctetString(cn.toString()));
- // the changenumbers
+ // the changenumbers that make the state
for (ASN1OctetString soci : cnOctetList)
{
cnElementList.add(soci);
@@ -346,6 +425,33 @@
cnSequence = new ASN1Sequence(cnElementList);
stateElementList.add(cnSequence);
}
+
+ // then the RS server datas
+ servers = data.rsStates.keySet();
+ for (Short sid : servers)
+ {
+ ServerState statei = data.rsStates.get(sid).state;
+ Long outime = data.rsStates.get(sid).approxFirstMissingDate;
+
+ // retrieves the change numbers as an arrayList of ANSN1OctetString
+ cnOctetList = statei.toASN1ArrayList();
+ cnElementList = new ArrayList<ASN1Element>();
+
+ // a fake changenumber helps storing the LDAP server ID
+ ChangeNumber cn = new ChangeNumber(outime,0,sid);
+ cnElementList.add(new ASN1OctetString(cn.toString()));
+
+ // the changenumbers that make the state
+ for (ASN1OctetString soci : cnOctetList)
+ {
+ cnElementList.add(soci);
+ }
+
+ cnSequence = new ASN1Sequence(cnElementList);
+ stateElementList.add(cnSequence);
+ }
+
+
stateElementSequence.setElements(stateElementList);
pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
@@ -361,41 +467,62 @@
* Get the state of the replication server that sent this message.
* @return The state.
*/
- public ServerState getReplServerState()
+ public ServerState getReplServerDbState()
{
- return data.replServerState;
+ return data.replServerDbState;
}
/**
* Returns an iterator on the serverId of the connected LDAP servers.
* @return The iterator.
*/
- public Iterator<Short> iterator()
+ public Iterator<Short> ldapIterator()
{
return data.ldapStates.keySet().iterator();
}
/**
+ * Returns an iterator on the serverId of the connected RS servers.
+ * @return The iterator.
+ */
+ public Iterator<Short> rsIterator()
+ {
+ return data.rsStates.keySet().iterator();
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
public String toString()
{
- String stateS = " RState:";
- stateS += "/" + data.replServerState.toString();
- stateS += " LDAPStates:";
- Iterator<ServerData> it = data.ldapStates.values().iterator();
- while (it.hasNext())
+ String stateS = "\nRState:[";
+ stateS += data.replServerDbState.toString();
+ stateS += "]";
+
+ stateS += "\nLDAPStates:[";
+ for (Short sid : data.ldapStates.keySet())
{
- ServerData sd = it.next();
- stateS += "/ state=" + sd.state.toString()
- + " afmd=" + sd.approxFirstMissingDate + "] ";
+ ServerData sd = data.ldapStates.get(sid);
+ stateS +=
+ "\n[LSstate("+ sid + ")=" +
+ sd.state.toString() + "]" +
+ " afmd=" + sd.approxFirstMissingDate + "]";
}
+ stateS += "\nRSStates:[";
+ for (Short sid : data.rsStates.keySet())
+ {
+ ServerData sd = data.rsStates.get(sid);
+ stateS +=
+ "\n[RSState("+ sid + ")=" +
+ sd.state.toString() + "]" +
+ " afmd=" + sd.approxFirstMissingDate + "]";
+ }
String me = this.getClass().getCanonicalName() +
- " sender=" + this.senderID +
+ "[ sender=" + this.senderID +
" destination=" + this.destination +
- " states=" + stateS +
+ " data=[" + stateS + "]" +
"]";
return me;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
index 878973e..9ac6b32 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ * Portions Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -54,6 +54,7 @@
private int maxReceiveDelay;
private int maxSendDelay;
private int windowSize;
+ private boolean handshakeOnly;
private ServerState serverState = null;
/**
@@ -87,6 +88,8 @@
* @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
* after the start messages have been exchanged.
+ * @param handshakeOnly Whether this message is only to get an handshake
+ * with the server or not.
*/
public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
int maxReceiveQueue, int maxSendDelay,
@@ -95,7 +98,8 @@
ServerState serverState,
short protocolVersion,
long generationId,
- boolean sslEncryption)
+ boolean sslEncryption,
+ boolean handshakeOnly)
{
super(protocolVersion, generationId);
@@ -109,6 +113,7 @@
this.heartbeatInterval = heartbeatInterval;
this.sslEncryption = sslEncryption;
this.serverState = serverState;
+ this.handshakeOnly = handshakeOnly;
try
{
@@ -209,10 +214,19 @@
sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
pos += length +1;
+
+ /*
+ * read the handshakeOnly flag
+ */
+ length = getNextLength(in, pos);
+ handshakeOnly = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length +1;
+
/*
* read the ServerState
*/
serverState = new ServerState(in, pos, in.length-1);
+
} catch (UnsupportedEncodingException e)
{
throw new DataFormatException("UTF-8 is not supported by this jvm.");
@@ -322,6 +336,8 @@
byte[] byteSSLEncryption =
String.valueOf(sslEncryption).getBytes("UTF-8");
byte[] byteServerState = serverState.getBytes();
+ byte[] byteHandshakeOnly =
+ String.valueOf(handshakeOnly).getBytes("UTF-8");
int length = byteDn.length + 1 + byteServerId.length + 1 +
byteServerUrl.length + 1 +
@@ -332,6 +348,7 @@
byteWindowSize.length + 1 +
byteHeartbeatInterval.length + 1 +
byteSSLEncryption.length + 1 +
+ byteHandshakeOnly.length + 1 +
byteServerState.length + 1;
/* encode the header in a byte[] large enough to also contain the mods */
@@ -358,6 +375,8 @@
pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
+ pos = addByteArray(byteHandshakeOnly, resultByteArray, pos);
+
pos = addByteArray(byteServerState, resultByteArray, pos);
return resultByteArray;
@@ -401,4 +420,16 @@
{
return sslEncryption;
}
+
+ /**
+ * Get the SSL encryption value for the ldap server that created the
+ * message.
+ *
+ * @return The SSL encryption value for the ldap server that created the
+ * message.
+ */
+ public boolean isHandshakeOnly()
+ {
+ return handshakeOnly;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
index 0c42763..077a79c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -200,16 +200,6 @@
@Override
public ArrayList<Attribute> getMonitorData()
{
- if (debugEnabled())
- TRACER.debugInfo(
- "In " +
- this.replServerHandler.getDomain().getReplicationServer().
- getMonitorInstanceName()+
- " LWSH for remote server " + this.serverId +
- " connected to:" + this.replServerHandler.getMonitorInstanceName() +
- " getMonitor data");
-
-
ArrayList<Attribute> attributes = new ArrayList<Attribute>();
attributes.add(new Attribute("server-id",
@@ -220,12 +210,12 @@
replServerHandler.getMonitorInstanceName()));
// Retrieves the topology counters
+ MonitorData md;
try
{
- rsDomain.retrievesRemoteMonitorData();
+ md = rsDomain.getMonitorData();
- // Compute the latency for the current SH
- ServerState remoteState = rsDomain.getServerState(serverId);
+ ServerState remoteState = md.getLDAPServerState(serverId);
if (remoteState == null)
{
remoteState = new ServerState();
@@ -241,29 +231,39 @@
{
values.add(new AttributeValue(type,str));
}
+ if (values.size() == 0)
+ {
+ values.add(new AttributeValue(type,"unknown"));
+ }
Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
attributes.add(attr);
- // add the latency attribute to our monitor data
- // Compute the latency for the current SH
- int missingChanges = rsDomain.getMissingChanges(remoteState);
- attributes.add(new Attribute("missing-changes",
- String.valueOf(missingChanges)));
-
- // Add the oldest missing update
- Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
- if (olderUpdateTime != null)
+ // Oldest missing update
+ Long approxFirstMissingDate=md.getApproxFirstMissingDate(serverId);
+ if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
{
- Date date = new Date(olderUpdateTime);
+ Date date = new Date(approxFirstMissingDate);
attributes.add(new Attribute("approx-older-change-not-synchronized",
date.toString()));
attributes.add(
- new Attribute("approx-older-change-not-synchronized-millis",
- String.valueOf(olderUpdateTime)));
+ new Attribute("approx-older-change-not-synchronized-millis",
+ String.valueOf(approxFirstMissingDate)));
}
+
+ // Missing changes
+ long missingChanges = md.getMissingChanges(serverId);
+ attributes.add(new Attribute("missing-changes",
+ String.valueOf(missingChanges)));
+
+ // Replication delay
+ long delay = md.getApproxDelay(serverId);
+ attributes.add(new Attribute("approximate-delay",
+ String.valueOf(delay)));
+
}
catch(Exception e)
{
+ // TODO: improve the log
// We failed retrieving the remote monitor data.
attributes.add(new Attribute("error",
stackTraceToSingleLineString(e)));
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
new file mode 100644
index 0000000..0801257
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -0,0 +1,329 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.util.TimeThread;
+
+/**
+ * This class defines the Monitor Data that are consolidated across the
+ * whole replication topology.
+ */
+public class MonitorData
+{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
+ *
+ * - For each server, the max (most recent) CN produced
+ *
+ * - For each server, its state i.e. the last processed from of each
+ * other LDAP server.
+ * The change latency (missing changes) will be
+ * the difference between the max above and the state here
+ *
+ * - For each server, the date of the first missing change.
+ * The time latency (delay) will be the difference between now and the
+ * date of the first missing change.
+ */
+
+ /* The date of the last time they have been elaborated */
+ private long buildDate = 0;
+
+ // For each LDAP server, its server state
+ private ConcurrentHashMap<Short, ServerState> LDAPStates =
+ new ConcurrentHashMap<Short, ServerState>();
+
+ // For each LDAP server, the last(max) CN it published
+ private ConcurrentHashMap<Short, ChangeNumber> maxCNs =
+ new ConcurrentHashMap<Short, ChangeNumber>();
+
+ // For each LDAP server, an approximation of the date of the first missing
+ // change
+ private ConcurrentHashMap<Short, Long> fmd =
+ new ConcurrentHashMap<Short, Long>();
+
+ private ConcurrentHashMap<Short, Long> missingChanges =
+ new ConcurrentHashMap<Short, Long>();
+
+ // For each RS server, an approximation of the date of the first missing
+ // change
+ private ConcurrentHashMap<Short, Long> fmRSDate =
+ new ConcurrentHashMap<Short, Long>();
+
+
+ /**
+ * Get an approximation of the latency delay of the replication.
+ * @param serverId The server ID.
+ * @return The delay
+ */
+ public long getApproxDelay(short serverId)
+ {
+ Long afmd = fmd.get(serverId);
+ if ((afmd != null) && (afmd>0))
+ return ((this.getBuildDate() - afmd)/1000);
+ else
+ return 0;
+ }
+
+ /**
+ * Get an approximation of the date of the first missing update.
+ * @param serverId The server ID.
+ * @return The date.
+ */
+ public long getApproxFirstMissingDate(short serverId)
+ {
+ Long res;
+ if ((res = fmd.get(serverId)) != null)
+ return res;
+ return 0;
+ }
+
+ /**
+ * Get the number of missing changes.
+ * @param serverId The server ID.
+ * @return The number of missing changes.
+ */
+ public long getMissingChanges(short serverId)
+ {
+ Long res = missingChanges.get(serverId);
+ if (res==null)
+ return 0;
+ else
+ return res;
+ }
+
+ /**
+ * Build the monitor data that are computed from the collected ones.
+ */
+ public void completeComputing()
+ {
+ String mds = "";
+
+ // Computes the missing changes counters
+ // For each LSi ,
+ // Regarding each other LSj
+ // Sum the difference : max(LSj) - state(LSi)
+
+ Iterator<Short> lsiStateItr = this.LDAPStates.keySet().iterator();
+ while (lsiStateItr.hasNext())
+ {
+ Short lsiSid = lsiStateItr.next();
+ ServerState lsiState = this.LDAPStates.get(lsiSid);
+ Long lsiMissingChanges = (long)0;
+ if (lsiState != null)
+ {
+ Iterator<Short> lsjMaxItr = this.maxCNs.keySet().iterator();
+ while (lsjMaxItr.hasNext())
+ {
+ Short lsjSid = lsjMaxItr.next();
+ ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid);
+ ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid);
+
+ int missingChangesLsiLsj =
+ ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
+
+ mds +=
+ "+ diff("+lsjMaxCN+"-"
+ +lsiLastCN+")="+missingChangesLsiLsj;
+
+ lsiMissingChanges += missingChangesLsiLsj;
+ }
+ }
+ mds += "=" + lsiMissingChanges;
+ this.missingChanges.put(lsiSid,lsiMissingChanges);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
+ }
+ this.setBuildDate(TimeThread.getTime());
+ }
+
+ /**
+ * Returns a <code>String</code> object representing this
+ * object's value.
+ * @return a string representation of the value of this object in
+ */
+ public String toString()
+ {
+ String mds = "Monitor data=\n";
+
+ mds+= "Build date=" + this.getBuildDate();
+ // RS data
+ Iterator<Short> rsite = fmRSDate.keySet().iterator();
+ while (rsite.hasNext())
+ {
+ Short sid = rsite.next();
+ mds += "\nRSData(" + sid + ")=\t "+ "afmd=" + fmRSDate.get(sid);
+ }
+
+ // maxCNs
+ Iterator<Short> itc = maxCNs.keySet().iterator();
+ while (itc.hasNext())
+ {
+ Short sid = itc.next();
+ ChangeNumber cn = maxCNs.get(sid);
+ mds += "\nmaxCNs(" + sid + ")= " + cn.toString();
+ }
+
+ // LDAP data
+ Iterator<Short> lsite = LDAPStates.keySet().iterator();
+ while (lsite.hasNext())
+ {
+ Short sid = lsite.next();
+ ServerState ss = LDAPStates.get(sid);
+ mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
+ + "] afmd=" + this.getApproxFirstMissingDate(sid);
+ if (getBuildDate()>0)
+ {
+ mds += " missingDelay=" + this.getApproxDelay(sid);
+ }
+ mds +=" missingCount=" + missingChanges.get(sid);
+ }
+ //
+ mds += "--";
+ return mds;
+ }
+
+ /**
+ * Sets the build date of the data.
+ * @param buildDate The date.
+ */
+ public void setBuildDate(long buildDate)
+ {
+ this.buildDate = buildDate;
+ }
+
+ /**
+ * Returns the build date of the data.
+ * @return The date.
+ */
+ public long getBuildDate()
+ {
+ return buildDate;
+ }
+
+ /**
+ * From a provided state, sets the max CN of the monitor data.
+ * @param state the provided state.
+ */
+ public void setMaxCNs(ServerState state)
+ {
+ Iterator<Short> it = state.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber newCN = state.getMaxChangeNumber(sid);
+ setMaxCN(sid, newCN);
+ }
+ }
+
+ /**
+ * For the provided serverId, sets the provided CN as the max if
+ * it is newer than the current max.
+ * @param serverId the provided serverId
+ * @param newCN the provided new CN
+ */
+ public void setMaxCN(short serverId, ChangeNumber newCN)
+ {
+ if (newCN==null) return;
+ ChangeNumber currentMaxCN = maxCNs.get(serverId);
+ if (currentMaxCN == null)
+ {
+ maxCNs.put(serverId, newCN);
+ }
+ else
+ {
+ if (newCN.newer(currentMaxCN))
+ maxCNs.replace(serverId, newCN);
+ }
+ }
+
+ /**
+ * Get the highest know change number of the LDAP server with the provided
+ * serverId.
+ * @param serverId The server ID.
+ * @return The highest change number.
+ */
+ public ChangeNumber getMaxCN(short serverId)
+ {
+ return maxCNs.get(serverId);
+ }
+
+ /**
+ * Get the state of the LDAP server with the provided serverId.
+ * @param serverId The server ID.
+ * @return The server state.
+ */
+ public ServerState getLDAPServerState(short serverId)
+ {
+ return LDAPStates.get(serverId);
+ }
+
+ /**
+ * Set the state of the LDAP server with the provided serverId.
+ * @param serverId The server ID.
+ * @param state The server state.
+ */
+ public void setLDAPServerState(short serverId, ServerState state)
+ {
+ LDAPStates.put(serverId, state);
+ }
+
+ /**
+ * Set the state of the LDAP server with the provided serverId.
+ * @param serverId The server ID.
+ * @param newFmd The first missing date.
+ */
+ public void setFirstMissingDate(short serverId, Long newFmd)
+ {
+ if (newFmd==null) return;
+ Long currentfmd = fmd.get(serverId);
+ if (currentfmd==null)
+ {
+ fmd.put(serverId, newFmd);
+ }
+ else
+ {
+ if ((newFmd!=0) && (newFmd<currentfmd))
+ fmd.replace(serverId, newFmd);
+ }
+ }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 89726d4..77e5d5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -129,8 +128,8 @@
/* Monitor data management */
- // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
- private long remoteMonitorDataLifeTime = 500;
+ // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+ private long monitorDataLifeTime = 500;
/* Search op on monitor data is processed by a worker thread.
* Requests are sent to the other RS,and responses are received by the
@@ -139,21 +138,11 @@
*/
Semaphore remoteMonitorResponsesSemaphore;
- /* The date of the last time they have been elaborated */
- private long validityDate = 0;
-
- // For each LDAP server, its server state
- private HashMap<Short, ServerState> LDAPStates =
- new HashMap<Short, ServerState>();
-
- // For each LDAP server, the last CN it published
- private HashMap<Short, ChangeNumber> maxCNs =
- new HashMap<Short, ChangeNumber>();
-
- // For each LDAP server, an approximation of the date of the first missing
- // change
- private HashMap<Short, Long> approxFirstMissingDate =
- new HashMap<Short, Long>();
+ /**
+ * The monitor data consolidated over the topology.
+ */
+ private MonitorData monitorData = new MonitorData();
+ private MonitorData wrkMonitorData;
/**
* Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -166,13 +155,7 @@
{
this.baseDn = baseDn;
this.replicationServer = replicationServer;
-
- if (debugEnabled())
- TRACER.debugInfo(
- "In " + this.replicationServer.getMonitorInstanceName() +
- " Created Cache for " + baseDn + " " +
- stackTraceToSingleLineString(new Exception()));
-}
+ }
/**
* Add an update that has been received to the list of
@@ -366,6 +349,10 @@
{
replicationServers.remove(handler.getServerId());
handler.stopHandler();
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
}
}
else
@@ -374,12 +361,12 @@
{
connectedServers.remove(handler.getServerId());
handler.stopHandler();
+
+ // Update the remote replication servers with our list
+ // of connected LDAP servers
+ sendReplServerInfo();
}
}
-
- // Update the remote replication servers with our list
- // of connected LDAP servers
- sendReplServerInfo();
}
/**
@@ -578,7 +565,8 @@
*
* @param serverId Identifier of the server for which the iterator is created.
* @param changeNumber Starting point for the iterator.
- * @return the created ReplicationIterator.
+ * @return the created ReplicationIterator. Null when no DB is available
+ * for the provided server Id.
*/
public ReplicationIterator getChangelogIterator(short serverId,
ChangeNumber changeNumber)
@@ -591,7 +579,8 @@
{
return handler.generateIterator(changeNumber);
}
- catch (Exception e) {
+ catch (Exception e)
+ {
return null;
}
}
@@ -759,6 +748,7 @@
*/
public void process(RoutableMessage msg, ServerHandler senderHandler)
{
+
// Test the message for which a ReplicationServer is expected
// to be the destination
if (msg.getDestination() == this.replicationServer.getServerId())
@@ -779,20 +769,33 @@
replServerMonitorRequestMsg.getDestination(),
replServerMonitorRequestMsg.getsenderID());
- // Populate the RS state in the msg from the DbState
- monitorMsg.setReplServerState(this.getDbServerState());
-
// Populate for each connected LDAP Server
// from the states stored in the serverHandler.
// - the server state
// - the older missing change
for (ServerHandler lsh : this.connectedServers.values())
{
- monitorMsg.setLDAPServerState(
+ monitorMsg.setServerState(
lsh.getServerId(),
lsh.getServerState(),
- lsh.getApproxFirstMissingDate());
+ lsh.getApproxFirstMissingDate(),
+ true);
}
+
+ // Same for the connected RS
+ for (ServerHandler rsh : this.replicationServers.values())
+ {
+ monitorMsg.setServerState(
+ rsh.getServerId(),
+ rsh.getServerState(),
+ rsh.getApproxFirstMissingDate(),
+ false);
+ }
+
+ // Populate the RS state in the msg from the DbState
+ monitorMsg.setReplServerDbState(this.getDbServerState());
+
+
try
{
senderHandler.send(monitorMsg);
@@ -1305,118 +1308,135 @@
}
}
- /*
+ /* =======================
* Monitor Data generation
+ * =======================
*/
/**
- * Retrieves the remote monitor data.
- *
+ * Retrieves the global monitor data.
+ * @return The monitor data.
* @throws DirectoryException When an error occurs.
*/
- protected void retrievesRemoteMonitorData()
+ synchronized protected MonitorData getMonitorData()
throws DirectoryException
{
- if (validityDate > TimeThread.getTime())
+ if (monitorData.getBuildDate() + monitorDataLifeTime
+ > TimeThread.getTime())
{
- // The current data are still valid. No need to renew them.
- return;
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " getRemoteMonitorData in cache");
+ // The current data are still valid. No need to renew them.
+ // FIXME
+ return null;
}
- // Clean
- this.LDAPStates.clear();
- this.maxCNs.clear();
-
- // Init the maxCNs of our direct LDAP servers from our own dbstate
- for (ServerHandler rs : connectedServers.values())
+ wrkMonitorData = new MonitorData();
+ synchronized(wrkMonitorData)
{
- short serverID = rs.getServerId();
- ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
- if (cn == null)
- {
- // we have nothing in db for that server
- cn = new ChangeNumber(0, 0 , serverID);
- }
- this.maxCNs.put(serverID, cn);
- }
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Computing monitor data ");
- ServerState replServerState = this.getDbServerState();
- Iterator<Short> it = replServerState.iterator();
- while (it.hasNext())
- {
- short sid = it.next();
- ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
- ChangeNumber maxCN = this.maxCNs.get(sid);
- if ((maxCN != null) && (receivedCN.newer(maxCN)))
+ // Let's process our directly connected LSes
+ // - in the ServerHandler for a given LS1, the stored state contains :
+ // - the max CN produced by LS1
+ // - the last CN consumed by LS1 from LS2..n
+ // - in the RSdomain/dbHandler, the built-in state contains :
+ // - the max CN produced by each server
+ // So for a given LS connected we can take the state and the max from
+ // the LS/state.
+
+ for (ServerHandler directlsh : connectedServers.values())
{
- // We found a newer one
- this.maxCNs.remove(sid);
- this.maxCNs.put(sid, receivedCN);
+ short serverID = directlsh.getServerId();
+
+ // the state comes from the state stored in the SH
+ ServerState directlshState = directlsh.getServerState().duplicate();
+
+ // the max CN sent by that LS also comes from the SH
+ ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
+ if (maxcn == null)
+ {
+ // This directly connected LS has never produced any change
+ maxcn = new ChangeNumber(0, 0 , serverID);
+ }
+ wrkMonitorData.setMaxCN(serverID, maxcn);
+ wrkMonitorData.setLDAPServerState(serverID, directlshState);
+ wrkMonitorData.setFirstMissingDate(serverID, directlsh.
+ getApproxFirstMissingDate());
}
+
+ // Then initialize the max CN for the LS that produced something
+ // - from our own local db state
+ // - whatever they are directly or undirectly connected
+ ServerState dbServerState = getDbServerState();
+ Iterator<Short> it = dbServerState.iterator();
+ while (it.hasNext())
+ {
+ short sid = it.next();
+ ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+ wrkMonitorData.setMaxCN(sid, storedCN);
+ }
+
+ // Now we have used all available local informations
+ // and we need the remote ones.
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn + " Local monitor data: " +
+ wrkMonitorData.toString());
}
// Send Request to the other Replication Servers
if (remoteMonitorResponsesSemaphore == null)
{
- remoteMonitorResponsesSemaphore = new Semaphore(
- replicationServers.size() -1);
-
- sendMonitorDataRequest();
-
+ remoteMonitorResponsesSemaphore = new Semaphore(0);
+ short requestCnt = sendMonitorDataRequest();
// Wait reponses from them or timeout
- waitMonitorDataResponses(replicationServers.size());
+ waitMonitorDataResponses(requestCnt);
}
else
{
// The processing of renewing the monitor cache is already running
// We'll make it sleeping until the end
+ // TODO: unit test for this case.
while (remoteMonitorResponsesSemaphore!=null)
{
waitMonitorDataResponses(1);
}
}
- // Now we have the expected answers of an error occured
- validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+ wrkMonitorData.completeComputing();
- if (debugEnabled())
+ // Store the new computed data as the reference
+ synchronized(monitorData)
{
- debugMonitorData();
- }
- }
-
- private void debugMonitorData()
- {
- String mds = " Monitor data=";
- Iterator<Short> ite = LDAPStates.keySet().iterator();
- while (ite.hasNext())
- {
- Short sid = ite.next();
- ServerState ss = LDAPStates.get(sid);
- mds += " LDAPState(" + sid + ")=" + ss.toString();
- }
- Iterator<Short> itc = maxCNs.keySet().iterator();
- while (itc.hasNext())
- {
- Short sid = itc.next();
- ChangeNumber cn = maxCNs.get(sid);
- mds += " maxCNs(" + sid + ")=" + cn.toString();
- }
-
- mds += "--";
- TRACER.debugInfo(
+ // Now we have the expected answers or an error occured
+ monitorData = wrkMonitorData;
+ wrkMonitorData = null;
+ if (debugEnabled())
+ TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
- " baseDN=" + baseDn +
- mds);
+ " baseDn=" + baseDn + " *** Computed MonitorData: " +
+ monitorData.toString());
+ }
+ return monitorData;
}
+
/**
* Sends a MonitorRequest message to all connected RS.
+ * @return the number of requests sent.
* @throws DirectoryException when a problem occurs.
*/
- protected void sendMonitorDataRequest()
+ protected short sendMonitorDataRequest()
throws DirectoryException
{
+ short sent=0;
try
{
for (ServerHandler rs : replicationServers.values())
@@ -1425,6 +1445,7 @@
MonitorRequestMessage(this.replicationServer.getServerId(),
rs.getServerId());
rs.send(msg);
+ sent++;
}
}
catch(Exception e)
@@ -1434,6 +1455,7 @@
throw new DirectoryException(ResultCode.OTHER,
message, e);
}
+ return sent;
}
/**
@@ -1446,21 +1468,30 @@
{
try
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn +
+ " waiting for " + expectedResponses
+ + " expected monitor messages");
+
boolean allPermitsAcquired =
remoteMonitorResponsesSemaphore.tryAcquire(
expectedResponses,
- (long) 500, TimeUnit.MILLISECONDS);
+ (long) 5000, TimeUnit.MILLISECONDS);
if (!allPermitsAcquired)
{
logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+ // FIXME let's go on in best effort even with limited data received.
}
else
{
if (debugEnabled())
TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
- "Successfully received all " + replicationServers.size()
+ " baseDn=" + baseDn +
+ " Successfully received all " + expectedResponses
+ " expected monitor messages");
}
}
@@ -1482,48 +1513,94 @@
*/
public void receivesMonitorDataResponse(MonitorMessage msg)
{
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "Receiving " + msg + " from " + msg.getsenderID() +
+ remoteMonitorResponsesSemaphore);
+
if (remoteMonitorResponsesSemaphore == null)
{
- // Ignoring the remote monitor data because an error occured previously
+ // FIXME
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "Receiving " + msg + " from " + msg.getsenderID() +
+ " remoteMonitorResponsesSemaphore should not be null"));
+ // Ignoring the remote monitor data because an error occured
+ // previously
return;
}
try
{
- // Here is the RS state : list <serverID, lastChangeNumber>
- // For each LDAP Server, we keep the max CN accross the RSes
- ServerState replServerState = msg.getReplServerState();
- Iterator<Short> it = replServerState.iterator();
- while (it.hasNext())
+ synchronized(wrkMonitorData)
{
- short sid = it.next();
- ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
- ChangeNumber maxCN = this.maxCNs.get(sid);
- if (receivedCN.newer(maxCN))
- {
- // We found a newer one
- this.maxCNs.remove(sid);
- this.maxCNs.put(sid, receivedCN);
- }
- }
+ // Here is the RS state : list <serverID, lastChangeNumber>
+ // For each LDAP Server, we keep the max CN accross the RSes
+ ServerState replServerState = msg.getReplServerDbState();
+ wrkMonitorData.setMaxCNs(replServerState);
- // Store the LDAP servers states
- Iterator<Short> sidIterator = msg.iterator();
- while (sidIterator.hasNext())
- {
- short sid = sidIterator.next();
- ServerState ss = msg.getLDAPServerState(sid);
- this.LDAPStates.put(sid, ss);
- this.approxFirstMissingDate.put(sid,
- msg.getApproxFirstMissingDate(sid));
+ // Store the remote LDAP servers states
+ Iterator<Short> lsidIterator = msg.ldapIterator();
+ while (lsidIterator.hasNext())
+ {
+ short sid = lsidIterator.next();
+ wrkMonitorData.setLDAPServerState(sid,
+ msg.getLDAPServerState(sid).duplicate());
+ wrkMonitorData.setFirstMissingDate(sid,
+ msg.getLDAPApproxFirstMissingDate(sid));
+ }
+
+ // Process the latency reported by the remote RSi on its connections
+ // to the other RSes
+ Iterator<Short> rsidIterator = msg.rsIterator();
+ while (rsidIterator.hasNext())
+ {
+ short rsid = rsidIterator.next();
+ if (rsid == replicationServer.getServerId())
+ {
+ // this is the latency of the remote RSi regarding the current RS
+ // let's update the fmd of my connected LS
+ for (ServerHandler connectedlsh : connectedServers.values())
+ {
+ short connectedlsid = connectedlsh.getServerId();
+ Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+ wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+ }
+ }
+ else
+ {
+ // this is the latency of the remote RSi regarding another RSj
+ // let's update the latency of the LSes connected to RSj
+ ServerHandler rsjHdr = replicationServers.get(rsid);
+ for(short remotelsid : rsjHdr.getConnectedServerIds())
+ {
+ Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+ wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
+ }
+ }
+ }
+ if (debugEnabled())
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " baseDn=" + baseDn +
+ " Processed msg from " + msg.getsenderID() +
+ " New monitor data: " + wrkMonitorData.toString());
+ }
}
// Decreases the number of expected responses and potentially
// wakes up the waiting requestor thread.
remoteMonitorResponsesSemaphore.release();
+
}
catch (Exception e)
{
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
+ stackTraceToSingleLineString(e)));
+
// If an exception occurs while processing one of the expected message,
// the processing is aborted and the waiting thread is awoke.
remoteMonitorResponsesSemaphore.notifyAll();
@@ -1531,65 +1608,6 @@
}
/**
- * Get the state of the LDAP server with the provided serverId.
- * @param serverId The server ID.
- * @return The server state.
- */
- public ServerState getServerState(short serverId)
- {
- return LDAPStates.get(serverId);
- }
-
- /**
- * Get the highest know change number of the LDAP server with the provided
- * serverId.
- * @param serverId The server ID.
- * @return The highest change number.
- */
- public ChangeNumber getMaxCN(short serverId)
- {
- return maxCNs.get(serverId);
- }
-
- /**
- * Get an approximation of the date of the oldest missing changes.
- * serverId.
- * @param serverId The server ID.
- * @return The approximation of the date of the oldest missing change.
- */
- public Long getApproxFirstMissingDate(short serverId)
- {
- return approxFirstMissingDate.get(serverId);
- }
-
- /**
- * Get the number of missing change for the server with the provided state.
- * @param state The provided server state.
- * @return The number of missing changes.
- */
- public int getMissingChanges(ServerState state)
- {
- // Traverse the max Cn transmitted by each server
- // For each server, get the highest CN know from the current server
- // Sum the difference betwenn the max and the last
- int missingChanges = 0;
- Iterator<Short> itc = maxCNs.keySet().iterator();
- while (itc.hasNext())
- {
- Short sid = itc.next();
- ChangeNumber maxCN = maxCNs.get(sid);
- ChangeNumber last = state.getMaxChangeNumber(sid);
- if (last == null)
- {
- last = new ChangeNumber(0,0, sid);
- }
- int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
- missingChanges += missingChangesFromSID;
- }
- return missingChanges;
- }
-
- /**
* Set the purge delay on all the db Handlers for this Domain
* of Replicaiton.
*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 3b85cb1..0e759c0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -124,12 +125,12 @@
/**
- * When this Handler is connected to a remote replication server
+ * When this Handler is related to a remote replication server
* this collection will contain as many elements as there are
* LDAP servers connected to the remote replication server.
*/
- private List<LightweightServerHandler>
- remoteLDAPservers = new ArrayList<LightweightServerHandler>();
+ private Map<Short, LightweightServerHandler> connectedServers =
+ new ConcurrentHashMap<Short, LightweightServerHandler>();
/**
* The time in milliseconds between heartbeats from the replication
@@ -200,6 +201,8 @@
maxRcvWindow = windowSize;
rcvWindow = windowSize;
long localGenerationId = -1;
+ boolean handshakeOnly = false;
+
try
{
if (baseDn != null)
@@ -244,6 +247,8 @@
maxSendQueue = receivedMsg.getMaxSendQueue();
heartbeatInterval = receivedMsg.getHeartbeatInterval();
+ handshakeOnly = receivedMsg.isHandshakeOnly();
+
// The session initiator decides whether to use SSL.
sslEncryption = receivedMsg.getSSLEncryption();
@@ -524,60 +529,70 @@
replicationServerDomain = replicationServer.
getReplicationServerDomain(this.baseDn,true);
- boolean started;
- if (serverIsLDAPserver)
+ if (!handshakeOnly)
{
- started = replicationServerDomain.startServer(this);
- }
- else
- {
- started = replicationServerDomain.startReplicationServer(this);
- }
-
- if (started)
- {
- // sendWindow MUST be created before starting the writer
- sendWindow = new Semaphore(sendWindowSize);
-
- writer = new ServerWriter(session, serverId,
- this, replicationServerDomain);
- reader = new ServerReader(session, serverId,
- this, replicationServerDomain);
-
- reader.start();
- writer.start();
-
- // Create a thread to send heartbeat messages.
- if (heartbeatInterval > 0)
+ boolean started;
+ if (serverIsLDAPserver)
{
- heartbeatThread = new HeartbeatThread(
- "replication Heartbeat to " + serverURL +
- " for " + this.baseDn,
- session, heartbeatInterval/3);
- heartbeatThread.start();
+ started = replicationServerDomain.startServer(this);
+ }
+ else
+ {
+ started = replicationServerDomain.startReplicationServer(this);
}
-
- DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
- DirectoryServer.registerMonitorProvider(this);
- }
- else
- {
- // the connection is not valid, close it.
- try
+ if (started)
{
- if (debugEnabled())
+ // sendWindow MUST be created before starting the writer
+ sendWindow = new Semaphore(sendWindowSize);
+
+ writer = new ServerWriter(session, serverId,
+ this, replicationServerDomain);
+ reader = new ServerReader(session, serverId,
+ this, replicationServerDomain);
+
+ reader.start();
+ writer.start();
+
+ // Create a thread to send heartbeat messages.
+ if (heartbeatInterval > 0)
{
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + " RS failed to start locally " +
- " the connection from serverID="+serverId);
+ heartbeatThread = new HeartbeatThread(
+ "replication Heartbeat to " + serverURL +
+ " for " + this.baseDn,
+ session, heartbeatInterval/3);
+ heartbeatThread.start();
}
- session.close();
- } catch (IOException e1)
- {
- // ignore
+
+ DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+ DirectoryServer.registerMonitorProvider(this);
}
+ else
+ {
+ // the connection is not valid, close it.
+ try
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
+ getMonitorInstanceName() + " RS failed to start locally " +
+ " the connection from serverID="+serverId);
+ }
+ session.close();
+ } catch (IOException e1)
+ {
+ // ignore
+ }
+ }
+ }
+ else
+ {
+ // For a hanshakeOnly connection, let's only create a reader
+ // in order to detect the connection closure.
+ reader = new ServerReader(session, serverId,
+ this, replicationServerDomain);
+ reader.start();
}
}
catch (Exception e)
@@ -842,22 +857,22 @@
/**
* Get the age of the older change that has not yet been replicated
* to the server handled by this ServerHandler.
- *
* @return The age if the older change has not yet been replicated
* to the server handled by this ServerHandler.
*/
public Long getApproxFirstMissingDate()
{
- // Get the older CN received
- // From it, get the next sequence number
- // Get the CN for the next sequence number
- // If not present in the local RS db,
- // then approximate with the older update time
- ChangeNumber olderUpdateCN = getOlderUpdateCN();
- if (olderUpdateCN == null)
- return null;
+ Long result = (long)0;
- return olderUpdateCN.getTime();
+ // Get the older CN received
+ ChangeNumber olderUpdateCN = getOlderUpdateCN();
+ if (olderUpdateCN != null)
+ {
+ // If not present in the local RS db,
+ // then approximate with the older update time
+ result=olderUpdateCN.getTime();
+ }
+ return result;
}
/**
@@ -874,29 +889,82 @@
/**
* Get the older Change Number for that server.
+ * Returns null when the queue is empty.
* @return The older change number.
*/
public ChangeNumber getOlderUpdateCN()
{
+ ChangeNumber result = null;
synchronized (msgQueue)
{
if (isFollowing())
{
if (msgQueue.isEmpty())
- return null;
-
- UpdateMessage msg = msgQueue.first();
- return msg.getChangeNumber();
+ {
+ result=null;
+ }
+ else
+ {
+ UpdateMessage msg = msgQueue.first();
+ result = msg.getChangeNumber();
+ }
}
else
{
if (lateQueue.isEmpty())
- return null;
+ {
+ // isFollowing is false AND lateQueue is empty
+ // We may be at the very moment when the writer has emptyed the
+ // lateQueue when it sent the last update. The writer will fill again
+ // the lateQueue when it will send the next update but we are not yet
+ // there. So let's take the last change not sent directly from
+ // the db.
- UpdateMessage msg = lateQueue.first();
- return msg.getChangeNumber();
+ ReplicationIteratorComparator comparator =
+ new ReplicationIteratorComparator();
+ SortedSet<ReplicationIterator> iteratorSortedSet =
+ new TreeSet<ReplicationIterator>(comparator);
+ try
+ {
+ // Build a list of candidates iterator (i.e. db i.e. server)
+ for (short serverId : replicationServerDomain.getServers())
+ {
+ // get the last already sent CN from that server
+ ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
+ // get an iterator in this server db from that last change
+ ReplicationIterator iterator =
+ replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+ // if that iterator has changes, then it is a candidate
+ // it is added in the sorted list at a position given by its
+ // current change (see ReplicationIteratorComparator).
+ if ((iterator != null) && (iterator.getChange() != null))
+ {
+ iteratorSortedSet.add(iterator);
+ }
+ }
+ UpdateMessage msg = iteratorSortedSet.first().getChange();
+ result = msg.getChangeNumber();
+ }
+ catch(Exception e)
+ {
+ result=null;
+ }
+ finally
+ {
+ for (ReplicationIterator iterator : iteratorSortedSet)
+ {
+ iterator.releaseCursor();
+ }
+ }
+ }
+ else
+ {
+ UpdateMessage msg = lateQueue.first();
+ result = msg.getChangeNumber();
+ }
}
}
+ return result;
}
/**
@@ -958,7 +1026,7 @@
*/
while (msgQueue.size() > maxQueueSize)
{
- following = false;
+ setFollowing(false);
msgQueue.removeFirst();
}
}
@@ -1083,6 +1151,13 @@
}
}
}
+
+ // The loop below relies on the fact that it is sorted based
+ // on the currentChange of each iterator to consider the next
+ // change accross all servers.
+ // Hence it is necessary to remove and eventual add again an iterator
+ // when looping in order to keep consistent the order of the
+ // iterators (see ReplicationIteratorComparator.
while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
{
ReplicationIterator iterator = iteratorSortedSet.first();
@@ -1107,7 +1182,7 @@
{
if (msgQueue.size() < maxQueueSize)
{
- following = true;
+ setFollowing(true);
}
}
}
@@ -1119,7 +1194,7 @@
if (msgQueue.contains(msg))
{
/* we finally catched up with the regular queue */
- following = true;
+ setFollowing(true);
lateQueue.clear();
UpdateMessage msg1;
do
@@ -1459,14 +1534,6 @@
attributes.add(new Attribute("connected-to", this.replicationServerDomain.
getReplicationServer().getMonitorInstanceName()));
- // Add the oldest missing update
- Long olderUpdateTime = this.getApproxFirstMissingDate();
- if (olderUpdateTime != null)
- {
- Date date = new Date(olderUpdateTime);
- attributes.add(new Attribute("approx-older-change-not-synchronized",
- date.toString()));
- }
}
else
{
@@ -1477,27 +1544,42 @@
attributes.add(new Attribute("base-dn",
baseDn.toString()));
- // Update stats
-
- // Retrieves the topology counters
if (serverIsLDAPserver)
{
+ MonitorData md;
try
{
- replicationServerDomain.retrievesRemoteMonitorData();
+ md = replicationServerDomain.getMonitorData();
+
+ // Oldest missing update
+ Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+ if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
+ {
+ Date date = new Date(approxFirstMissingDate);
+ attributes.add(new Attribute("approx-older-change-not-synchronized",
+ date.toString()));
+ attributes.add(
+ new Attribute("approx-older-change-not-synchronized-millis",
+ String.valueOf(approxFirstMissingDate)));
+ }
+
+ // Missing changes
+ long missingChanges = md.getMissingChanges(serverId);
+ attributes.add(new Attribute("missing-changes",
+ String.valueOf(missingChanges)));
+
+ // Replication delay
+ long delay = md.getApproxDelay(serverId);
+ attributes.add(new Attribute("approximate-delay",
+ String.valueOf(delay)));
}
catch(Exception e)
{
- // FIXME: We failed retrieving the remote monitor data
+ // TODO: improve the log
+ // We failed retrieving the remote monitor data.
+ attributes.add(new Attribute("error",
+ stackTraceToSingleLineString(e)));
}
-
- // Compute the latency for the current SH
- int missingChanges =
- replicationServerDomain.getMissingChanges(serverState);
-
- // add the latency attribute to our monitor data
- attributes.add(new Attribute("missing-changes",
- String.valueOf(missingChanges)));
}
// Deprecated
@@ -1532,8 +1614,6 @@
attributes.add(new Attribute("waiting-changes",
String.valueOf(getRcvMsgQueueSize())));
// Age of oldest missing change
- attributes.add(new Attribute("approximate-delay",
- String.valueOf(getApproxDelay())));
// Date of the oldest missing change
long olderUpdateTime = getOlderUpdateTime();
@@ -1731,14 +1811,14 @@
List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
generationId = infoMsg.getGenerationId();
- synchronized(remoteLDAPservers)
+ synchronized(connectedServers)
{
// Removes the existing structures
- for (LightweightServerHandler lsh : remoteLDAPservers)
+ for (LightweightServerHandler lsh : connectedServers.values())
{
lsh.stopHandler();
}
- remoteLDAPservers.clear();
+ connectedServers.clear();
// Creates the new structure according to the message received.
for (String newConnectedServer : newRemoteLDAPservers)
@@ -1746,7 +1826,7 @@
LightweightServerHandler lsh
= new LightweightServerHandler(newConnectedServer, this);
lsh.startHandler();
- remoteLDAPservers.add(lsh);
+ connectedServers.put(lsh.getServerId(), lsh);
}
}
}
@@ -1762,14 +1842,17 @@
*/
public boolean isRemoteLDAPServer(short wantedServer)
{
- for (LightweightServerHandler server : remoteLDAPservers)
+ synchronized(connectedServers)
{
- if (wantedServer == server.getServerId())
+ for (LightweightServerHandler server : connectedServers.values())
{
- return true;
+ if (wantedServer == server.getServerId())
+ {
+ return true;
+ }
}
+ return false;
}
- return false;
}
/**
@@ -1781,7 +1864,7 @@
*/
public boolean hasRemoteLDAPServers()
{
- return !remoteLDAPservers.isEmpty();
+ return !connectedServers.isEmpty();
}
/**
@@ -1907,4 +1990,13 @@
{
return this.replicationServerDomain;
}
+
+ /**
+ * Return a Set containing the servers known by this replicationServer.
+ * @return a set containing the servers known by this replicationServer.
+ */
+ public Set<Short> getConnectedServerIds()
+ {
+ return connectedServers.keySet();
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 323751e..943e6b7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -120,6 +120,7 @@
{
ReplicationMessage msg = session.receive();
+ /*
if (debugEnabled())
{
TRACER.debugInfo(
@@ -128,6 +129,7 @@
(handler.isReplicationServer()?" From RS ":" From LS")+
" with serverId=" + serverId + " receives " + msg);
}
+ */
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 698ce5e..2e47c7b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -120,6 +120,7 @@
continue;
}
+ /*
if (debugEnabled())
{
TRACER.debugInfo(
@@ -131,6 +132,7 @@
" server=" + handler.getServerId() +
" generationId=" + handler.getGenerationId());
}
+ */
session.publish(update);
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
index 2376e20..34a22dd 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -316,6 +316,10 @@
CN1 = new ChangeNumber((long)0, 3, (short)0);
+ // 3-0 = 3
+ CN2 = new ChangeNumber((long)0, 0, (short)0);
+ assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 3);
+
// 3-1 = 2
CN2 = new ChangeNumber((long)0, 1, (short)0);
assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
@@ -327,5 +331,15 @@
// 3-4 == MAXINT (modulo)
CN2 = new ChangeNumber((long)0, 4, (short)0);
assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
+
+ CN1 = new ChangeNumber((long)0, 0, (short)0);
+
+ // 0-0 = 0
+ CN2 = new ChangeNumber((long)0, 0, (short)0);
+ assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
+
+ // 0-1 = MAXINT(modulo)
+ CN2 = new ChangeNumber((long)0, 1, (short)0);
+ assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 592cc29..6d5c866 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -496,7 +496,7 @@
state.update(new ChangeNumber((long)1, 1,(short)1));
ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
window, window, window, window, window, window, state, (short)1,
- (long)1, true);
+ (long)1, true, false);
ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
assertEquals(msg.getServerId(), newMsg.getServerId());
assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -511,6 +511,7 @@
newMsg.getServerState().getMaxChangeNumber((short)1));
assertEquals(msg.getVersion(), newMsg.getVersion());
assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
+ assertEquals(msg.isHandshakeOnly(), newMsg.isHandshakeOnly());
}
@DataProvider(name="changelogStart")
@@ -614,20 +615,28 @@
(short) 123, sid2);
s2.update(cn2);
+ // LS3 state
+ ServerState s3 = new ServerState();
+ short sid3 = 333;
+ ChangeNumber cn3 = new ChangeNumber(now,
+ (short) 123, sid3);
+ s3.update(cn3);
+
MonitorMessage msg =
new MonitorMessage(sender, dest);
- msg.setReplServerState(rsState);
- msg.setLDAPServerState(sid1, s1, now+1);
- msg.setLDAPServerState(sid2, s2, now+2);
+ msg.setReplServerDbState(rsState);
+ msg.setServerState(sid1, s1, now+1, true);
+ msg.setServerState(sid2, s2, now+2, true);
+ msg.setServerState(sid3, s3, now+3, false);
byte[] b = msg.getBytes();
MonitorMessage newMsg = new MonitorMessage(b);
- assertEquals(rsState, msg.getReplServerState());
- assertEquals(newMsg.getReplServerState().toString(),
- msg.getReplServerState().toString());
+ assertEquals(rsState, msg.getReplServerDbState());
+ assertEquals(newMsg.getReplServerDbState().toString(),
+ msg.getReplServerDbState().toString());
- Iterator<Short> it = newMsg.iterator();
+ Iterator<Short> it = newMsg.ldapIterator();
while (it.hasNext())
{
short sid = it.next();
@@ -635,16 +644,32 @@
if (sid == sid1)
{
assertEquals(s.toString(), s1.toString(), "");
- assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
+ assertEquals((Long)(now+1), newMsg.getLDAPApproxFirstMissingDate(sid), "");
}
else if (sid == sid2)
{
assertEquals(s.toString(), s2.toString());
- assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
+ assertEquals((Long)(now+2), newMsg.getLDAPApproxFirstMissingDate(sid), "");
}
else
{
- fail("Bad sid");
+ fail("Bad sid" + sid);
+ }
+ }
+
+ Iterator<Short> it2 = newMsg.rsIterator();
+ while (it2.hasNext())
+ {
+ short sid = it2.next();
+ ServerState s = newMsg.getRSServerState(sid);
+ if (sid == sid3)
+ {
+ assertEquals(s.toString(), s3.toString(), "");
+ assertEquals((Long)(now+3), newMsg.getRSApproxFirstMissingDate(sid), "");
+ }
+ else
+ {
+ fail("Bad sid " + sid);
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 98b6a97..8186d82 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -874,7 +874,7 @@
ServerStartMessage msg =
new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
- ProtocolVersion.currentVersion(), 0, sslEncryption);
+ ProtocolVersion.currentVersion(), 0, sslEncryption, false);
session.publish(msg);
// Read the Replication Server state from the ReplServerStartMessage that
@@ -907,7 +907,7 @@
0, 0, 0, 0, WINDOW, (long) 5000, replServerState,
ProtocolVersion.currentVersion(),
ReplicationTestCase.getGenerationId(baseDn),
- sslEncryption);
+ sslEncryption, false);
session.publish(msg);
// Read the ReplServerStartMessage that come back.
--
Gitblit v1.10.0