opends/src/messages/messages/replication.properties
@@ -252,7 +252,8 @@ NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \ failed: %s SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \ are missing due to an error in the retrieval process are missing due to an error in the retrieval process. Potentially a server \ is too slow to provide its monitoring data over the protocol SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \ are missing due to a processing error : %s SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \ opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.common; @@ -339,4 +339,23 @@ { return list.isEmpty(); } /** * Make a duplicate of this state. * @return The duplicate of this state. */ public ServerState duplicate() { ServerState newState = new ServerState(); synchronized (this) { for (Short key : list.keySet()) { ChangeNumber change = list.get(key); Short id = change.getServerId(); newState.list.put(id,change); } } return newState; } } opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -433,7 +433,7 @@ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn, maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, halfRcvWindow * 2, heartbeatInterval, state, protocolVersion, generationId, isSslEncryption); protocolVersion, generationId, isSslEncryption, !keepConnection); session.publish(msg); /* opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
@@ -65,15 +65,20 @@ } /** * Data structure to manage the state of the replication server * and the state informations for the LDAP servers connected. * Data structure to manage the state of this replication server * and the state informations for the servers connected to it. * */ class SubTopoMonitorData { ServerState replServerState; // This replication server DbState ServerState replServerDbState; // The data related to the LDAP servers connected to this RS HashMap<Short, ServerData> ldapStates = new HashMap<Short, ServerData>(); // The data related to the RS servers connected to this RS HashMap<Short, ServerData> rsStates = new HashMap<Short, ServerData>(); } SubTopoMonitorData data = new SubTopoMonitorData();; @@ -93,9 +98,9 @@ * Sets the state of the replication server. * @param state The state. */ public void setReplServerState(ServerState state) public void setReplServerDbState(ServerState state) { data.replServerState = state; data.replServerDbState = state; } /** @@ -103,20 +108,27 @@ * @param serverId The serverID. * @param state The server state. * @param approxFirstMissingDate The approximation of the date * of the older missing change. * * of the older missing change. null when none. * @param isLDAP Specifies whether the server is a LS or a RS */ public void setLDAPServerState(short serverId, ServerState state, Long approxFirstMissingDate) public void setServerState(short serverId, ServerState state, Long approxFirstMissingDate, boolean isLDAP) { if (data.ldapStates == null) { data.ldapStates = new HashMap<Short, ServerData>(); } if (data.rsStates == null) { data.rsStates = new HashMap<Short, ServerData>(); } ServerData sd = new ServerData(); sd.state = state; sd.approxFirstMissingDate = approxFirstMissingDate; data.ldapStates.put(serverId, sd); if (isLDAP) data.ldapStates.put(serverId, sd); else data.rsStates.put(serverId, sd); } /** @@ -130,16 +142,37 @@ } /** * Get the server state for the RS server with the provided serverId. 
* @param serverId The provided serverId. * @return The state. */ public ServerState getRSServerState(short serverId) { return data.rsStates.get(serverId).state; } /** * Get the approximation of the date of the older missing change for the * LDAP Server with the provided server Id. * @param serverId The provided serverId. * @return The approximated state. */ public Long getApproxFirstMissingDate(short serverId) public Long getLDAPApproxFirstMissingDate(short serverId) { return data.ldapStates.get(serverId).approxFirstMissingDate; } /** * Get the approximation of the date of the older missing change for the * RS Server with the provided server Id. * @param serverId The provided serverId. * @return The approximated state. */ public Long getRSApproxFirstMissingDate(short serverId) { return data.rsStates.get(serverId).approxFirstMissingDate; } /** * Creates a new EntryMessage from its encoded form. @@ -182,39 +215,51 @@ try { ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS); // loop on the servers for (ASN1Element el0 : s0.elements()) { ServerState newState = new ServerState(); short serverId = 0; Long outime = (long)0; boolean isLDAPServer = false; ASN1Sequence s1 = el0.decodeAsSequence(); // loop on the list of CN of the state for (ASN1Element el1 : s1.elements()) { ASN1OctetString o = el1.decodeAsOctetString(); String s = o.stringValue(); ChangeNumber cn = new ChangeNumber(s); if ((data.replServerState != null) && (serverId == 0)) if ((data.replServerDbState != null) && (serverId == 0)) { // we are on the first CN that is a fake CN to store the serverId // and the older update time serverId = cn.getServerId(); outime = cn.getTime(); isLDAPServer = (cn.getSeqnum()>0); } else { // we are on a normal CN newState.update(cn); } } // the first state is the replication state if (data.replServerState == null) if (data.replServerDbState == null) { data.replServerState = newState; // the first state is the replication state data.replServerDbState = newState; } else { 
// the next states are the server states ServerData sd = new ServerData(); sd.state = newState; sd.approxFirstMissingDate = outime; data.ldapStates.put(serverId, sd); if (isLDAPServer) data.ldapStates.put(serverId, sd); else data.rsStates.put(serverId, sd); } } } catch(Exception e) @@ -245,14 +290,17 @@ ASN1Sequence stateElementSequence = new ASN1Sequence(); ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>(); // First loop computes the length /** * First loop computes the length */ /* Put the serverStates ... */ stateElementSequence = new ASN1Sequence(); stateElementList = new ArrayList<ASN1Element>(); /* first put the Replication Server state */ ArrayList<ASN1OctetString> cnOctetList = data.replServerState.toASN1ArrayList(); data.replServerDbState.toASN1ArrayList(); ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>(); for (ASN1OctetString soci : cnOctetList) { @@ -288,6 +336,35 @@ cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); } // then the rs server data servers = data.rsStates.keySet(); for (Short sid : servers) { // State ServerState statei = data.rsStates.get(sid).state; // First missing date Long outime = data.rsStates.get(sid).approxFirstMissingDate; // retrieves the change numbers as an arrayList of ANSN1OctetString cnOctetList = statei.toASN1ArrayList(); cnElementList = new ArrayList<ASN1Element>(); // a fake changenumber helps storing the LDAP server ID // and the olderupdatetime ChangeNumber cn = new ChangeNumber(outime,0,sid); cnElementList.add(new ASN1OctetString(cn.toString())); // the changenumbers for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); } cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); } stateElementSequence.setElements(stateElementList); int seqLen = stateElementSequence.encode().length; @@ -298,7 +375,9 @@ // Allocate the array sized from the computed length byte[] resultByteArray = new byte[length]; // Second loop build 
the array /** * Second loop really builds the array */ /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR; @@ -313,7 +392,7 @@ /* first put the Replication Server state */ cnOctetList = data.replServerState.toASN1ArrayList(); data.replServerDbState.toASN1ArrayList(); cnElementList = new ArrayList<ASN1Element>(); for (ASN1OctetString soci : cnOctetList) { @@ -322,7 +401,7 @@ cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); // then the LDAP server state // then the LDAP server datas servers = data.ldapStates.keySet(); for (Short sid : servers) { @@ -334,10 +413,10 @@ cnElementList = new ArrayList<ASN1Element>(); // a fake changenumber helps storing the LDAP server ID ChangeNumber cn = new ChangeNumber(outime,0,sid); ChangeNumber cn = new ChangeNumber(outime,1,sid); cnElementList.add(new ASN1OctetString(cn.toString())); // the changenumbers // the changenumbers that make the state for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); @@ -346,6 +425,33 @@ cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); } // then the RS server datas servers = data.rsStates.keySet(); for (Short sid : servers) { ServerState statei = data.rsStates.get(sid).state; Long outime = data.rsStates.get(sid).approxFirstMissingDate; // retrieves the change numbers as an arrayList of ANSN1OctetString cnOctetList = statei.toASN1ArrayList(); cnElementList = new ArrayList<ASN1Element>(); // a fake changenumber helps storing the LDAP server ID ChangeNumber cn = new ChangeNumber(outime,0,sid); cnElementList.add(new ASN1OctetString(cn.toString())); // the changenumbers that make the state for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); } cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); } stateElementSequence.setElements(stateElementList); pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos); @@ -361,41 +467,62 @@ * Get the 
state of the replication server that sent this message. * @return The state. */ public ServerState getReplServerState() public ServerState getReplServerDbState() { return data.replServerState; return data.replServerDbState; } /** * Returns an iterator on the serverId of the connected LDAP servers. * @return The iterator. */ public Iterator<Short> iterator() public Iterator<Short> ldapIterator() { return data.ldapStates.keySet().iterator(); } /** * Returns an iterator on the serverId of the connected RS servers. * @return The iterator. */ public Iterator<Short> rsIterator() { return data.rsStates.keySet().iterator(); } /** * {@inheritDoc} */ @Override public String toString() { String stateS = " RState:"; stateS += "/" + data.replServerState.toString(); stateS += " LDAPStates:"; Iterator<ServerData> it = data.ldapStates.values().iterator(); while (it.hasNext()) String stateS = "\nRState:["; stateS += data.replServerDbState.toString(); stateS += "]"; stateS += "\nLDAPStates:["; for (Short sid : data.ldapStates.keySet()) { ServerData sd = it.next(); stateS += "/ state=" + sd.state.toString() + " afmd=" + sd.approxFirstMissingDate + "] "; ServerData sd = data.ldapStates.get(sid); stateS += "\n[LSstate("+ sid + ")=" + sd.state.toString() + "]" + " afmd=" + sd.approxFirstMissingDate + "]"; } stateS += "\nRSStates:["; for (Short sid : data.rsStates.keySet()) { ServerData sd = data.rsStates.get(sid); stateS += "\n[RSState("+ sid + ")=" + sd.state.toString() + "]" + " afmd=" + sd.approxFirstMissingDate + "]"; } String me = this.getClass().getCanonicalName() + " sender=" + this.senderID + "[ sender=" + this.senderID + " destination=" + this.destination + " states=" + stateS + " data=[" + stateS + "]" + "]"; return me; } opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -54,6 +54,7 @@ private int maxReceiveDelay; private int maxSendDelay; private int windowSize; private boolean handshakeOnly; private ServerState serverState = null; /** @@ -87,6 +88,8 @@ * @param generationId The generationId for this server. * @param sslEncryption Whether to continue using SSL to encrypt messages * after the start messages have been exchanged. * @param handshakeOnly Whether this message is only to get an handshake * with the server or not. */ public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay, int maxReceiveQueue, int maxSendDelay, @@ -95,7 +98,8 @@ ServerState serverState, short protocolVersion, long generationId, boolean sslEncryption) boolean sslEncryption, boolean handshakeOnly) { super(protocolVersion, generationId); @@ -109,6 +113,7 @@ this.heartbeatInterval = heartbeatInterval; this.sslEncryption = sslEncryption; this.serverState = serverState; this.handshakeOnly = handshakeOnly; try { @@ -209,10 +214,19 @@ sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8")); pos += length +1; /* * read the handshakeOnly flag */ length = getNextLength(in, pos); handshakeOnly = Boolean.valueOf(new String(in, pos, length, "UTF-8")); pos += length +1; /* * read the ServerState */ serverState = new ServerState(in, pos, in.length-1); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); @@ -322,6 +336,8 @@ byte[] byteSSLEncryption = String.valueOf(sslEncryption).getBytes("UTF-8"); byte[] byteServerState = serverState.getBytes(); byte[] byteHandshakeOnly = String.valueOf(handshakeOnly).getBytes("UTF-8"); int length = byteDn.length + 1 + byteServerId.length + 1 + byteServerUrl.length + 1 + @@ -332,6 +348,7 @@ byteWindowSize.length + 1 + byteHeartbeatInterval.length + 1 
+ byteSSLEncryption.length + 1 + byteHandshakeOnly.length + 1 + byteServerState.length + 1; /* encode the header in a byte[] large enough to also contain the mods */ @@ -358,6 +375,8 @@ pos = addByteArray(byteSSLEncryption, resultByteArray, pos); pos = addByteArray(byteHandshakeOnly, resultByteArray, pos); pos = addByteArray(byteServerState, resultByteArray, pos); return resultByteArray; @@ -401,4 +420,16 @@ { return sslEncryption; } /** * Tells whether this message is only meant to perform a handshake with * the server. * * @return The value of the handshakeOnly flag carried by this message. */ public boolean isHandshakeOnly() { return handshakeOnly; } } opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -200,16 +200,6 @@ @Override public ArrayList<Attribute> getMonitorData() { if (debugEnabled()) TRACER.debugInfo( "In " + this.replServerHandler.getDomain().getReplicationServer(). getMonitorInstanceName()+ " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " getMonitor data"); ArrayList<Attribute> attributes = new ArrayList<Attribute>(); attributes.add(new Attribute("server-id", @@ -220,12 +210,12 @@ replServerHandler.getMonitorInstanceName())); // Retrieves the topology counters MonitorData md; try { rsDomain.retrievesRemoteMonitorData(); md = rsDomain.getMonitorData(); // Compute the latency for the current SH ServerState remoteState = rsDomain.getServerState(serverId); ServerState remoteState = md.getLDAPServerState(serverId); if (remoteState == null) { remoteState = new ServerState(); @@ -241,29 +231,39 @@ { values.add(new AttributeValue(type,str)); } if (values.size() == 0) { values.add(new AttributeValue(type,"unknown")); } Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); attributes.add(attr); // add the latency attribute to our monitor data // Compute the latency for the current SH int missingChanges = rsDomain.getMissingChanges(remoteState); attributes.add(new Attribute("missing-changes", String.valueOf(missingChanges))); // Add the oldest missing update Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId); if (olderUpdateTime != null) // Oldest missing update Long approxFirstMissingDate=md.getApproxFirstMissingDate(serverId); if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0)) { Date date = new Date(olderUpdateTime); Date date = new Date(approxFirstMissingDate); attributes.add(new Attribute("approx-older-change-not-synchronized", date.toString())); attributes.add( new Attribute("approx-older-change-not-synchronized-millis", String.valueOf(olderUpdateTime))); new Attribute("approx-older-change-not-synchronized-millis", 
String.valueOf(approxFirstMissingDate))); } // Missing changes long missingChanges = md.getMissingChanges(serverId); attributes.add(new Attribute("missing-changes", String.valueOf(missingChanges))); // Replication delay long delay = md.getApproxDelay(serverId); attributes.add(new Attribute("approximate-delay", String.valueOf(delay))); } catch(Exception e) { // TODO: improve the log // We failed retrieving the remote monitor data. attributes.add(new Attribute("error", stackTraceToSingleLineString(e))); opends/src/server/org/opends/server/replication/server/MonitorData.java
New file @@ -0,0 +1,329 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.util.TimeThread;

/**
 * This class defines the Monitor Data that are consolidated across the
 * whole replication topology.
 */
public class MonitorData
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  /*
   * Data held by this object:
   *
   * - For each server, the max (most recent) CN produced.
   *
   * - For each server, its state, i.e. the last change processed from each
   *   other LDAP server.
   *   The change latency (missing changes) is the difference between the
   *   max above and the state here.
   *
   * - For each server, the date of the first missing change.
   *   The time latency (delay) is the difference between the build date of
   *   these data and the date of the first missing change.
   */

  /* The date of the last time they have been elaborated */
  private long buildDate = 0;

  // For each LDAP server, its server state
  private final ConcurrentHashMap<Short, ServerState> ldapStates =
    new ConcurrentHashMap<Short, ServerState>();

  // For each LDAP server, the last(max) CN it published
  private final ConcurrentHashMap<Short, ChangeNumber> maxCNs =
    new ConcurrentHashMap<Short, ChangeNumber>();

  // For each LDAP server, an approximation of the date of the first missing
  // change
  private final ConcurrentHashMap<Short, Long> fmd =
    new ConcurrentHashMap<Short, Long>();

  // For each LDAP server, the number of missing changes computed by
  // completeComputing()
  private final ConcurrentHashMap<Short, Long> missingChanges =
    new ConcurrentHashMap<Short, Long>();

  // For each RS server, an approximation of the date of the first missing
  // change
  private final ConcurrentHashMap<Short, Long> fmRSDate =
    new ConcurrentHashMap<Short, Long>();

  /**
   * Get an approximation of the latency delay of the replication.
   * The delay is the build date minus the first missing date, divided by
   * 1000.
   * @param serverId The server ID.
   * @return The delay, or 0 when no strictly positive first missing date
   *         is recorded for the server.
   */
  public long getApproxDelay(short serverId)
  {
    Long afmd = fmd.get(serverId);
    if ((afmd == null) || (afmd <= 0))
    {
      return 0;
    }
    return (getBuildDate() - afmd) / 1000;
  }

  /**
   * Get an approximation of the date of the first missing update.
   * @param serverId The server ID.
   * @return The date, or 0 when none is recorded for the server.
   */
  public long getApproxFirstMissingDate(short serverId)
  {
    Long res = fmd.get(serverId);
    return (res != null) ? res : 0;
  }

  /**
   * Get the number of missing changes.
   * @param serverId The server ID.
   * @return The number of missing changes, or 0 when none has been
   *         computed for the server.
   */
  public long getMissingChanges(short serverId)
  {
    Long res = missingChanges.get(serverId);
    return (res != null) ? res : 0;
  }

  /**
   * Build the monitor data that are computed from the collected ones:
   * for each LDAP server LSi, the missing changes counter is the sum, over
   * each server LSj having a known max CN, of the difference
   * max(LSj) - state(LSi). The build date is then set to the current time.
   */
  public void completeComputing()
  {
    // The trace text is only built when it is going to be emitted.
    final boolean tracing = debugEnabled();

    for (Map.Entry<Short, ServerState> lsi : ldapStates.entrySet())
    {
      Short lsiSid = lsi.getKey();
      ServerState lsiState = lsi.getValue();
      long lsiMissingChanges = 0;

      // One trace buffer per server, so that a server's debug line does not
      // carry the details of the servers processed before it.
      StringBuilder trace = tracing ? new StringBuilder() : null;

      for (Map.Entry<Short, ChangeNumber> lsj : maxCNs.entrySet())
      {
        ChangeNumber lsjMaxCN = lsj.getValue();
        ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsj.getKey());
        int missingChangesLsiLsj =
          ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);

        if (trace != null)
        {
          trace.append("+ diff(").append(lsjMaxCN).append("-")
               .append(lsiLastCN).append(")=").append(missingChangesLsiLsj);
        }
        lsiMissingChanges += missingChangesLsiLsj;
      }

      missingChanges.put(lsiSid, lsiMissingChanges);

      if (trace != null)
      {
        trace.append("=").append(lsiMissingChanges);
        TRACER.debugInfo(
          "Complete monitor data : Missing changes ("+ lsiSid +")=" + trace);
      }
    }
    setBuildDate(TimeThread.getTime());
  }

  /**
   * Returns a <code>String</code> object representing this
   * object's value.
   * @return a string representation of the value of this object: the build
   *         date, then the RS data, the max CNs and the LDAP server data.
   */
  @Override
  public String toString()
  {
    StringBuilder mds = new StringBuilder("Monitor data=\n");
    mds.append("Build date=").append(getBuildDate());

    // RS data
    for (Map.Entry<Short, Long> rs : fmRSDate.entrySet())
    {
      mds.append("\nRSData(").append(rs.getKey()).append(")=\t ")
         .append("afmd=").append(rs.getValue());
    }

    // maxCNs
    for (Map.Entry<Short, ChangeNumber> max : maxCNs.entrySet())
    {
      mds.append("\nmaxCNs(").append(max.getKey()).append(")= ")
         .append(max.getValue().toString());
    }

    // LDAP data
    for (Map.Entry<Short, ServerState> ls : ldapStates.entrySet())
    {
      Short sid = ls.getKey();
      mds.append("\nLSData(").append(sid).append(")=\t")
         .append("state=[").append(ls.getValue().toString())
         .append("] afmd=").append(getApproxFirstMissingDate(sid));
      if (getBuildDate() > 0)
      {
        mds.append(" missingDelay=").append(getApproxDelay(sid));
      }
      mds.append(" missingCount=").append(missingChanges.get(sid));
    }

    return mds.toString();
  }

  /**
   * Sets the build date of the data.
   * @param buildDate The date.
   */
  public void setBuildDate(long buildDate)
  {
    this.buildDate = buildDate;
  }

  /**
   * Returns the build date of the data.
   * @return The date.
   */
  public long getBuildDate()
  {
    return buildDate;
  }

  /**
   * From a provided state, sets the max CN of the monitor data.
   * @param state the provided state.
   */
  public void setMaxCNs(ServerState state)
  {
    Iterator<Short> it = state.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      setMaxCN(sid, state.getMaxChangeNumber(sid));
    }
  }

  /**
   * For the provided serverId, sets the provided CN as the max if
   * it is newer than the current max. A null CN is ignored.
   * @param serverId the provided serverId
   * @param newCN    the provided new CN
   */
  public void setMaxCN(short serverId, ChangeNumber newCN)
  {
    if (newCN == null)
    {
      return;
    }
    ChangeNumber currentMaxCN = maxCNs.get(serverId);
    if (currentMaxCN == null)
    {
      maxCNs.put(serverId, newCN);
    }
    else if (newCN.newer(currentMaxCN))
    {
      maxCNs.replace(serverId, newCN);
    }
  }

  /**
   * Get the highest known change number of the LDAP server with the
   * provided serverId.
   * @param serverId The server ID.
   * @return The highest change number, or null when none is recorded.
   */
  public ChangeNumber getMaxCN(short serverId)
  {
    return maxCNs.get(serverId);
  }

  /**
   * Get the state of the LDAP server with the provided serverId.
   * @param serverId The server ID.
   * @return The server state, or null when none is recorded.
   */
  public ServerState getLDAPServerState(short serverId)
  {
    return ldapStates.get(serverId);
  }

  /**
   * Set the state of the LDAP server with the provided serverId.
   * @param serverId The server ID.
   * @param state The server state.
   */
  public void setLDAPServerState(short serverId, ServerState state)
  {
    ldapStates.put(serverId, state);
  }

  /**
   * Record the first missing date of the server with the provided
   * serverId. A null date is ignored. When a date is already recorded, it
   * is only replaced by a new date that is non-zero and older.
   * @param serverId The server ID.
   * @param newFmd The first missing date.
   */
  public void setFirstMissingDate(short serverId, Long newFmd)
  {
    if (newFmd == null)
    {
      return;
    }
    Long currentfmd = fmd.get(serverId);
    if (currentfmd == null)
    {
      fmd.put(serverId, newFmd);
    }
    // NOTE(review): a recorded value of 0 is never replaced by a later
    // non-zero date, since no positive date is lower than 0 - confirm this
    // is intended.
    else if ((newFmd != 0) && (newFmd < currentfmd))
    {
      fmd.replace(serverId, newFmd);
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -129,8 +128,8 @@ /* Monitor data management */ // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable private long remoteMonitorDataLifeTime = 500; // TODO: Remote monitor data cache lifetime is 500ms/should be configurable private long monitorDataLifeTime = 500; /* Search op on monitor data is processed by a worker thread. * Requests are sent to the other RS,and responses are received by the @@ -139,21 +138,11 @@ */ Semaphore remoteMonitorResponsesSemaphore; /* The date of the last time they have been elaborated */ private long validityDate = 0; // For each LDAP server, its server state private HashMap<Short, ServerState> LDAPStates = new HashMap<Short, ServerState>(); // For each LDAP server, the last CN it published private HashMap<Short, ChangeNumber> maxCNs = new HashMap<Short, ChangeNumber>(); // For each LDAP server, an approximation of the date of the first missing // change private HashMap<Short, Long> approxFirstMissingDate = new HashMap<Short, Long>(); /** * The monitor data consolidated over the topology. */ private MonitorData monitorData = new MonitorData(); private MonitorData wrkMonitorData; /** * Creates a new ReplicationServerDomain associated to the DN baseDn. 
@@ -166,13 +155,7 @@ { this.baseDn = baseDn; this.replicationServer = replicationServer; if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " Created Cache for " + baseDn + " " + stackTraceToSingleLineString(new Exception())); } } /** * Add an update that has been received to the list of @@ -366,6 +349,10 @@ { replicationServers.remove(handler.getServerId()); handler.stopHandler(); // Update the remote replication servers with our list // of connected LDAP servers sendReplServerInfo(); } } else @@ -374,12 +361,12 @@ { connectedServers.remove(handler.getServerId()); handler.stopHandler(); // Update the remote replication servers with our list // of connected LDAP servers sendReplServerInfo(); } } // Update the remote replication servers with our list // of connected LDAP servers sendReplServerInfo(); } /** @@ -578,7 +565,8 @@ * * @param serverId Identifier of the server for which the iterator is created. * @param changeNumber Starting point for the iterator. * @return the created ReplicationIterator. * @return the created ReplicationIterator. Null when no DB is available * for the provided server Id. */ public ReplicationIterator getChangelogIterator(short serverId, ChangeNumber changeNumber) @@ -591,7 +579,8 @@ { return handler.generateIterator(changeNumber); } catch (Exception e) { catch (Exception e) { return null; } } @@ -759,6 +748,7 @@ */ public void process(RoutableMessage msg, ServerHandler senderHandler) { // Test the message for which a ReplicationServer is expected // to be the destination if (msg.getDestination() == this.replicationServer.getServerId()) @@ -779,20 +769,33 @@ replServerMonitorRequestMsg.getDestination(), replServerMonitorRequestMsg.getsenderID()); // Populate the RS state in the msg from the DbState monitorMsg.setReplServerState(this.getDbServerState()); // Populate for each connected LDAP Server // from the states stored in the serverHandler. 
// - the server state // - the older missing change for (ServerHandler lsh : this.connectedServers.values()) { monitorMsg.setLDAPServerState( monitorMsg.setServerState( lsh.getServerId(), lsh.getServerState(), lsh.getApproxFirstMissingDate()); lsh.getApproxFirstMissingDate(), true); } // Same for the connected RS for (ServerHandler rsh : this.replicationServers.values()) { monitorMsg.setServerState( rsh.getServerId(), rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); } // Populate the RS state in the msg from the DbState monitorMsg.setReplServerDbState(this.getDbServerState()); try { senderHandler.send(monitorMsg); @@ -1305,118 +1308,135 @@ } } /* /* ======================= * Monitor Data generation * ======================= */ /** * Retrieves the remote monitor data. * * Retrieves the global monitor data. * @return The monitor data. * @throws DirectoryException When an error occurs. */ protected void retrievesRemoteMonitorData() synchronized protected MonitorData getMonitorData() throws DirectoryException { if (validityDate > TimeThread.getTime()) if (monitorData.getBuildDate() + monitorDataLifeTime > TimeThread.getTime()) { // The current data are still valid. No need to renew them. return; if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " getRemoteMonitorData in cache"); // The current data are still valid. No need to renew them. 
// FIXME return null; } // Clean this.LDAPStates.clear(); this.maxCNs.clear(); // Init the maxCNs of our direct LDAP servers from our own dbstate for (ServerHandler rs : connectedServers.values()) wrkMonitorData = new MonitorData(); synchronized(wrkMonitorData) { short serverID = rs.getServerId(); ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID); if (cn == null) { // we have nothing in db for that server cn = new ChangeNumber(0, 0 , serverID); } this.maxCNs.put(serverID, cn); } if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " Computing monitor data "); ServerState replServerState = this.getDbServerState(); Iterator<Short> it = replServerState.iterator(); while (it.hasNext()) { short sid = it.next(); ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); ChangeNumber maxCN = this.maxCNs.get(sid); if ((maxCN != null) && (receivedCN.newer(maxCN))) // Let's process our directly connected LSes // - in the ServerHandler for a given LS1, the stored state contains : // - the max CN produced by LS1 // - the last CN consumed by LS1 from LS2..n // - in the RSdomain/dbHandler, the built-in state contains : // - the max CN produced by each server // So for a given LS connected we can take the state and the max from // the LS/state. 
for (ServerHandler directlsh : connectedServers.values()) { // We found a newer one this.maxCNs.remove(sid); this.maxCNs.put(sid, receivedCN); short serverID = directlsh.getServerId(); // the state comes from the state stored in the SH ServerState directlshState = directlsh.getServerState().duplicate(); // the max CN sent by that LS also comes from the SH ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID); if (maxcn == null) { // This directly connected LS has never produced any change maxcn = new ChangeNumber(0, 0 , serverID); } wrkMonitorData.setMaxCN(serverID, maxcn); wrkMonitorData.setLDAPServerState(serverID, directlshState); wrkMonitorData.setFirstMissingDate(serverID, directlsh. getApproxFirstMissingDate()); } // Then initialize the max CN for the LS that produced something // - from our own local db state // - whatever they are directly or undirectly connected ServerState dbServerState = getDbServerState(); Iterator<Short> it = dbServerState.iterator(); while (it.hasNext()) { short sid = it.next(); ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid); wrkMonitorData.setMaxCN(sid, storedCN); } // Now we have used all available local informations // and we need the remote ones. if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " Local monitor data: " + wrkMonitorData.toString()); } // Send Request to the other Replication Servers if (remoteMonitorResponsesSemaphore == null) { remoteMonitorResponsesSemaphore = new Semaphore( replicationServers.size() -1); sendMonitorDataRequest(); remoteMonitorResponsesSemaphore = new Semaphore(0); short requestCnt = sendMonitorDataRequest(); // Wait reponses from them or timeout waitMonitorDataResponses(replicationServers.size()); waitMonitorDataResponses(requestCnt); } else { // The processing of renewing the monitor cache is already running // We'll make it sleeping until the end // TODO: unit test for this case. 
while (remoteMonitorResponsesSemaphore!=null) { waitMonitorDataResponses(1); } } // Now we have the expected answers of an error occured validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime; wrkMonitorData.completeComputing(); if (debugEnabled()) // Store the new computed data as the reference synchronized(monitorData) { debugMonitorData(); } } private void debugMonitorData() { String mds = " Monitor data="; Iterator<Short> ite = LDAPStates.keySet().iterator(); while (ite.hasNext()) { Short sid = ite.next(); ServerState ss = LDAPStates.get(sid); mds += " LDAPState(" + sid + ")=" + ss.toString(); } Iterator<Short> itc = maxCNs.keySet().iterator(); while (itc.hasNext()) { Short sid = itc.next(); ChangeNumber cn = maxCNs.get(sid); mds += " maxCNs(" + sid + ")=" + cn.toString(); } mds += "--"; TRACER.debugInfo( // Now we have the expected answers or an error occured monitorData = wrkMonitorData; wrkMonitorData = null; if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDN=" + baseDn + mds); " baseDn=" + baseDn + " *** Computed MonitorData: " + monitorData.toString()); } return monitorData; } /** * Sends a MonitorRequest message to all connected RS. * @return the number of requests sent. * @throws DirectoryException when a problem occurs. 
*/ protected void sendMonitorDataRequest() protected short sendMonitorDataRequest() throws DirectoryException { short sent=0; try { for (ServerHandler rs : replicationServers.values()) @@ -1425,6 +1445,7 @@ MonitorRequestMessage(this.replicationServer.getServerId(), rs.getServerId()); rs.send(msg); sent++; } } catch(Exception e) @@ -1434,6 +1455,7 @@ throw new DirectoryException(ResultCode.OTHER, message, e); } return sent; } /** @@ -1446,21 +1468,30 @@ { try { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " waiting for " + expectedResponses + " expected monitor messages"); boolean allPermitsAcquired = remoteMonitorResponsesSemaphore.tryAcquire( expectedResponses, (long) 500, TimeUnit.MILLISECONDS); (long) 5000, TimeUnit.MILLISECONDS); if (!allPermitsAcquired) { logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); // FIXME let's go on in best effort even with limited data received. } else { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "Successfully received all " + replicationServers.size() " baseDn=" + baseDn + " Successfully received all " + expectedResponses + " expected monitor messages"); } } @@ -1482,48 +1513,94 @@ */ public void receivesMonitorDataResponse(MonitorMessage msg) { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "Receiving " + msg + " from " + msg.getsenderID() + remoteMonitorResponsesSemaphore); if (remoteMonitorResponsesSemaphore == null) { // Ignoring the remote monitor data because an error occured previously // FIXME logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get( "In " + this.replicationServer.getMonitorInstanceName() + "Receiving " + msg + " from " + msg.getsenderID() + " remoteMonitorResponsesSemaphore should not be null")); // Ignoring the remote monitor data because an error occured // previously return; } try { // Here is the RS state : list <serverID, lastChangeNumber> 
// For each LDAP Server, we keep the max CN accross the RSes ServerState replServerState = msg.getReplServerState(); Iterator<Short> it = replServerState.iterator(); while (it.hasNext()) synchronized(wrkMonitorData) { short sid = it.next(); ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); ChangeNumber maxCN = this.maxCNs.get(sid); if (receivedCN.newer(maxCN)) { // We found a newer one this.maxCNs.remove(sid); this.maxCNs.put(sid, receivedCN); } } // Here is the RS state : list <serverID, lastChangeNumber> // For each LDAP Server, we keep the max CN accross the RSes ServerState replServerState = msg.getReplServerDbState(); wrkMonitorData.setMaxCNs(replServerState); // Store the LDAP servers states Iterator<Short> sidIterator = msg.iterator(); while (sidIterator.hasNext()) { short sid = sidIterator.next(); ServerState ss = msg.getLDAPServerState(sid); this.LDAPStates.put(sid, ss); this.approxFirstMissingDate.put(sid, msg.getApproxFirstMissingDate(sid)); // Store the remote LDAP servers states Iterator<Short> lsidIterator = msg.ldapIterator(); while (lsidIterator.hasNext()) { short sid = lsidIterator.next(); wrkMonitorData.setLDAPServerState(sid, msg.getLDAPServerState(sid).duplicate()); wrkMonitorData.setFirstMissingDate(sid, msg.getLDAPApproxFirstMissingDate(sid)); } // Process the latency reported by the remote RSi on its connections // to the other RSes Iterator<Short> rsidIterator = msg.rsIterator(); while (rsidIterator.hasNext()) { short rsid = rsidIterator.next(); if (rsid == replicationServer.getServerId()) { // this is the latency of the remote RSi regarding the current RS // let's update the fmd of my connected LS for (ServerHandler connectedlsh : connectedServers.values()) { short connectedlsid = connectedlsh.getServerId(); Long newfmd = msg.getRSApproxFirstMissingDate(rsid); wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd); } } else { // this is the latency of the remote RSi regarding another RSj // let's update the latency of 
the LSes connected to RSj ServerHandler rsjHdr = replicationServers.get(rsid); for(short remotelsid : rsjHdr.getConnectedServerIds()) { Long newfmd = msg.getRSApproxFirstMissingDate(rsid); wrkMonitorData.setFirstMissingDate(remotelsid, newfmd); } } } if (debugEnabled()) { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + " baseDn=" + baseDn + " Processed msg from " + msg.getsenderID() + " New monitor data: " + wrkMonitorData.toString()); } } // Decreases the number of expected responses and potentially // wakes up the waiting requestor thread. remoteMonitorResponsesSemaphore.release(); } catch (Exception e) { logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() + stackTraceToSingleLineString(e))); // If an exception occurs while processing one of the expected message, // the processing is aborted and the waiting thread is awoke. remoteMonitorResponsesSemaphore.notifyAll(); @@ -1531,65 +1608,6 @@ } /** * Get the state of the LDAP server with the provided serverId. * @param serverId The server ID. * @return The server state. */ public ServerState getServerState(short serverId) { return LDAPStates.get(serverId); } /** * Get the highest know change number of the LDAP server with the provided * serverId. * @param serverId The server ID. * @return The highest change number. */ public ChangeNumber getMaxCN(short serverId) { return maxCNs.get(serverId); } /** * Get an approximation of the date of the oldest missing changes. * serverId. * @param serverId The server ID. * @return The approximation of the date of the oldest missing change. */ public Long getApproxFirstMissingDate(short serverId) { return approxFirstMissingDate.get(serverId); } /** * Get the number of missing change for the server with the provided state. * @param state The provided server state. * @return The number of missing changes. 
*/ public int getMissingChanges(ServerState state) { // Traverse the max Cn transmitted by each server // For each server, get the highest CN know from the current server // Sum the difference betwenn the max and the last int missingChanges = 0; Iterator<Short> itc = maxCNs.keySet().iterator(); while (itc.hasNext()) { Short sid = itc.next(); ChangeNumber maxCN = maxCNs.get(sid); ChangeNumber last = state.getMaxChangeNumber(sid); if (last == null) { last = new ChangeNumber(0,0, sid); } int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last); missingChanges += missingChangesFromSID; } return missingChanges; } /** * Set the purge delay on all the db Handlers for this Domain * of Replicaiton. * opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -124,12 +125,12 @@ /** * When this Handler is connected to a remote replication server * When this Handler is related to a remote replication server * this collection will contain as many elements as there are * LDAP servers connected to the remote replication server. */ private List<LightweightServerHandler> remoteLDAPservers = new ArrayList<LightweightServerHandler>(); private Map<Short, LightweightServerHandler> connectedServers = new ConcurrentHashMap<Short, LightweightServerHandler>(); /** * The time in milliseconds between heartbeats from the replication @@ -200,6 +201,8 @@ maxRcvWindow = windowSize; rcvWindow = windowSize; long localGenerationId = -1; boolean handshakeOnly = false; try { if (baseDn != null) @@ -244,6 +247,8 @@ maxSendQueue = receivedMsg.getMaxSendQueue(); heartbeatInterval = receivedMsg.getHeartbeatInterval(); handshakeOnly = receivedMsg.isHandshakeOnly(); // The session initiator decides whether to use SSL. sslEncryption = receivedMsg.getSSLEncryption(); @@ -524,60 +529,70 @@ replicationServerDomain = replicationServer. getReplicationServerDomain(this.baseDn,true); boolean started; if (serverIsLDAPserver) if (!handshakeOnly) { started = replicationServerDomain.startServer(this); } else { started = replicationServerDomain.startReplicationServer(this); } if (started) { // sendWindow MUST be created before starting the writer sendWindow = new Semaphore(sendWindowSize); writer = new ServerWriter(session, serverId, this, replicationServerDomain); reader = new ServerReader(session, serverId, this, replicationServerDomain); reader.start(); writer.start(); // Create a thread to send heartbeat messages. 
if (heartbeatInterval > 0) boolean started; if (serverIsLDAPserver) { heartbeatThread = new HeartbeatThread( "replication Heartbeat to " + serverURL + " for " + this.baseDn, session, heartbeatInterval/3); heartbeatThread.start(); started = replicationServerDomain.startServer(this); } else { started = replicationServerDomain.startReplicationServer(this); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); } else { // the connection is not valid, close it. try if (started) { if (debugEnabled()) // sendWindow MUST be created before starting the writer sendWindow = new Semaphore(sendWindowSize); writer = new ServerWriter(session, serverId, this, replicationServerDomain); reader = new ServerReader(session, serverId, this, replicationServerDomain); reader.start(); writer.start(); // Create a thread to send heartbeat messages. if (heartbeatInterval > 0) { TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " RS failed to start locally " + " the connection from serverID="+serverId); heartbeatThread = new HeartbeatThread( "replication Heartbeat to " + serverURL + " for " + this.baseDn, session, heartbeatInterval/3); heartbeatThread.start(); } session.close(); } catch (IOException e1) { // ignore DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); } else { // the connection is not valid, close it. try { if (debugEnabled()) { TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " RS failed to start locally " + " the connection from serverID="+serverId); } session.close(); } catch (IOException e1) { // ignore } } } else { // For a hanshakeOnly connection, let's only create a reader // in order to detect the connection closure. 
reader = new ServerReader(session, serverId, this, replicationServerDomain); reader.start(); } } catch (Exception e) @@ -842,22 +857,22 @@ /** * Get the age of the older change that has not yet been replicated * to the server handled by this ServerHandler. * * @return The age if the older change has not yet been replicated * to the server handled by this ServerHandler. */ public Long getApproxFirstMissingDate() { // Get the older CN received // From it, get the next sequence number // Get the CN for the next sequence number // If not present in the local RS db, // then approximate with the older update time ChangeNumber olderUpdateCN = getOlderUpdateCN(); if (olderUpdateCN == null) return null; Long result = (long)0; return olderUpdateCN.getTime(); // Get the older CN received ChangeNumber olderUpdateCN = getOlderUpdateCN(); if (olderUpdateCN != null) { // If not present in the local RS db, // then approximate with the older update time result=olderUpdateCN.getTime(); } return result; } /** @@ -874,29 +889,82 @@ /** * Get the older Change Number for that server. * Returns null when the queue is empty. * @return The older change number. */ public ChangeNumber getOlderUpdateCN() { ChangeNumber result = null; synchronized (msgQueue) { if (isFollowing()) { if (msgQueue.isEmpty()) return null; UpdateMessage msg = msgQueue.first(); return msg.getChangeNumber(); { result=null; } else { UpdateMessage msg = msgQueue.first(); result = msg.getChangeNumber(); } } else { if (lateQueue.isEmpty()) return null; { // isFollowing is false AND lateQueue is empty // We may be at the very moment when the writer has emptyed the // lateQueue when it sent the last update. The writer will fill again // the lateQueue when it will send the next update but we are not yet // there. So let's take the last change not sent directly from // the db. 
UpdateMessage msg = lateQueue.first(); return msg.getChangeNumber(); ReplicationIteratorComparator comparator = new ReplicationIteratorComparator(); SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>(comparator); try { // Build a list of candidates iterator (i.e. db i.e. server) for (short serverId : replicationServerDomain.getServers()) { // get the last already sent CN from that server ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); // get an iterator in this server db from that last change ReplicationIterator iterator = replicationServerDomain.getChangelogIterator(serverId, lastCsn); // if that iterator has changes, then it is a candidate // it is added in the sorted list at a position given by its // current change (see ReplicationIteratorComparator). if ((iterator != null) && (iterator.getChange() != null)) { iteratorSortedSet.add(iterator); } } UpdateMessage msg = iteratorSortedSet.first().getChange(); result = msg.getChangeNumber(); } catch(Exception e) { result=null; } finally { for (ReplicationIterator iterator : iteratorSortedSet) { iterator.releaseCursor(); } } } else { UpdateMessage msg = lateQueue.first(); result = msg.getChangeNumber(); } } } return result; } /** @@ -958,7 +1026,7 @@ */ while (msgQueue.size() > maxQueueSize) { following = false; setFollowing(false); msgQueue.removeFirst(); } } @@ -1083,6 +1151,13 @@ } } } // The loop below relies on the fact that it is sorted based // on the currentChange of each iterator to consider the next // change accross all servers. // Hence it is necessary to remove and eventual add again an iterator // when looping in order to keep consistent the order of the // iterators (see ReplicationIteratorComparator. 
while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) { ReplicationIterator iterator = iteratorSortedSet.first(); @@ -1107,7 +1182,7 @@ { if (msgQueue.size() < maxQueueSize) { following = true; setFollowing(true); } } } @@ -1119,7 +1194,7 @@ if (msgQueue.contains(msg)) { /* we finally catched up with the regular queue */ following = true; setFollowing(true); lateQueue.clear(); UpdateMessage msg1; do @@ -1459,14 +1534,6 @@ attributes.add(new Attribute("connected-to", this.replicationServerDomain. getReplicationServer().getMonitorInstanceName())); // Add the oldest missing update Long olderUpdateTime = this.getApproxFirstMissingDate(); if (olderUpdateTime != null) { Date date = new Date(olderUpdateTime); attributes.add(new Attribute("approx-older-change-not-synchronized", date.toString())); } } else { @@ -1477,27 +1544,42 @@ attributes.add(new Attribute("base-dn", baseDn.toString())); // Update stats // Retrieves the topology counters if (serverIsLDAPserver) { MonitorData md; try { replicationServerDomain.retrievesRemoteMonitorData(); md = replicationServerDomain.getMonitorData(); // Oldest missing update Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0)) { Date date = new Date(approxFirstMissingDate); attributes.add(new Attribute("approx-older-change-not-synchronized", date.toString())); attributes.add( new Attribute("approx-older-change-not-synchronized-millis", String.valueOf(approxFirstMissingDate))); } // Missing changes long missingChanges = md.getMissingChanges(serverId); attributes.add(new Attribute("missing-changes", String.valueOf(missingChanges))); // Replication delay long delay = md.getApproxDelay(serverId); attributes.add(new Attribute("approximate-delay", String.valueOf(delay))); } catch(Exception e) { // FIXME: We failed retrieving the remote monitor data // TODO: improve the log // We failed retrieving the remote monitor data. 
attributes.add(new Attribute("error", stackTraceToSingleLineString(e))); } // Compute the latency for the current SH int missingChanges = replicationServerDomain.getMissingChanges(serverState); // add the latency attribute to our monitor data attributes.add(new Attribute("missing-changes", String.valueOf(missingChanges))); } // Deprecated @@ -1532,8 +1614,6 @@ attributes.add(new Attribute("waiting-changes", String.valueOf(getRcvMsgQueueSize()))); // Age of oldest missing change attributes.add(new Attribute("approximate-delay", String.valueOf(getApproxDelay()))); // Date of the oldest missing change long olderUpdateTime = getOlderUpdateTime(); @@ -1731,14 +1811,14 @@ List<String> newRemoteLDAPservers = infoMsg.getConnectedServers(); generationId = infoMsg.getGenerationId(); synchronized(remoteLDAPservers) synchronized(connectedServers) { // Removes the existing structures for (LightweightServerHandler lsh : remoteLDAPservers) for (LightweightServerHandler lsh : connectedServers.values()) { lsh.stopHandler(); } remoteLDAPservers.clear(); connectedServers.clear(); // Creates the new structure according to the message received. 
for (String newConnectedServer : newRemoteLDAPservers) @@ -1746,7 +1826,7 @@ LightweightServerHandler lsh = new LightweightServerHandler(newConnectedServer, this); lsh.startHandler(); remoteLDAPservers.add(lsh); connectedServers.put(lsh.getServerId(), lsh); } } } @@ -1762,14 +1842,17 @@ */ public boolean isRemoteLDAPServer(short wantedServer) { for (LightweightServerHandler server : remoteLDAPservers) synchronized(connectedServers) { if (wantedServer == server.getServerId()) for (LightweightServerHandler server : connectedServers.values()) { return true; if (wantedServer == server.getServerId()) { return true; } } return false; } return false; } /** @@ -1781,7 +1864,7 @@ */ public boolean hasRemoteLDAPServers() { return !remoteLDAPservers.isEmpty(); return !connectedServers.isEmpty(); } /** @@ -1907,4 +1990,13 @@ { return this.replicationServerDomain; } /** * Return a Set containing the servers known by this replicationServer. * @return a set containing the servers known by this replicationServer. */ public Set<Short> getConnectedServerIds() { return connectedServers.keySet(); } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -120,6 +120,7 @@ { ReplicationMessage msg = session.receive(); /* if (debugEnabled()) { TRACER.debugInfo( @@ -128,6 +129,7 @@ (handler.isReplicationServer()?" From RS ":" From LS")+ " with serverId=" + serverId + " receives " + msg); } */ if (msg instanceof AckMessage) { AckMessage ack = (AckMessage) msg; opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -120,6 +120,7 @@ continue; } /* if (debugEnabled()) { TRACER.debugInfo( @@ -131,6 +132,7 @@ " server=" + handler.getServerId() + " generationId=" + handler.getGenerationId()); } */ session.publish(update); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -316,6 +316,10 @@ CN1 = new ChangeNumber((long)0, 3, (short)0); // 3-0 = 3 CN2 = new ChangeNumber((long)0, 0, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 3); // 3-1 = 2 CN2 = new ChangeNumber((long)0, 1, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2); @@ -327,5 +331,15 @@ // 3-4 == MAXINT (modulo) CN2 = new ChangeNumber((long)0, 4, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE); CN1 = new ChangeNumber((long)0, 0, (short)0); // 0-0 = 0 CN2 = new ChangeNumber((long)0, 0, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0); // 0-1 = MAXINT(modulo) CN2 = new ChangeNumber((long)0, 1, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -496,7 +496,7 @@ state.update(new ChangeNumber((long)1, 1,(short)1)); ServerStartMessage msg = new ServerStartMessage(serverId, baseDN, window, window, window, window, window, window, state, (short)1, (long)1, true); (long)1, true, false); ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes()); assertEquals(msg.getServerId(), newMsg.getServerId()); assertEquals(msg.getBaseDn(), newMsg.getBaseDn()); @@ -511,6 +511,7 @@ newMsg.getServerState().getMaxChangeNumber((short)1)); assertEquals(msg.getVersion(), newMsg.getVersion()); assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); assertEquals(msg.isHandshakeOnly(), newMsg.isHandshakeOnly()); } @DataProvider(name="changelogStart") @@ -614,20 +615,28 @@ (short) 123, sid2); s2.update(cn2); // LS3 state ServerState s3 = new ServerState(); short sid3 = 333; ChangeNumber cn3 = new ChangeNumber(now, (short) 123, sid3); s3.update(cn3); MonitorMessage msg = new MonitorMessage(sender, dest); msg.setReplServerState(rsState); msg.setLDAPServerState(sid1, s1, now+1); msg.setLDAPServerState(sid2, s2, now+2); msg.setReplServerDbState(rsState); msg.setServerState(sid1, s1, now+1, true); msg.setServerState(sid2, s2, now+2, true); msg.setServerState(sid3, s3, now+3, false); byte[] b = msg.getBytes(); MonitorMessage newMsg = new MonitorMessage(b); assertEquals(rsState, msg.getReplServerState()); assertEquals(newMsg.getReplServerState().toString(), msg.getReplServerState().toString()); assertEquals(rsState, msg.getReplServerDbState()); assertEquals(newMsg.getReplServerDbState().toString(), msg.getReplServerDbState().toString()); Iterator<Short> it = newMsg.iterator(); Iterator<Short> it = newMsg.ldapIterator(); while (it.hasNext()) { short sid = it.next(); @@ -635,16 +644,32 @@ if (sid == sid1) { assertEquals(s.toString(), s1.toString(), ""); assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), ""); assertEquals((Long)(now+1), newMsg.getLDAPApproxFirstMissingDate(sid), ""); } else if (sid == sid2) { 
assertEquals(s.toString(), s2.toString()); assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), ""); assertEquals((Long)(now+2), newMsg.getLDAPApproxFirstMissingDate(sid), ""); } else { fail("Bad sid"); fail("Bad sid" + sid); } } Iterator<Short> it2 = newMsg.rsIterator(); while (it2.hasNext()) { short sid = it2.next(); ServerState s = newMsg.getRSServerState(sid); if (sid == sid3) { assertEquals(s.toString(), s3.toString(), ""); assertEquals((Long)(now+3), newMsg.getRSApproxFirstMissingDate(sid), ""); } else { fail("Bad sid " + sid); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -874,7 +874,7 @@ ServerStartMessage msg = new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"), 0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(), ProtocolVersion.currentVersion(), 0, sslEncryption); ProtocolVersion.currentVersion(), 0, sslEncryption, false); session.publish(msg); // Read the Replication Server state from the ReplServerStartMessage that @@ -907,7 +907,7 @@ 0, 0, 0, 0, WINDOW, (long) 5000, replServerState, ProtocolVersion.currentVersion(), ReplicationTestCase.getGenerationId(baseDn), sslEncryption); sslEncryption, false); session.publish(msg); // Read the ReplServerStartMessage that come back.