opends/src/messages/messages/replication.properties
@@ -251,4 +251,10 @@ export-ldif command must be run as a task NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \ failed: %s SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \ are missing due to an error in the retrieval process SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \ are missing due to a processing error : %s SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \ sending request to get remote monitor data opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.common; @@ -185,6 +185,40 @@ } } /** * Computes the difference in number of changes between 2 * change numbers. * @param op1 the first ChangeNumber * @param op2 the second ChangeNumber * @return the difference */ public static int diffSeqNum(ChangeNumber op1, ChangeNumber op2) { int totalCount = 0; int max = op1.getSeqnum(); if (op2 != null) { int current = op2.getSeqnum(); if (current == max) { } else if (current < max) { totalCount += max - current; } else { totalCount += Integer.MAX_VALUE - (current - max) + 1; } } else { totalCount += max; } return totalCount; } /** * check if the current Object is strictly older than ChangeNumber * given in parameter. opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -258,7 +258,7 @@ @Override public String toString() { return ("ADD " + getDn() + " " + getChangeNumber()); return ("ADD DN=(" + getDn() + ") CN=(" + getChangeNumber() + ")"); } /** opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
New file @@ -0,0 +1,397 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Set; import org.opends.server.replication.common.ServerState; import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.protocols.asn1.ASN1Sequence; import org.opends.server.protocols.asn1.ASN1Element; import org.opends.server.replication.common.ChangeNumber; /** * This message is part of the replication protocol. * This message is sent by a server to one or several other servers and * contain one entry to be sent over the protocol in the context of * an import/export over the protocol. */ public class MonitorMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = -1900670921496804942L; /** * FIXME. * */ class ServerData { ServerState state; Long approxFirstMissingDate; } /** * FIXME. 
* */ class SubTopoMonitorData { ServerState replServerState; HashMap<Short, ServerData> ldapStates = new HashMap<Short, ServerData>(); } SubTopoMonitorData data = new SubTopoMonitorData();; /** * Creates a new EntryMessage. * * @param sender The sender of this message. * @param destination The destination of this message. */ public MonitorMessage(short sender, short destination) { super(sender, destination); } /** * FIXME. * @param state a. */ public void setReplServerState(ServerState state) { data.replServerState = state; } /** * FIXME. * @param serverId a. * @param state a. * @param olderUpdateTime a. * */ public void setLDAPServerState(short serverId, ServerState state, Long olderUpdateTime) { if (data.ldapStates == null) { data.ldapStates = new HashMap<Short, ServerData>(); } ServerData sd = new ServerData(); sd.state = state; sd.approxFirstMissingDate = olderUpdateTime; data.ldapStates.put(serverId, sd); } /** * FIXME. * @param serverId a. * @return a. */ public ServerState getLDAPServerState(short serverId) { return data.ldapStates.get(serverId).state; } /** * FIXME. * @param serverId a. * @return a. */ public Long getApproxFirstMissingDate(short serverId) { return data.ldapStates.get(serverId).approxFirstMissingDate; } /** * Creates a new EntryMessage from its encoded form. * * @param in The byte array containing the encoded form of the message. * @throws DataFormatException If the byte array does not contain a valid * encoded form of the ServerStartMessage. 
*/ public MonitorMessage(byte[] in) throws DataFormatException { try { /* first byte is the type */ if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR) throw new DataFormatException("input is not a valid " + this.getClass().getCanonicalName()); int pos = 1; // sender int length = getNextLength(in, pos); String senderIDString = new String(in, pos, length, "UTF-8"); this.senderID = Short.valueOf(senderIDString); pos += length +1; // destination length = getNextLength(in, pos); String destinationString = new String(in, pos, length, "UTF-8"); this.destination = Short.valueOf(destinationString); pos += length +1; /* Read the states : all the remaining bytes but the terminating 0 */ byte[] encodedS = new byte[in.length-pos-1]; int i =0; while (pos<in.length-1) { encodedS[i++] = in[pos++]; } try { ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS); for (ASN1Element el0 : s0.elements()) { ServerState newState = new ServerState(); short serverId = 0; Long outime = (long)0; ASN1Sequence s1 = el0.decodeAsSequence(); for (ASN1Element el1 : s1.elements()) { ASN1OctetString o = el1.decodeAsOctetString(); String s = o.stringValue(); ChangeNumber cn = new ChangeNumber(s); if ((data.replServerState != null) && (serverId == 0)) { serverId = cn.getServerId(); outime = cn.getTime(); } else { newState.update(cn); } } // the first state is the replication state if (data.replServerState == null) { data.replServerState = newState; } else { ServerData sd = new ServerData(); sd.state = newState; sd.approxFirstMissingDate = outime; data.ldapStates.put(serverId, sd); } } } catch(Exception e) { } } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8"); byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8"); int length = 1 + senderBytes.length + 1 + destinationBytes.length; ASN1Sequence 
stateElementSequence = new ASN1Sequence(); ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>(); // First loop computes the length /* Put the serverStates ... */ stateElementSequence = new ASN1Sequence(); stateElementList = new ArrayList<ASN1Element>(); /* first put the Replication Server state */ ArrayList<ASN1OctetString> cnOctetList = data.replServerState.toASN1ArrayList(); ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>(); for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); } ASN1Sequence cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); // then the LDAP server data Set<Short> servers = data.ldapStates.keySet(); for (Short sid : servers) { // State ServerState statei = data.ldapStates.get(sid).state; // First missing date Long outime = data.ldapStates.get(sid).approxFirstMissingDate; // retrieves the change numbers as an arrayList of ANSN1OctetString cnOctetList = statei.toASN1ArrayList(); cnElementList = new ArrayList<ASN1Element>(); // a fake changenumber helps storing the LDAP server ID // and the olderupdatetime ChangeNumber cn = new ChangeNumber(outime,0,sid); cnElementList.add(new ASN1OctetString(cn.toString())); // the changenumbers for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); } cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); } stateElementSequence.setElements(stateElementList); int seqLen = stateElementSequence.encode().length; // length += seqLen; length += 2; // Allocate the array sized from the computed length byte[] resultByteArray = new byte[length]; // Second loop build the array /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR; int pos = 1; pos = addByteArray(senderBytes, resultByteArray, pos); pos = addByteArray(destinationBytes, resultByteArray, pos); /* Put the serverStates ... 
*/ stateElementSequence = new ASN1Sequence(); stateElementList = new ArrayList<ASN1Element>(); /* first put the Replication Server state */ cnOctetList = data.replServerState.toASN1ArrayList(); cnElementList = new ArrayList<ASN1Element>(); for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); } cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); // then the LDAP server state servers = data.ldapStates.keySet(); for (Short sid : servers) { ServerState statei = data.ldapStates.get(sid).state; Long outime = data.ldapStates.get(sid).approxFirstMissingDate; // retrieves the change numbers as an arrayList of ANSN1OctetString cnOctetList = statei.toASN1ArrayList(); cnElementList = new ArrayList<ASN1Element>(); // a fake changenumber helps storing the LDAP server ID ChangeNumber cn = new ChangeNumber(outime,0,sid); cnElementList.add(new ASN1OctetString(cn.toString())); // the changenumbers for (ASN1OctetString soci : cnOctetList) { cnElementList.add(soci); } cnSequence = new ASN1Sequence(cnElementList); stateElementList.add(cnSequence); } stateElementSequence.setElements(stateElementList); pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } /** * FIXME. * @return FIXME. */ public ServerState getReplServerState() { return data.replServerState; } /** * FIXME. * @return a. 
*/ public Iterator<Short> iterator() { return data.ldapStates.keySet().iterator(); } /** * {@inheritDoc} */ @Override public String toString() { String stateS = " RState:"; stateS += "/" + data.replServerState.toString(); stateS += " LDAPStates:"; Iterator<ServerData> it = data.ldapStates.values().iterator(); while (it.hasNext()) { ServerData sd = it.next(); stateS += "/ state=" + sd.state.toString() + " afmd=" + sd.approxFirstMissingDate + "] "; } String me = this.getClass().getCanonicalName() + " sender=" + this.senderID + " destination=" + this.destination + " states=" + stateS + "]"; return me; } } opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
New file @@ -0,0 +1,122 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; /** * This message is part of the replication protocol. * FIXME: usage */ public class MonitorRequestMessage extends RoutableMessage implements Serializable { private static final long serialVersionUID = -2407640479423633234L; /** * Creates a message. * * @param sender The sender server of this message. * @param destination The server or servers targetted by this message. */ public MonitorRequestMessage(short sender, short destination) { super(sender, destination); } /** * Creates a new message by decoding the provided byte array. * @param in A byte array containing the encoded information for the message, * @throws DataFormatException If the in does not contain a properly, * encoded message. 
*/ public MonitorRequestMessage(byte[] in) throws DataFormatException { super(); try { // First byte is the type if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST) throw new DataFormatException("input is not a valid " + this.getClass().getCanonicalName()); int pos = 1; // sender int length = getNextLength(in, pos); String senderString = new String(in, pos, length, "UTF-8"); this.senderID = Short.valueOf(senderString); pos += length +1; // destination length = getNextLength(in, pos); String destinationString = new String(in, pos, length, "UTF-8"); this.destination = Short.valueOf(destinationString); pos += length +1; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * {@inheritDoc} */ @Override public byte[] getBytes() { try { byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8"); byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8"); int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1; byte[] resultByteArray = new byte[length]; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST; int pos = 1; /* put the sender */ pos = addByteArray(senderBytes, resultByteArray, pos); /* put the destination */ pos = addByteArray(destinationBytes, resultByteArray, pos); return resultByteArray; } catch (UnsupportedEncodingException e) { return null; } } } opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2007 Sun Microsystems, Inc. * Portions Copyright 2007-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -67,7 +67,7 @@ /* first byte is the type */ if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO) throw new DataFormatException( "Input is not a valid changelogInfo Message."); "Input is not a valid " + this.getClass().getCanonicalName()); int pos = 1; @@ -97,7 +97,7 @@ /** * Creates a new changelogInfo message from a list of the currently * Creates a new ReplServerInfo message from a list of the currently * connected servers. * * @param connectedServers The list of currently connected servers ID. opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -55,6 +55,8 @@ static final byte MSG_TYPE_WINDOW_PROBE = 15; static final byte MSG_TYPE_REPL_SERVER_INFO = 16; static final byte MSG_TYPE_RESET_GENERATION_ID = 17; static final byte MSG_TYPE_REPL_SERVER_MONITOR_REQUEST = 18; static final byte MSG_TYPE_REPL_SERVER_MONITOR = 19; // Adding a new type of message here probably requires to // change accordingly generateMsg method below @@ -79,6 +81,8 @@ * MSG_TYPE_WINDOW_PROBE * MSG_TYPE_REPL_SERVER_INFO * MSG_TYPE_RESET_GENERATION_ID * MSG_TYPE_REPL_SERVER_MONITOR_REQUEST * MSG_TYPE_REPL_SERVER_MONITOR * * @return the byte[] representation of this message. * @throws UnsupportedEncodingException When the encoding of the message @@ -152,6 +156,12 @@ case MSG_TYPE_REPL_SERVER_INFO: msg = new ReplServerInfoMessage(buffer); break; case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST: msg = new MonitorRequestMessage(buffer); break; case MSG_TYPE_REPL_SERVER_MONITOR: msg = new MonitorMessage(buffer); break; default: throw new DataFormatException("received message with unknown type"); } @@ -192,7 +202,7 @@ while (in[offset++] != 0) { if (offset >= in.length) throw new DataFormatException("byte[] is not a valid modify msg"); throw new DataFormatException("byte[] is not a valid msg"); length++; } return length; opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
New file @@ -0,0 +1,270 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.util.ArrayList; import java.util.Date; import java.util.LinkedHashSet; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ServerState; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.InitializationException; /** * This class defines a server handler dedicated to the remote LDAP servers * connected to a remote Replication Server. 
* This class is necessary because we want to provide monitor entries for them * and because the monitor API only allows one entry by MonitorProvider instance * so that no other class can provide the monitor entry for these objects. * * One instance of this class is created for each instance of remote LDAP server * connected to a remote Replication Server. */ public class LightweightServerHandler extends MonitorProvider<MonitorProviderCfg> { // The tracer object for the debug logger. private static final DebugTracer TRACER = getTracer(); short serverId; ServerHandler replServerHandler; ReplicationServerDomain rsDomain; DN baseDn; /** * Creates a new LighweightServerHandler with the provided serverid, connected * to the remote Replication Server represented by replServerHandler. * * @param serverId The serverId of this remote LDAP server. * @param replServerHandler The server handler of the Replication Server to * which this LDAP server is remotely connected. */ public LightweightServerHandler(String serverId, ServerHandler replServerHandler) { super("Server Handler"); this.serverId = Short.valueOf(serverId); this.replServerHandler = replServerHandler; this.rsDomain = replServerHandler.getDomain(); this.baseDn = rsDomain.getBaseDn(); if (debugEnabled()) TRACER.debugInfo( "In " + replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+ " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " ()"); } /** * Get the serverID associated with this LDAP server. * @return The serverId. */ public short getServerId() { return Short.valueOf(serverId); } /** * Stop this server handler processing. 
*/ public void startHandler() { if (debugEnabled()) TRACER.debugInfo( "In " + replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " start"); DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); } /** * Stop this server handler processing. */ public void stopHandler() { if (debugEnabled()) TRACER.debugInfo( "In " + replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " stop"); DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); } /** * {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException,InitializationException { // Nothing to do for now } /** * Retrieves the name of this monitor provider. It should be unique among all * monitor providers, including all instances of the same monitor provider. * * @return The name of this monitor provider. */ @Override public String getMonitorInstanceName() { String serverURL=""; // FIXME String str = baseDn.toString() + " " + serverURL + " " + String.valueOf(serverId); return "Undirect LDAP Server " + str; } /** * Retrieves the length of time in milliseconds that should elapse between * calls to the <CODE>updateMonitorData()</CODE> method. A negative or zero * return value indicates that the <CODE>updateMonitorData()</CODE> method * should not be periodically invoked. * * @return The length of time in milliseconds that should elapse between * calls to the <CODE>updateMonitorData()</CODE> method. 
*/ @Override public long getUpdateInterval() { /* we don't wont to do polling on this monitor */ return 0; } /** * Performs any processing periodic processing that may be desired to update * the information associated with this monitor. Note that best-effort * attempts will be made to ensure that calls to this method come * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will * be made. */ @Override public void updateMonitorData() { // As long as getUpdateInterval() returns 0, this will never get called } /** * Retrieves a set of attributes containing monitor data that should be * returned to the client if the corresponding monitor entry is requested. * * @return A set of attributes containing monitor data that should be * returned to the client if the corresponding monitor entry is * requested. */ @Override public ArrayList<Attribute> getMonitorData() { if (debugEnabled()) TRACER.debugInfo( "In " + this.replServerHandler.getDomain().getReplicationServer(). getMonitorInstanceName()+ " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " getMonitor data"); ArrayList<Attribute> attributes = new ArrayList<Attribute>(); attributes.add(new Attribute("server-id", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", rsDomain.getBaseDn().toNormalizedString())); attributes.add(new Attribute("connected-to", replServerHandler.getMonitorInstanceName())); // Retrieves the topology counters try { rsDomain.retrievesRemoteMonitorData(); // Compute the latency for the current SH ServerState remoteState = rsDomain.getServerState(serverId); if (remoteState == null) { remoteState = new ServerState(); } /* get the Server State */ final String ATTR_SERVER_STATE = "server-state"; AttributeType type = DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); for (String str : remoteState.toStringSet()) { 
values.add(new AttributeValue(type,str)); } Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); attributes.add(attr); // add the latency attribute to our monitor data // Compute the latency for the current SH int missingChanges = rsDomain.getMissingChanges(remoteState); attributes.add(new Attribute("missing-changes", String.valueOf(missingChanges))); // Add the oldest missing update Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId); if (olderUpdateTime != null) { Date date = new Date(olderUpdateTime); attributes.add(new Attribute("approx-older-change-not-synchronized", date.toString())); } } catch(Exception e) { // We failed retrieving the remote monitor data. attributes.add(new Attribute("error", stackTraceToSingleLineString(e))); } return attributes; } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.ReplicationMessages.*; import static org.opends.messages.ToolMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.Iterator; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; @@ -49,9 +55,13 @@ import org.opends.server.replication.protocol.RoutableMessage; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.ReplServerInfoMessage; import org.opends.server.replication.protocol.MonitorMessage; import org.opends.server.replication.protocol.MonitorRequestMessage; import org.opends.server.replication.protocol.ResetGenerationId; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; import org.opends.server.util.TimeThread; import com.sleepycat.je.DatabaseException; /** @@ -118,6 +128,34 @@ */ private static final DebugTracer TRACER = getTracer(); /* Monitor data management */ // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable private long remoteMonitorDataLifeTime = 500; /* Search op on monitor data is processed by a worker 
thread. * Requests are sent to the other RS,and responses are received by the * listener threads. * The worker thread is awoke on this semaphore, or on timeout. */ Semaphore remoteMonitorResponsesSemaphore; /* The date of the last time they have been elaborated */ private long validityDate = 0; // For each LDAP server, its server state private HashMap<Short, ServerState> LDAPStates = new HashMap<Short, ServerState>(); // For each LDAP server, the last CN it published private HashMap<Short, ChangeNumber> maxCNs = new HashMap<Short, ChangeNumber>(); // For each LDAP server, an approximation of the date of the first missing // change private HashMap<Short, Long> approxFirstMissingDate = new HashMap<Short, Long>(); /** * Creates a new ReplicationServerDomain associated to the DN baseDn. * @@ -352,7 +390,7 @@ } else { if (!rsh.getRemoteLDAPServers().isEmpty()) if (rsh.hasRemoteLDAPServers()) { lDAPServersConnectedInTheTopology = true; @@ -636,7 +674,7 @@ // server connected for (ServerHandler rsh : replicationServers.values()) { if (!rsh.getRemoteLDAPServers().isEmpty()) if (rsh.hasRemoteLDAPServers()) { servers.add(rsh); } @@ -693,15 +731,58 @@ */ public void process(RoutableMessage msg, ServerHandler senderHandler) { // A replication server is not expected to be the destination // of a routable message except for an error message. 
// Test the message for which a ReplicationServer is expected // to be the destination if (msg.getDestination() == this.replicationServer.getServerId()) { if (msg instanceof ErrorMessage) { ErrorMessage errorMsg = (ErrorMessage)msg; logError(ERR_ERROR_MSG_RECEIVED.get( errorMsg.getDetails())); errorMsg.getDetails())); } else if (msg instanceof MonitorRequestMessage) { MonitorRequestMessage replServerMonitorRequestMsg = (MonitorRequestMessage) msg; MonitorMessage monitorMsg = new MonitorMessage( replServerMonitorRequestMsg.getDestination(), replServerMonitorRequestMsg.getsenderID()); // Populate the RS state in the msg from the DbState monitorMsg.setReplServerState(this.getDbServerState()); // Populate for each connected LDAP Server // from the states stored in the serverHandler. // - the server state // - the older missing change for (ServerHandler lsh : this.connectedServers.values()) { monitorMsg.setLDAPServerState( lsh.getServerId(), lsh.getServerState(), lsh.getApproxFirstMissingDate()); } try { senderHandler.send(monitorMsg); } catch(Exception e) { // We log the error. The requestor will detect a timeout or // any other failure on the connection. logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( Short.toString((msg.getDestination())))); } } else if (msg instanceof MonitorMessage) { MonitorMessage monitorMsg = (MonitorMessage) msg; receivesMonitorDataResponse(monitorMsg); } else { @@ -1156,4 +1237,288 @@ { return replicationServer; } /* * Monitor Data generation */ /** * Retrieves the remote monitor data. * * @throws DirectoryException When an error occurs. */ protected void retrievesRemoteMonitorData() throws DirectoryException { if (validityDate > TimeThread.getTime()) { // The current data are still valid. No need to renew them. 
return; } // Clean this.LDAPStates.clear(); this.maxCNs.clear(); // Init the maxCNs of our direct LDAP servers from our own dbstate for (ServerHandler rs : connectedServers.values()) { short serverID = rs.getServerId(); ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID); if (cn == null) { // we have nothing in db for that server cn = new ChangeNumber(0, 0 , serverID); } this.maxCNs.put(serverID, cn); } ServerState replServerState = this.getDbServerState(); Iterator<Short> it = replServerState.iterator(); while (it.hasNext()) { short sid = it.next(); ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); ChangeNumber maxCN = this.maxCNs.get(sid); if ((maxCN != null) && (receivedCN.newer(maxCN))) { // We found a newer one this.maxCNs.remove(sid); this.maxCNs.put(sid, receivedCN); } } // Send Request to the other Replication Servers if (remoteMonitorResponsesSemaphore == null) { remoteMonitorResponsesSemaphore = new Semaphore( replicationServers.size() -1); sendMonitorDataRequest(); // Wait reponses from them or timeout waitMonitorDataResponses(replicationServers.size()); } else { // The processing of renewing the monitor cache is already running // We'll make it sleeping until the end while (remoteMonitorResponsesSemaphore!=null) { waitMonitorDataResponses(1); } } // Now we have the expected answers of an error occured validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime; if (debugEnabled()) { debugMonitorData(); } } private void debugMonitorData() { String mds = " Monitor data="; Iterator<Short> ite = LDAPStates.keySet().iterator(); while (ite.hasNext()) { Short sid = ite.next(); ServerState ss = LDAPStates.get(sid); mds += " LDAPState(" + sid + ")=" + ss.toString(); } Iterator<Short> itc = maxCNs.keySet().iterator(); while (itc.hasNext()) { Short sid = itc.next(); ChangeNumber cn = maxCNs.get(sid); mds += " maxCNs(" + sid + ")=" + cn.toString(); } mds += "--"; TRACER.debugInfo( "In " + 
this.replicationServer.getMonitorInstanceName() + " baseDN=" + baseDn + mds); } /** * Sends a MonitorRequest message to all connected RS. * @throws DirectoryException when a problem occurs. */ protected void sendMonitorDataRequest() throws DirectoryException { try { for (ServerHandler rs : replicationServers.values()) { MonitorRequestMessage msg = new MonitorRequestMessage(this.replicationServer.getServerId(), rs.getServerId()); rs.send(msg); } } catch(Exception e) { Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get(); logError(message); throw new DirectoryException(ResultCode.OTHER, message, e); } } /** * Wait for the expected count of received MonitorMessage. * @param expectedResponses The number of expected answers. * @throws DirectoryException When an error occurs. */ protected void waitMonitorDataResponses(int expectedResponses) throws DirectoryException { try { boolean allPermitsAcquired = remoteMonitorResponsesSemaphore.tryAcquire( expectedResponses, (long) 500, TimeUnit.MILLISECONDS); if (!allPermitsAcquired) { logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); } else { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "Successfully received all " + replicationServers.size() + " expected monitor messages"); } } catch(Exception e) { logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); } finally { remoteMonitorResponsesSemaphore = null; } } /** * Processes a Monitor message receives from a remote Replication Server * and stores the data received. * * @param msg The message to be processed. 
*/ public void receivesMonitorDataResponse(MonitorMessage msg) { if (remoteMonitorResponsesSemaphore == null) { // Ignoring the remote monitor data because an error occured previously return; } try { // Here is the RS state : list <serverID, lastChangeNumber> // For each LDAP Server, we keep the max CN accross the RSes ServerState replServerState = msg.getReplServerState(); Iterator<Short> it = replServerState.iterator(); while (it.hasNext()) { short sid = it.next(); ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); ChangeNumber maxCN = this.maxCNs.get(sid); if (receivedCN.newer(maxCN)) { // We found a newer one this.maxCNs.remove(sid); this.maxCNs.put(sid, receivedCN); } } // Store the LDAP servers states Iterator<Short> sidIterator = msg.iterator(); while (sidIterator.hasNext()) { short sid = sidIterator.next(); ServerState ss = msg.getLDAPServerState(sid); this.LDAPStates.put(sid, ss); this.approxFirstMissingDate.put(sid, msg.getApproxFirstMissingDate(sid)); } // Decreases the number of expected responses and potentially // wakes up the waiting requestor thread. remoteMonitorResponsesSemaphore.release(); } catch (Exception e) { // If an exception occurs while processing one of the expected message, // the processing is aborted and the waiting thread is awoke. remoteMonitorResponsesSemaphore.notifyAll(); } } /** * Get the state of the LDAP server with the provided serverId. * @param serverId The server ID. * @return The server state. */ public ServerState getServerState(short serverId) { return LDAPStates.get(serverId); } /** * Get the highest know change number of the LDAP server with the provided * serverId. * @param serverId The server ID. * @return The highest change number. */ public ChangeNumber getMaxCN(short serverId) { return maxCNs.get(serverId); } /** * Get an approximation of the date of the oldest missing changes. * serverId. * @param serverId The server ID. * @return The approximation of the date of the oldest missing change. 
*/
  public Long getApproxFirstMissingDate(short serverId)
  {
    return approxFirstMissingDate.get(serverId);
  }

  /**
   * Computes how many changes are missing for the server with the provided
   * state, summed over every server id recorded in maxCNs.
   * @param state The provided server state.
   * @return The number of missing changes.
   */
  public int getMissingChanges(ServerState state)
  {
    int total = 0;

    // For each originating server, compare the highest CN recorded in maxCNs
    // with the highest CN present in the provided state, and accumulate the
    // sequence number difference.
    for (Map.Entry<Short, ChangeNumber> entry : maxCNs.entrySet())
    {
      Short sid = entry.getKey();
      ChangeNumber lastSeen = state.getMaxChangeNumber(sid);
      if (lastSeen == null)
      {
        // The state holds nothing for that server id: start from zero.
        lastSeen = new ChangeNumber(0,0, sid);
      }
      total += ChangeNumber.diffSeqNum(entry.getValue(), lastSeen);
    }

    return total;
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; @@ -62,9 +36,9 @@ import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.util.Date; import java.util.List; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Map; @@ -150,11 +124,12 @@ /** * When this Handler is connected to a changelog server this collection * will contain the list of LDAP servers connected to the remote changelog * server. * When this Handler is connected to a remote replication server * this collection will contain as many elements as there are * LDAP servers connected to the remote replication server. 
*/ private List<String> remoteLDAPservers = new ArrayList<String>(); private List<LightweightServerHandler> remoteLDAPservers = new ArrayList<LightweightServerHandler>(); /** * The time in milliseconds between heartbeats from the replication @@ -830,27 +805,8 @@ ServerState dbState = replicationServerDomain.getDbServerState(); for (short id : dbState) { int max = dbState.getMaxChangeNumber(id).getSeqnum(); ChangeNumber currentChange = serverState.getMaxChangeNumber(id); if (currentChange != null) { int current = currentChange.getSeqnum(); if (current == max) { } else if (current < max) { totalCount += max - current; } else { totalCount += Integer.MAX_VALUE - (current - max) + 1; } } else { totalCount += max; } totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id), serverState.getMaxChangeNumber(id)); } return totalCount; } @@ -858,7 +814,7 @@ } /** * Get an approximation of the delay by looking at the age of the odest * Get an approximation of the delay by looking at the age of the oldest * message that has not been sent to this server. * This is an approximation because the age is calculated using the * clock of the servee where the replicationServer is currently running @@ -886,25 +842,65 @@ * @return The age if the older change has not yet been replicated * to the server handled by this ServerHandler. */ public Long getApproxFirstMissingDate() { // Get the older CN received // From it, get the next sequence number // Get the CN for the next sequence number // If not present in the local RS db, // then approximate with the older update time ChangeNumber olderUpdateCN = getOlderUpdateCN(); if (olderUpdateCN == null) return null; ReplicationIterator ri = replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN); if (ri != null) { if (ri.next()) { ChangeNumber firstMissingChange = ri.getChange().getChangeNumber(); return firstMissingChange.getTime(); } } return olderUpdateCN.getTime(); } /** * Get the older update time for that server. 
* @return The older update time. */ public long getOlderUpdateTime() { ChangeNumber olderUpdateCN = getOlderUpdateCN(); if (olderUpdateCN == null) return 0; return olderUpdateCN.getTime(); } /** * Get the older Change Number for that server. * @return The older change number. */ public ChangeNumber getOlderUpdateCN() { synchronized (msgQueue) { if (isFollowing()) { if (msgQueue.isEmpty()) return 0; return null; UpdateMessage msg = msgQueue.first(); return msg.getChangeNumber().getTime(); return msg.getChangeNumber(); } else { if (lateQueue.isEmpty()) return 0; return null; UpdateMessage msg = lateQueue.first(); return msg.getChangeNumber().getTime(); return msg.getChangeNumber(); } } } @@ -1190,6 +1186,16 @@ } /** * Get the state of this server. * * @return ServerState the state for this server.. */ public ServerState getServerState() { return serverState; } /** * Stop this server handler processing. */ public void stopHandler() @@ -1397,7 +1403,7 @@ " " + serverURL + " " + String.valueOf(serverId); if (serverIsLDAPserver) return "Remote LDAP Server " + str; return "Direct LDAP Server " + str; else return "Remote Repl Server " + str; } @@ -1445,28 +1451,68 @@ { ArrayList<Attribute> attributes = new ArrayList<Attribute>(); if (serverIsLDAPserver) { attributes.add(new Attribute("LDAP-Server", serverURL)); attributes.add(new Attribute("connected-to", this.replicationServerDomain. 
getReplicationServer().getMonitorInstanceName())); // Add the oldest missing update Long olderUpdateTime = this.getApproxFirstMissingDate(); if (olderUpdateTime != null) { Date date = new Date(olderUpdateTime); attributes.add(new Attribute("approx-older-change-not-synchronized", date.toString())); } } else { attributes.add(new Attribute("ReplicationServer-Server", serverURL)); } attributes.add(new Attribute("server-id", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", baseDn.toString())); attributes.add(new Attribute("waiting-changes", String.valueOf(getRcvMsgQueueSize()))); attributes.add(new Attribute("max-waiting-changes", String.valueOf(maxQueueSize))); attributes.add(new Attribute("update-waiting-acks", String.valueOf(getWaitingAckSize()))); // Update stats // Retrieves the topology counters if (serverIsLDAPserver) { try { replicationServerDomain.retrievesRemoteMonitorData(); } catch(Exception e) { // FIXME: We failed retrieving the remote monitor data } // Compute the latency for the current SH int missingChanges = replicationServerDomain.getMissingChanges(serverState); // add the latency attribute to our monitor data attributes.add(new Attribute("missing-changes", String.valueOf(missingChanges))); } // Deprecated // attributes.add(new Attribute("max-waiting-changes", // String.valueOf(maxQueueSize))); attributes.add(new Attribute("update-sent", String.valueOf(getOutCount()))); attributes.add(new Attribute("update-received", String.valueOf(getInCount()))); // Deprecated as long as assured is not exposed attributes.add(new Attribute("update-waiting-acks", String.valueOf(getWaitingAckSize()))); attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount()))); attributes.add(new Attribute("ack-received", String.valueOf(getInAckCount()))); attributes.add(new Attribute("approximate-delay", String.valueOf(getApproxDelay()))); // Window stats attributes.add(new Attribute("max-send-window", String.valueOf(sendWindowSize))); 
attributes.add(new Attribute("current-send-window", @@ -1475,6 +1521,18 @@ String.valueOf(maxRcvWindow))); attributes.add(new Attribute("current-rcv-window", String.valueOf(rcvWindow))); /* * FIXME:PGB DEPRECATED * // Missing changes attributes.add(new Attribute("waiting-changes", String.valueOf(getRcvMsgQueueSize()))); // Age of oldest missing change attributes.add(new Attribute("approximate-delay", String.valueOf(getApproxDelay()))); // Date of the oldest missing change long olderUpdateTime = getOlderUpdateTime(); if (olderUpdateTime != 0) { @@ -1482,6 +1540,7 @@ attributes.add(new Attribute("older-change-not-synchronized", String.valueOf(date.toString()))); } */ /* get the Server State */ final String ATTR_SERVER_STATE = "server-state"; @@ -1495,9 +1554,11 @@ Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); attributes.add(attr); // Encryption attributes.add(new Attribute("ssl-encryption", String.valueOf(session.isEncrypted()))); // Data generation attributes.add(new Attribute("generation-id", String.valueOf(generationId))); @@ -1663,8 +1724,28 @@ getMonitorInstanceName() + " SH for remote server " + this.getMonitorInstanceName() + " sets replServerInfo " + "<" + infoMsg + ">"); remoteLDAPservers = infoMsg.getConnectedServers(); List<String> newRemoteLDAPservers = infoMsg.getConnectedServers(); generationId = infoMsg.getGenerationId(); synchronized(remoteLDAPservers) { // Removes the existing structures for (LightweightServerHandler lsh : remoteLDAPservers) { lsh.stopHandler(); } remoteLDAPservers.clear(); // Creates the new structure according to the message received. 
for (String newConnectedServer : newRemoteLDAPservers) { LightweightServerHandler lsh = new LightweightServerHandler(newConnectedServer, this); lsh.startHandler(); remoteLDAPservers.add(lsh); } } } /** @@ -1678,9 +1759,9 @@ */ public boolean isRemoteLDAPServer(short wantedServer) { for (String server : remoteLDAPservers) for (LightweightServerHandler server : remoteLDAPservers) { if (wantedServer == Short.valueOf(server)) if (wantedServer == server.getServerId()) { return true; } @@ -1695,9 +1776,9 @@ * @return boolean True is the replication server has remote LDAP servers * connected to it. */ public List<String> getRemoteLDAPServers() public boolean hasRemoteLDAPServers() { return remoteLDAPservers; return !remoteLDAPservers.isEmpty(); } /** @@ -1802,4 +1883,14 @@ { this.generationId = generationId; } /** * Returns the Replication Server Domain to which belongs this server handler. * * @return The replication server domain. */ public ReplicationServerDomain getDomain() { return this.replicationServerDomain; } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import org.opends.messages.Message; @@ -49,6 +49,8 @@ import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.replication.protocol.ReplServerInfoMessage; import org.opends.server.replication.protocol.MonitorMessage; import org.opends.server.replication.protocol.MonitorRequestMessage; import org.opends.server.loggers.debug.DebugTracer; @@ -246,6 +248,17 @@ } } } else if (msg instanceof MonitorRequestMessage) { MonitorRequestMessage replServerMonitorRequestMsg = (MonitorRequestMessage) msg; handler.process(replServerMonitorRequestMsg); } else if (msg instanceof MonitorMessage) { MonitorMessage replServerMonitorMsg = (MonitorMessage) msg; handler.process(replServerMonitorMsg); } else if (msg == null) { /* opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import org.opends.messages.Message; @@ -126,7 +126,7 @@ "In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + ", writer to " + this.handler.getMonitorInstanceName() + " publishes msg=" + update.toString() + " publishes msg=[" + update.toString() + "]"+ " refgenId=" + referenceGenerationId + " server=" + handler.getServerId() + " generationId=" + handler.getGenerationId()); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication; @@ -917,6 +917,11 @@ TRACER.debugInfo("Failed to add entry " + entry.getDN() + "Result code = : " + addOp.getResultCode()); } else { TRACER.debugInfo(entry.getDN() + " added " + addOp.getResultCode()); } // They will be removed at the end of the test entryList.addLast(entry.getDN()); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.common; @@ -303,4 +303,29 @@ CN2 = cng.newChangeNumber(); assertTrue(CN1.compareTo(CN2) != 0 ); } /** * Test the difference in seq num between 2 change numbers. */ @Test public void changeNumberDiffSeqNum() throws Exception { ChangeNumber CN1; ChangeNumber CN2; CN1 = new ChangeNumber((long)0, 3, (short)0); // 3-1 = 2 CN2 = new ChangeNumber((long)0, 1, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2); // 3-3 = 0 CN2 = new ChangeNumber((long)0, 3, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0); // 3-4 == MAXINT (modulo) CN2 = new ChangeNumber((long)0, 4, (short)0); assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.protocol; @@ -30,7 +30,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.util.Iterator; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashSet; @@ -560,7 +562,7 @@ * an exception. */ @Test() public void WindowProbeTest() throws Exception public void windowProbeTest() throws Exception { WindowProbe msg = new WindowProbe(); new WindowProbe(msg.getBytes()); @@ -583,6 +585,74 @@ } /** * Test MonitorMessage */ @Test() public void monitorMessageTest() throws Exception { short sender = 2; short dest = 3; // RS State ServerState rsState = new ServerState(); ChangeNumber rscn1 = new ChangeNumber(1, (short) 1, (short) 1); ChangeNumber rscn2 = new ChangeNumber(1, (short) 1, (short) 2); rsState.update(rscn1); rsState.update(rscn2); // LS1 state ServerState s1 = new ServerState(); short sid1 = 111; ChangeNumber cn1 = new ChangeNumber(1, (short) 1, sid1); s1.update(cn1); // LS2 state ServerState s2 = new ServerState(); short sid2 = 222; Long now = TimeThread.getTime(); ChangeNumber cn2 = new ChangeNumber(now, (short) 123, sid2); s2.update(cn2); MonitorMessage msg = new MonitorMessage(sender, dest); msg.setReplServerState(rsState); msg.setLDAPServerState(sid1, s1, now+1); msg.setLDAPServerState(sid2, s2, now+2); byte[] b = msg.getBytes(); MonitorMessage newMsg = new MonitorMessage(b); assertEquals(rsState, msg.getReplServerState()); assertEquals(newMsg.getReplServerState().toString(), msg.getReplServerState().toString()); Iterator<Short> it = newMsg.iterator(); while (it.hasNext()) { short sid = it.next(); ServerState s = newMsg.getLDAPServerState(sid); if (sid == sid1) { assertEquals(s.toString(), s1.toString(), ""); 
assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), ""); } else if (sid == sid2) { assertEquals(s.toString(), s2.toString()); assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), ""); } else { fail("Bad sid"); } } assertEquals(newMsg.getsenderID(), msg.getsenderID()); assertEquals(newMsg.getDestination(), msg.getDestination()); } /** * Test that EntryMessage encoding and decoding works * by checking that : msg == new EntryMessageTest(msg.getBytes()). */ opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
New file @@ -0,0 +1,599 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.ByteArrayOutputStream; import java.net.ServerSocket; import java.net.SocketException; import java.util.ArrayList; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import 
org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.plugin.ReplicationDomain; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.SocketSession; import org.opends.server.tools.LDAPSearch; import org.opends.server.types.Attribute; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; /** * Tests for the replicationServer code. */ public class MonitorTest extends ReplicationTestCase { // The tracer object for the debug logger private static final DebugTracer TRACER = getTracer(); private static final String baseDnStr = "dc=example,dc=com"; private static final String baseSnStr = "genidcom"; private static final int WINDOW_SIZE = 10; private static final int CHANGELOG_QUEUE_SIZE = 100; private static final short server1ID = 1; private static final short server2ID = 2; private static final short server3ID = 3; private static final short server4ID = 4; private static final short changelog1ID = 11; private static final short changelog2ID = 12; private static final short changelog3ID = 13; private DN baseDn; private ReplicationBroker broker2 = null; private ReplicationBroker broker3 = null; private ReplicationBroker broker4 = null; private ReplicationServer replServer1 = null; private ReplicationServer replServer2 = null; private ReplicationServer replServer3 = null; private boolean emptyOldChanges = true; ReplicationDomain replDomain = null; SocketSession ssSession = null; boolean ssShutdownRequested = false; protected String[] updatedEntries; private static int[] replServerPort = new int[20]; private void debugInfo(String s) { logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); if 
(debugEnabled()) { TRACER.debugInfo("** TEST **" + s); } } protected void debugInfo(String message, Exception e) { debugInfo(message + stackTraceToSingleLineString(e)); } /** * Set up the environment for performing the tests in this Class. * * @throws Exception * If the environment could not be set up. */ @BeforeClass public void setUp() throws Exception { //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled()); // This test suite depends on having the schema available. TestCaseUtils.startServer(); baseDn = DN.decode(baseDnStr); updatedEntries = newLDIFEntries(); // Create an internal connection in order to provide operations // to DS to populate the db - connection = InternalClientConnection.getRootConnection(); // Synchro provider String synchroStringDN = "cn=Synchronization Providers,cn=config"; // Synchro multi-master synchroPluginStringDN = "cn=Multimaster Synchronization, " + synchroStringDN; // Synchro suffix synchroServerEntry = null; // Add config entries to the current DS server based on : // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN // Add synchroServerEntry // Add replServerEntry configureReplication(); // Change log String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN; String changeLogLdif = "dn: " + changeLogStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-synchronization-changelog-server-config\n" + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n" + "ds-cfg-changelog-server-id: 1\n" + "ds-cfg-window-size: " + WINDOW_SIZE + "\n" + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE; replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif); replServerEntry = null; } /* * Creates entries necessary to the test. 
*/ private String[] newLDIFEntries() { String[] entries = { "dn: " + baseDn + "\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: 21111111-1111-1111-1111-111111111111\n" + "\n", "dn: ou=People," + baseDn + "\n" + "objectClass: top\n" + "objectClass: organizationalUnit\n" + "entryUUID: 21111111-1111-1111-1111-111111111112\n" + "\n", "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "objectclass: inetOrgPerson\n" + "cn: Fiona Jensen\n" + "sn: Jensen\n" + "uid: fiona\n" + "telephonenumber: +1 408 555 1212\n" + "entryUUID: 21111111-1111-1111-1111-111111111113\n" + "\n", "dn: cn=Robert Langman,ou=people," + baseDn + "\n" + "objectclass: top\n" + "objectclass: person\n" + "objectclass: organizationalPerson\n" + "objectclass: inetOrgPerson\n" + "cn: Robert Langman\n" + "sn: Langman\n" + "uid: robert\n" + "telephonenumber: +1 408 555 1213\n" + "entryUUID: 21111111-1111-1111-1111-111111111114\n" + "\n" }; return entries; } /** * Creates a new replicationServer. * @param changelogId The serverID of the replicationServer to create. * @param all Specifies whether to coonect the created replication * server to the other replication servers in the test. * @return The new created replication server. 
*/ private ReplicationServer createReplicationServer(short changelogId, boolean all, String suffix) { SortedSet<String> servers = null; servers = new TreeSet<String>(); try { if (changelogId==changelog1ID) { if (replServer1!=null) return replServer1; } else if (changelogId==changelog2ID) { if (replServer2!=null) return replServer2; } else if (changelogId==changelog3ID) { if (replServer3!=null) return replServer3; } if (all) { servers.add("localhost:" + getChangelogPort(changelog1ID)); servers.add("localhost:" + getChangelogPort(changelog2ID)); } int chPort = getChangelogPort(changelogId); String chDir = "genid"+changelogId+suffix+"Db"; ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100, servers); ReplicationServer replicationServer = new ReplicationServer(conf); Thread.sleep(1000); return replicationServer; } catch (Exception e) { fail("createChangelog" + stackTraceToSingleLineString(e)); } return null; } /** * Create a synchronized suffix in the current server providing the * replication Server ID. 
* @param changelogID */ private void connectServer1ToChangelog(short changelogID) { // Connect DS to the replicationServer try { // suffix synchronized String synchroServerStringDN = synchroPluginStringDN; String synchroServerLdif = "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n" + "objectClass: top\n" + "objectClass: ds-cfg-replication-domain\n" + "cn: " + baseSnStr + "\n" + "ds-cfg-base-dn: " + baseDnStr + "\n" + "ds-cfg-replication-server: localhost:" + getChangelogPort(changelogID)+"\n" + "ds-cfg-server-id: " + server1ID + "\n" + "ds-cfg-receive-status: true\n" + "ds-cfg-window-size: " + WINDOW_SIZE; if (synchroServerEntry == null) { synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif); DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), "Unable to add the synchronized server"); configEntryList.add(synchroServerEntry.getDN()); replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn); } if (replDomain != null) { debugInfo("ReplicationDomain: Import/Export is running ? 
" + replDomain.ieRunning()); } } catch(Exception e) { debugInfo("connectToReplServer", e); fail("connectToReplServer", e); } } /* * Disconnect DS from the replicationServer */ private void disconnectFromReplServer(short changelogID) { try { // suffix synchronized String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," + synchroPluginStringDN; if (synchroServerEntry != null) { DN synchroServerDN = DN.decode(synchroServerStringDN); DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null); assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null, "Unable to delete the synchronized domain"); synchroServerEntry = null; configEntryList.remove(configEntryList.indexOf(synchroServerDN)); } } catch(Exception e) { fail("disconnectFromReplServer", e); } } private int getChangelogPort(short changelogID) { if (replServerPort[changelogID] == 0) { try { // Find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); replServerPort[changelogID] = socket.getLocalPort(); socket.close(); } catch(Exception e) { fail("Cannot retrieve a free port for replication server." 
+ e.getMessage()); } } return replServerPort[changelogID]; } private String createEntry(UUID uid) { String user2dn = "uid=user"+uid+",ou=People," + baseDnStr; return new String( "dn: "+ user2dn + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: user.1\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n" + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n"); } static protected ReplicationMessage createAddMsg(short serverId) { Entry personWithUUIDEntry = null; String user1entryUUID; String baseUUID = null; String user1dn; /* * Create a Change number generator to generate new changenumbers * when we need to send operation messages to the replicationServer. 
*/ ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, 0); user1entryUUID = "33333333-3333-3333-3333-333333333333"; user1dn = "uid=user1,ou=People," + baseDnStr; String entryWithUUIDldif = "dn: "+ user1dn + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + "objectClass: inetOrgPerson\n" + "uid: user.1\n" + "homePhone: 951-245-7634\n" + "description: This is the description for Aaccf Amar.\n" + "st: NC\n" + "mobile: 027-085-0537\n" + "postalAddress: Aaccf Amar$17984 Thirteenth Street" + "$Rockford, NC 85762\n" + "mail: user.1@example.com\n" + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n" + "street: 17984 Thirteenth Street\n" + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n" + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n" + "userPassword: password\n" + "initials: AA\n" + "entryUUID: " + user1entryUUID + "\n"; try { personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif); } catch(Exception e) { fail(e.getMessage()); } // Create and publish an update message to add an entry. 
AddMsg addMsg = new AddMsg(gen.newChangeNumber(), personWithUUIDEntry.getDN().toString(), user1entryUUID, baseUUID, personWithUUIDEntry.getObjectClassAttribute(), personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>()); return addMsg; } @Test(enabled=false) public void testMultiRS() throws Exception { String testCase = "testMultiRS"; debugInfo("Starting " + testCase); ReplicationDomain.clearJEBackend(false, "userRoot", baseDn.toNormalizedString()); debugInfo ("Creating 2 RS"); replServer1 = createReplicationServer(changelog1ID, true, testCase); replServer2 = createReplicationServer(changelog2ID, true, testCase); replServer3 = createReplicationServer(changelog3ID, true, testCase); Thread.sleep(500); debugInfo("Connecting DS to replServer1"); connectServer1ToChangelog(changelog1ID); Thread.sleep(1500); try { debugInfo("Connecting broker2 to replServer1"); broker2 = openReplicationSession(baseDn, server2ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges); Thread.sleep(1000); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } try { debugInfo("Connecting broker3 to replServer2"); broker3 = openReplicationSession(baseDn, server3ID, 100, getChangelogPort(changelog2ID), 1000, !emptyOldChanges); Thread.sleep(1000); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } try { debugInfo("Connecting broker4 to replServer2"); broker4 = openReplicationSession(baseDn, server4ID, 100, getChangelogPort(changelog2ID), 1000, !emptyOldChanges); Thread.sleep(1000); } catch(SocketException se) { fail("Broker connection is expected to be accepted."); } // Do a bunch of change updatedEntries = newLDIFEntries(); this.addTestEntriesToDB(updatedEntries); for (int i = 0; i<200; i++) { String ent1[] = { createEntry(UUID.randomUUID()) }; this.addTestEntriesToDB(ent1); } for (int i=0; i<10; i++) { broker3.publish(createAddMsg(server3ID)); } searchMonitor(); debugInfo("Disconnect DS from replServer1 
(required in order to DEL entries)."); disconnectFromReplServer(changelog1ID); debugInfo("Cleaning entries"); postTest(); debugInfo("Successfully ending " + testCase); } /** * Disconnect broker and remove entries from the local DB * @throws Exception */ protected void postTest() { debugInfo("Post test cleaning."); // Clean brokers if (broker2 != null) broker2.stop(); broker2 = null; if (broker3 != null) broker3.stop(); broker3 = null; if (broker4 != null) broker4.stop(); broker4 = null; if (replServer1 != null) replServer1.remove(); if (replServer2 != null) replServer2.remove(); if (replServer3 != null) replServer3.remove(); replServer1 = null; replServer2 = null; replServer3 = null; super.cleanRealEntries(); try { ReplicationDomain.clearJEBackend(false, replDomain.getBackend().getBackendID(), baseDn.toNormalizedString()); } catch (Exception e) {} } @Test(enabled=true) public void MonitorTest() throws Exception { testMultiRS(); } private static final ByteArrayOutputStream oStream = new ByteArrayOutputStream(); private static final ByteArrayOutputStream eStream = new ByteArrayOutputStream(); private void searchMonitor() { // test search as directory manager returns content String[] args3 = { "-h", "127.0.0.1", "-p", String.valueOf(TestCaseUtils.getServerLdapPort()), "-D", "cn=Directory Manager", "-w", "password", "-b", "cn=monitor", "-s", "sub", "(base-dn=*)" }; oStream.reset(); eStream.reset(); int retVal = LDAPSearch.mainSearch(args3, false, oStream, eStream); String entries = oStream.toString(); debugInfo("Entries:" + entries); try { assertEquals(retVal, 0, "Returned error: " + eStream); assertTrue(!entries.equalsIgnoreCase(""), "Returned entries: " + entries); } catch(Exception e) { if (debugEnabled()) TRACER.debugInfo( stackTraceToSingleLineString(new Exception())); fail(e.getMessage()); } } }