mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
03.41.2008 2942eaa1b7264228c9ca7535aabd206e663581e9

Issue 2345: global replication monitoring
https://opends.dev.java.net/issues/show_bug.cgi?id=2345

Feature description:

** Before these changes, the monitoring information for the replication contain the information for the replication server itself, and for the LDAP server that are currently connected to it, not for the
LDAP servers connected to other RS.
** With these changes, each RS exposes the delay informations for all RSes in the topology. In addition the delay for each server is consolidated across the whole topology.

see details in
https://www.opends.org/wiki//page/MonitoringReplication

Note:
- This feature is loosely tested by unit tests. We should probably add more tests in the future but I think it is a good idea to commit the feature and basic tests right now.
- The UI part of this feature still need some adaptations to the changes done in the server.

4 files added
12 files modified
2195 ■■■■■ changed files
opends/src/messages/messages/replication.properties 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ChangeNumber.java 36 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AddMsg.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java 397 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java 122 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java 270 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 379 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 235 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 7 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java 27 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 74 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java 599 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -251,4 +251,10 @@
  export-ldif command must be run as a task
NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
  failed: %s
SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
 are missing due to an error in the retrieval process
SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
 are missing due to a processing error : %s
SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
 sending request to get remote monitor data
opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -185,6 +185,40 @@
    }
  }
  /**
   * Computes the difference in number of changes between 2
   * change numbers.
   * @param op1 the first ChangeNumber
   * @param op2 the second ChangeNumber
   * @return the difference
   */
  public static int diffSeqNum(ChangeNumber op1, ChangeNumber op2)
  {
    int totalCount = 0;
    int max = op1.getSeqnum();
    if (op2 != null)
    {
      int current = op2.getSeqnum();
      if (current == max)
      {
      }
      else if (current < max)
      {
        totalCount += max - current;
      }
      else
      {
        totalCount += Integer.MAX_VALUE - (current - max) + 1;
      }
    }
    else
    {
      totalCount += max;
    }
    return totalCount;
  }
  /**
   * check if the current Object is strictly older than ChangeNumber
   * given in parameter.
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -258,7 +258,7 @@
  @Override
  public String toString()
  {
    return ("ADD " + getDn() + " " + getChangeNumber());
    return ("ADD DN=(" + getDn() + ") CN=(" + getChangeNumber() + ")");
  }
  /**
opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
New file
@@ -0,0 +1,397 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import org.opends.server.replication.common.ServerState;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.asn1.ASN1Sequence;
import org.opends.server.protocols.asn1.ASN1Element;
import org.opends.server.replication.common.ChangeNumber;
/**
 * This message is part of the replication protocol.
 * This message is sent by a server to one or several other servers and
 * contain one entry to be sent over the protocol in the context of
 * an import/export over the protocol.
 */
public class MonitorMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = -1900670921496804942L;
  /**
   * FIXME.
   *
   */
  class ServerData
  {
    ServerState state;
    Long approxFirstMissingDate;
  }
  /**
   * FIXME.
   *
   */
  class SubTopoMonitorData
  {
    ServerState replServerState;
    HashMap<Short, ServerData> ldapStates =
      new HashMap<Short, ServerData>();
  }
  SubTopoMonitorData data = new SubTopoMonitorData();;
  /**
   * Creates a new EntryMessage.
   *
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   */
  public MonitorMessage(short sender, short destination)
  {
    super(sender, destination);
  }
  /**
   * FIXME.
   * @param state a.
   */
  public void setReplServerState(ServerState state)
  {
    data.replServerState = state;
  }
  /**
   * FIXME.
   * @param serverId a.
   * @param state a.
   * @param olderUpdateTime a.
   *
   */
  public void setLDAPServerState(short serverId, ServerState state,
      Long olderUpdateTime)
  {
    if (data.ldapStates == null)
    {
      data.ldapStates = new HashMap<Short, ServerData>();
    }
    ServerData sd = new ServerData();
    sd.state = state;
    sd.approxFirstMissingDate = olderUpdateTime;
    data.ldapStates.put(serverId, sd);
  }
  /**
   * FIXME.
   * @param serverId a.
   * @return a.
   */
  public ServerState getLDAPServerState(short serverId)
  {
    return data.ldapStates.get(serverId).state;
  }
  /**
   * FIXME.
   * @param serverId a.
   * @return a.
   */
  public Long getApproxFirstMissingDate(short serverId)
  {
    return data.ldapStates.get(serverId).approxFirstMissingDate;
  }
  /**
   * Creates a new EntryMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMessage(byte[] in) throws DataFormatException
  {
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderIDString = new String(in, pos, length, "UTF-8");
      this.senderID = Short.valueOf(senderIDString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
       /* Read the states : all the remaining bytes but the terminating 0 */
      byte[] encodedS = new byte[in.length-pos-1];
      int i =0;
      while (pos<in.length-1)
      {
        encodedS[i++] = in[pos++];
      }
      try
      {
        ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
        for (ASN1Element el0 : s0.elements())
        {
          ServerState newState = new ServerState();
          short serverId = 0;
          Long outime = (long)0;
          ASN1Sequence s1 = el0.decodeAsSequence();
          for (ASN1Element el1 : s1.elements())
          {
            ASN1OctetString o = el1.decodeAsOctetString();
            String s = o.stringValue();
            ChangeNumber cn = new ChangeNumber(s);
            if ((data.replServerState != null) && (serverId == 0))
            {
              serverId = cn.getServerId();
              outime = cn.getTime();
            }
            else
            {
              newState.update(cn);
            }
          }
          // the first state is the replication state
          if (data.replServerState == null)
          {
            data.replServerState = newState;
          }
          else
          {
            ServerData sd = new ServerData();
            sd.state = newState;
            sd.approxFirstMissingDate = outime;
            data.ldapStates.put(serverId, sd);
          }
        }
      } catch(Exception e)
      {
      }
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length +
                   1 + destinationBytes.length;
      ASN1Sequence stateElementSequence = new ASN1Sequence();
      ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
      // First loop computes the length
      /* Put the serverStates ... */
      stateElementSequence = new ASN1Sequence();
      stateElementList = new ArrayList<ASN1Element>();
      /* first put the Replication Server state */
      ArrayList<ASN1OctetString> cnOctetList =
        data.replServerState.toASN1ArrayList();
      ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
      for (ASN1OctetString soci : cnOctetList)
      {
        cnElementList.add(soci);
      }
      ASN1Sequence cnSequence = new ASN1Sequence(cnElementList);
      stateElementList.add(cnSequence);
      // then the LDAP server data
      Set<Short> servers = data.ldapStates.keySet();
      for (Short sid : servers)
      {
        // State
        ServerState statei = data.ldapStates.get(sid).state;
        // First missing date
        Long outime =  data.ldapStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        // and the olderupdatetime
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
        }
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      stateElementSequence.setElements(stateElementList);
      int seqLen = stateElementSequence.encode().length;
      //
      length += seqLen;
      length += 2;
      // Allocate the array sized from the computed length
      byte[] resultByteArray = new byte[length];
      // Second loop build the array
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
      int pos = 1;
      pos = addByteArray(senderBytes, resultByteArray, pos);
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      /* Put the serverStates ... */
      stateElementSequence = new ASN1Sequence();
      stateElementList = new ArrayList<ASN1Element>();
      /* first put the Replication Server state */
      cnOctetList =
        data.replServerState.toASN1ArrayList();
      cnElementList = new ArrayList<ASN1Element>();
      for (ASN1OctetString soci : cnOctetList)
      {
        cnElementList.add(soci);
      }
      cnSequence = new ASN1Sequence(cnElementList);
      stateElementList.add(cnSequence);
      // then the LDAP server state
      servers = data.ldapStates.keySet();
      for (Short sid : servers)
      {
        ServerState statei = data.ldapStates.get(sid).state;
        Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
        }
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      stateElementSequence.setElements(stateElementList);
      pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
  /**
   * FIXME.
   * @return FIXME.
   */
  public ServerState getReplServerState()
  {
    return data.replServerState;
  }
  /**
   * FIXME.
   * @return a.
   */
  public Iterator<Short> iterator()
  {
    return data.ldapStates.keySet().iterator();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    String stateS = " RState:";
    stateS += "/" + data.replServerState.toString();
    stateS += " LDAPStates:";
    Iterator<ServerData> it = data.ldapStates.values().iterator();
    while (it.hasNext())
    {
      ServerData sd = it.next();
      stateS += "/ state=" + sd.state.toString()
      + " afmd=" + sd.approxFirstMissingDate + "] ";
    }
    String me = this.getClass().getCanonicalName() +
    " sender=" + this.senderID +
    " destination=" + this.destination +
    " states=" + stateS +
    "]";
    return me;
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
New file
@@ -0,0 +1,122 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is part of the replication protocol.
 * FIXME: usage
 */
public class MonitorRequestMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = -2407640479423633234L;
  /**
   * Creates a message.
   *
   * @param sender The sender server of this message.
   * @param destination The server or servers targetted by this message.
   */
  public MonitorRequestMessage(short sender, short destination)
  {
    super(sender, destination);
  }
  /**
   * Creates a new message by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the message,
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   */
  public MonitorRequestMessage(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Short.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 *      Portions Copyright 2007-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -67,7 +67,7 @@
      /* first byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
        throw new DataFormatException(
        "Input is not a valid changelogInfo Message.");
        "Input is not a valid " + this.getClass().getCanonicalName());
      int pos = 1;
@@ -97,7 +97,7 @@
  /**
   * Creates a new changelogInfo message from a list of the currently
   * Creates a new ReplServerInfo message from a list of the currently
   * connected servers.
   *
   * @param connectedServers The list of currently connected servers ID.
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -55,6 +55,8 @@
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
  static final byte MSG_TYPE_RESET_GENERATION_ID = 17;
  static final byte MSG_TYPE_REPL_SERVER_MONITOR_REQUEST = 18;
  static final byte MSG_TYPE_REPL_SERVER_MONITOR = 19;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -79,6 +81,8 @@
   * MSG_TYPE_WINDOW_PROBE
   * MSG_TYPE_REPL_SERVER_INFO
   * MSG_TYPE_RESET_GENERATION_ID
   * MSG_TYPE_REPL_SERVER_MONITOR_REQUEST
   * MSG_TYPE_REPL_SERVER_MONITOR
   *
   * @return the byte[] representation of this message.
   * @throws UnsupportedEncodingException  When the encoding of the message
@@ -152,6 +156,12 @@
      case MSG_TYPE_REPL_SERVER_INFO:
        msg = new ReplServerInfoMessage(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
        msg = new MonitorRequestMessage(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR:
        msg = new MonitorMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
@@ -192,7 +202,7 @@
    while (in[offset++] != 0)
    {
      if (offset >= in.length)
        throw new DataFormatException("byte[] is not a valid modify msg");
        throw new DataFormatException("byte[] is not a valid msg");
      length++;
    }
    return length;
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
New file
@@ -0,0 +1,270 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashSet;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
/**
 * This class defines a server handler dedicated to the remote LDAP servers
 * connected to a remote Replication Server.
 * This class is necessary because we want to provide monitor entries for them
 * and because the monitor API only allows one entry by MonitorProvider instance
 * so that no other class can provide the monitor entry for these objects.
 *
 * One instance of this class is created for each instance of remote LDAP server
 * connected to a remote Replication Server.
 */
public class LightweightServerHandler
  extends MonitorProvider<MonitorProviderCfg>
{
  // The tracer object for the debug logger.
  private static final DebugTracer TRACER = getTracer();
  short serverId;
  ServerHandler replServerHandler;
  ReplicationServerDomain rsDomain;
  DN baseDn;
  /**
   * Creates a new LighweightServerHandler with the provided serverid, connected
   * to the remote Replication Server represented by replServerHandler.
   *
   * @param serverId The serverId of this remote LDAP server.
   * @param replServerHandler The server handler of the Replication Server to
   * which this LDAP server is remotely connected.
   */
  public LightweightServerHandler(String serverId,
      ServerHandler replServerHandler)
  {
    super("Server Handler");
    this.serverId = Short.valueOf(serverId);
    this.replServerHandler = replServerHandler;
    this.rsDomain = replServerHandler.getDomain();
    this.baseDn = rsDomain.getBaseDn();
    if (debugEnabled())
      TRACER.debugInfo(
        "In " +
  replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
        " LWSH for remote server " + this.serverId +
        " connected to:" + this.replServerHandler.getMonitorInstanceName() +
        " ()");
}
  /**
   * Get the serverID associated with this LDAP server.
   * @return The serverId.
   */
  public short getServerId()
  {
    return Short.valueOf(serverId);
  }
  /**
   * Stop this server handler processing.
   */
  public void startHandler()
  {
    if (debugEnabled())
      TRACER.debugInfo(
      "In " +
replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
      " LWSH for remote server " + this.serverId +
      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
          " start");
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
    DirectoryServer.registerMonitorProvider(this);
  }
  /**
   * Stop this server handler processing.
   */
  public void stopHandler()
  {
    if (debugEnabled())
      TRACER.debugInfo(
      "In " +
replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
      " LWSH for remote server " + this.serverId +
      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
          " stop");
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
                          throws ConfigException,InitializationException
  {
    // Nothing to do for now
  }
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   *
   * @return  The name of this monitor provider.
   */
  @Override
  public String getMonitorInstanceName()
  {
    String serverURL=""; // FIXME
    String str = baseDn.toString() + " " + serverURL + " "
       + String.valueOf(serverId);
    return "Undirect LDAP Server " + str;
  }
  /**
   * Retrieves the length of time in milliseconds that should elapse between
   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
   * return value indicates that the <CODE>updateMonitorData()</CODE> method
   * should not be periodically invoked.
   *
   * @return  The length of time in milliseconds that should elapse between
   *          calls to the <CODE>updateMonitorData()</CODE> method.
   */
  @Override
  public long getUpdateInterval()
  {
    /* we don't wont to do polling on this monitor */
    return 0;
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
   * be made.
   */
  @Override
  public void updateMonitorData()
  {
    // As long as getUpdateInterval() returns 0, this will never get called
  }
  /**
   * Retrieves a set of attributes containing monitor data that should be
   * returned to the client if the corresponding monitor entry is requested.
   *
   * @return  A set of attributes containing monitor data that should be
   *          returned to the client if the corresponding monitor entry is
   *          requested.
   */
  @Override
  public ArrayList<Attribute> getMonitorData()
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " +
          this.replServerHandler.getDomain().getReplicationServer().
          getMonitorInstanceName()+
          " LWSH for remote server " + this.serverId +
          " connected to:" + this.replServerHandler.getMonitorInstanceName() +
      " getMonitor data");
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(new Attribute("server-id",
        String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
        rsDomain.getBaseDn().toNormalizedString()));
    attributes.add(new Attribute("connected-to",
        replServerHandler.getMonitorInstanceName()));
    // Retrieves the topology counters
    try
    {
      rsDomain.retrievesRemoteMonitorData();
      // Compute the latency for the current SH
      ServerState remoteState = rsDomain.getServerState(serverId);
      if (remoteState == null)
      {
        remoteState = new ServerState();
      }
      /* get the Server State */
      final String ATTR_SERVER_STATE = "server-state";
      AttributeType type =
        DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
      LinkedHashSet<AttributeValue> values =
        new LinkedHashSet<AttributeValue>();
      for (String str : remoteState.toStringSet())
      {
        values.add(new AttributeValue(type,str));
      }
      Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
      attributes.add(attr);
      // add the latency attribute to our monitor data
      // Compute the latency for the current SH
      int missingChanges = rsDomain.getMissingChanges(remoteState);
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
      // Add the oldest missing update
      Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    catch(Exception e)
    {
      // We failed retrieving the remote monitor data.
      attributes.add(new Attribute("error",
        stackTraceToSingleLineString(e)));
    }
    return attributes;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.MonitorMessage;
import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
/**
@@ -118,6 +128,34 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
  private long remoteMonitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /* The date of the last time they have been elaborated */
  private long validityDate = 0;
  // For each LDAP server, its server state
  private HashMap<Short, ServerState> LDAPStates =
    new HashMap<Short, ServerState>();
  // For each LDAP server, the last CN it published
  private HashMap<Short, ChangeNumber> maxCNs =
    new HashMap<Short, ChangeNumber>();
  // For each LDAP server, an approximation of the date of the first missing
  // change
  private HashMap<Short, Long> approxFirstMissingDate =
    new HashMap<Short, Long>();
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
@@ -352,7 +390,7 @@
        }
        else
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          if (rsh.hasRemoteLDAPServers())
          {
            lDAPServersConnectedInTheTopology = true;
@@ -636,7 +674,7 @@
        // server connected
        for (ServerHandler rsh : replicationServers.values())
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          if (rsh.hasRemoteLDAPServers())
          {
            servers.add(rsh);
          }
@@ -693,15 +731,58 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // A replication server is not expected to be the destination
    // of a routable message except for an error message.
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (msg.getDestination() == this.replicationServer.getServerId())
    {
      if (msg instanceof ErrorMessage)
      {
        ErrorMessage errorMsg = (ErrorMessage)msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(
                   errorMsg.getDetails()));
            errorMsg.getDetails()));
      }
      else if (msg instanceof MonitorRequestMessage)
      {
        MonitorRequestMessage replServerMonitorRequestMsg =
          (MonitorRequestMessage) msg;
        MonitorMessage monitorMsg =
          new MonitorMessage(
              replServerMonitorRequestMsg.getDestination(),
              replServerMonitorRequestMsg.getsenderID());
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerState(this.getDbServerState());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (ServerHandler lsh : this.connectedServers.values())
        {
          monitorMsg.setLDAPServerState(
              lsh.getServerId(),
              lsh.getServerState(),
              lsh.getApproxFirstMissingDate());
        }
        try
        {
          senderHandler.send(monitorMsg);
        }
        catch(Exception e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
              Short.toString((msg.getDestination()))));
        }
      }
      else if (msg instanceof MonitorMessage)
      {
        MonitorMessage monitorMsg =
          (MonitorMessage) msg;
        receivesMonitorDataResponse(monitorMsg);
      }
      else
      {
@@ -1156,4 +1237,288 @@
    {
      return replicationServer;
    }
    /*
     * Monitor Data generation
     */
    /**
     * Retrieves the remote monitor data.
     *
     * @throws DirectoryException When an error occurs.
     */
    protected void retrievesRemoteMonitorData()
      throws DirectoryException
    {
      if (validityDate > TimeThread.getTime())
      {
        // The current data are still valid. No need to renew them.
        return;
      }
      // Clean
      this.LDAPStates.clear();
      this.maxCNs.clear();
      // Init the maxCNs of our direct LDAP servers from our own dbstate
      for (ServerHandler rs : connectedServers.values())
      {
        short serverID = rs.getServerId();
        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
        if (cn == null)
        {
          // we have nothing in db for that server
          cn = new ChangeNumber(0, 0 , serverID);
        }
        this.maxCNs.put(serverID, cn);
      }
      ServerState replServerState = this.getDbServerState();
      Iterator<Short> it = replServerState.iterator();
      while (it.hasNext())
      {
        short sid = it.next();
        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
        ChangeNumber maxCN = this.maxCNs.get(sid);
        if ((maxCN != null) && (receivedCN.newer(maxCN)))
        {
          // We found a newer one
          this.maxCNs.remove(sid);
          this.maxCNs.put(sid, receivedCN);
        }
      }
      // Send Request to the other Replication Servers
      if (remoteMonitorResponsesSemaphore == null)
      {
        remoteMonitorResponsesSemaphore = new Semaphore(
            replicationServers.size() -1);
        sendMonitorDataRequest();
        // Wait reponses from them or timeout
        waitMonitorDataResponses(replicationServers.size());
      }
      else
      {
        // The processing of renewing the monitor cache is already running
        // We'll make it sleeping until the end
        while (remoteMonitorResponsesSemaphore!=null)
        {
          waitMonitorDataResponses(1);
        }
      }
      // Now we have the expected answers of an error occured
      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
      if (debugEnabled())
      {
        debugMonitorData();
      }
    }
    private void debugMonitorData()
    {
      String mds = " Monitor data=";
      Iterator<Short> ite = LDAPStates.keySet().iterator();
      while (ite.hasNext())
      {
        Short sid = ite.next();
        ServerState ss = LDAPStates.get(sid);
        mds += " LDAPState(" + sid + ")=" + ss.toString();
      }
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber cn = maxCNs.get(sid);
        mds += " maxCNs(" + sid + ")=" + cn.toString();
      }
      mds += "--";
      TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          mds);
    }
    /**
     * Sends a MonitorRequest message to all connected RS.
     * @throws DirectoryException when a problem occurs.
     */
    protected void sendMonitorDataRequest()
      throws DirectoryException
    {
      try
      {
        for (ServerHandler rs : replicationServers.values())
        {
          MonitorRequestMessage msg = new
            MonitorRequestMessage(this.replicationServer.getServerId(),
              rs.getServerId());
          rs.send(msg);
        }
      }
      catch(Exception e)
      {
        Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
        logError(message);
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
    }
    /**
     * Wait for the expected count of received MonitorMessage.
     * @param expectedResponses The number of expected answers.
     * @throws DirectoryException When an error occurs.
     */
    protected void waitMonitorDataResponses(int expectedResponses)
      throws DirectoryException
    {
      try
      {
        boolean allPermitsAcquired =
          remoteMonitorResponsesSemaphore.tryAcquire(
              expectedResponses,
              (long) 500, TimeUnit.MILLISECONDS);
        if (!allPermitsAcquired)
        {
          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
        }
        else
        {
          if (debugEnabled())
            TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Successfully received all " + replicationServers.size()
            + " expected monitor messages");
        }
      }
      catch(Exception e)
      {
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
      }
      finally
      {
        remoteMonitorResponsesSemaphore = null;
      }
    }
    /**
     * Processes a Monitor message receives from a remote Replication Server
     * and stores the data received.
     *
     * @param msg The message to be processed.
     */
    public void receivesMonitorDataResponse(MonitorMessage msg)
    {
      if (remoteMonitorResponsesSemaphore == null)
      {
        // Ignoring the remote monitor data because an error occured previously
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN accross the RSes
        ServerState replServerState = msg.getReplServerState();
        Iterator<Short> it = replServerState.iterator();
        while (it.hasNext())
        {
          short sid = it.next();
          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
          ChangeNumber maxCN = this.maxCNs.get(sid);
          if (receivedCN.newer(maxCN))
          {
            // We found a newer one
            this.maxCNs.remove(sid);
            this.maxCNs.put(sid, receivedCN);
          }
        }
        // Store the LDAP servers states
        Iterator<Short> sidIterator = msg.iterator();
        while (sidIterator.hasNext())
        {
          short sid = sidIterator.next();
          ServerState ss = msg.getLDAPServerState(sid);
          this.LDAPStates.put(sid, ss);
          this.approxFirstMissingDate.put(sid,
              msg.getApproxFirstMissingDate(sid));
        }
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        remoteMonitorResponsesSemaphore.release();
      }
      catch (Exception e)
      {
        // If an exception occurs while processing one of the expected message,
        // the processing is aborted and the waiting thread is awoke.
        remoteMonitorResponsesSemaphore.notifyAll();
      }
    }
    /**
     * Get the state of the LDAP server with the provided serverId.
     * @param serverId The server ID.
     * @return The server state.
     */
    public ServerState getServerState(short serverId)
    {
      return LDAPStates.get(serverId);
    }
    /**
     * Get the highest know change number of the LDAP server with the provided
     * serverId.
     * @param serverId The server ID.
     * @return The highest change number.
     */
    public ChangeNumber getMaxCN(short serverId)
    {
      return maxCNs.get(serverId);
    }
    /**
     * Get an approximation of the date of the oldest missing changes.
     * serverId.
     * @param serverId The server ID.
     * @return The approximation of the date of the oldest missing change.
     */
    public Long getApproxFirstMissingDate(short serverId)
    {
      return approxFirstMissingDate.get(serverId);
    }
    /**
     * Get the number of missing change for the server with the provided state.
     * @param state The provided server state.
     * @return The number of missing changes.
     */
    public int getMissingChanges(ServerState state)
    {
      // Traverse the max Cn transmitted by each server
      // For each server, get the highest CN know from the current server
      // Sum the difference betwenn the max and the last
      int missingChanges = 0;
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber maxCN = maxCNs.get(sid);
        ChangeNumber last = state.getMaxChangeNumber(sid);
        if (last == null)
        {
          last = new ChangeNumber(0,0, sid);
        }
        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
        missingChanges += missingChangesFromSID;
      }
      return missingChanges;
    }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -62,9 +36,9 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -150,11 +124,12 @@
  /**
   * When this Handler is connected to a changelog server this collection
   * will contain the list of LDAP servers connected to the remote changelog
   * server.
   * When this Handler is connected to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   */
  private List<String> remoteLDAPservers = new ArrayList<String>();
  private List<LightweightServerHandler>
     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
       ServerState dbState = replicationServerDomain.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
         if (currentChange != null)
         {
           int current = currentChange.getSeqnum();
           if (current == max)
           {
           }
           else if (current < max)
           {
             totalCount += max - current;
           }
           else
           {
             totalCount += Integer.MAX_VALUE - (current - max) + 1;
           }
         }
         else
         {
           totalCount += max;
         }
         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
             serverState.getMaxChangeNumber(id));
       }
       return totalCount;
     }
@@ -858,7 +814,7 @@
  }
  /**
   * Get an approximation of the delay by looking at the age of the odest
   * Get an approximation of the delay by looking at the age of the oldest
   * message that has not been sent to this server.
   * This is an approximation because the age is calculated using the
   * clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
   * @return The age if the older change has not yet been replicated
   *         to the server handled by this ServerHandler.
   */
  public Long getApproxFirstMissingDate()
  {
    // Get the older CN received
    // From it, get the next sequence number
    // Get the CN for the next sequence number
    // If not present in the local RS db,
    // then approximate with the older update time
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return null;
    ReplicationIterator ri =
      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
    if (ri != null)
    {
      if (ri.next())
      {
        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
        return firstMissingChange.getTime();
      }
    }
    return olderUpdateCN.getTime();
  }
  /**
   * Get the older update time for that server.
   * @return The older update time.
   */
  public long getOlderUpdateTime()
  {
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return 0;
    return  olderUpdateCN.getTime();
  }
  /**
   * Get the older Change Number for that server.
   * @return The older change number.
   */
  public ChangeNumber getOlderUpdateCN()
  {
    synchronized (msgQueue)
    {
      if (isFollowing())
      {
        if (msgQueue.isEmpty())
          return 0;
          return null;
        UpdateMessage msg = msgQueue.first();
        return msg.getChangeNumber().getTime();
        return msg.getChangeNumber();
      }
      else
      {
        if (lateQueue.isEmpty())
          return 0;
          return null;
        UpdateMessage msg = lateQueue.first();
        return msg.getChangeNumber().getTime();
        return msg.getChangeNumber();
      }
    }
  }
@@ -1190,6 +1186,16 @@
  }
  /**
   * Get the state of this server.
   *
   * @return ServerState the state for this server..
   */
  public ServerState getServerState()
  {
    return serverState;
  }
  /**
   * Stop this server handler processing.
   */
  public void stopHandler()
@@ -1397,7 +1403,7 @@
                 " " + serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
      return "Remote LDAP Server " + str;
      return "Direct LDAP Server " + str;
    else
      return "Remote Repl Server " + str;
  }
@@ -1445,28 +1451,68 @@
  {
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    if (serverIsLDAPserver)
    {
      attributes.add(new Attribute("LDAP-Server", serverURL));
      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
          getReplicationServer().getMonitorInstanceName()));
      // Add the oldest missing update
      Long olderUpdateTime = this.getApproxFirstMissingDate();
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    else
    {
      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
    }
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("max-waiting-changes",
                                 String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-waiting-acks",
                                 String.valueOf(getWaitingAckSize())));
    // Update stats
    // Retrieves the topology counters
    if (serverIsLDAPserver)
    {
      try
      {
        replicationServerDomain.retrievesRemoteMonitorData();
      }
      catch(Exception e)
      {
        // FIXME: We failed retrieving the remote monitor data
      }
      // Compute the latency for the current SH
      int missingChanges =
        replicationServerDomain.getMissingChanges(serverState);
      // add the latency attribute to our monitor data
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
    }
    // Deprecated
    // attributes.add(new Attribute("max-waiting-changes",
    //                              String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-sent",
                                 String.valueOf(getOutCount())));
    attributes.add(new Attribute("update-received",
                                 String.valueOf(getInCount())));
    // Deprecated as long as assured is not exposed
    attributes.add(new Attribute("update-waiting-acks",
        String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
    attributes.add(new Attribute("ack-received",
                                 String.valueOf(getInAckCount())));
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Window stats
    attributes.add(new Attribute("max-send-window",
                                 String.valueOf(sendWindowSize)));
    attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
                                 String.valueOf(maxRcvWindow)));
    attributes.add(new Attribute("current-rcv-window",
                                 String.valueOf(rcvWindow)));
    /*
     * FIXME:PGB DEPRECATED
     *
    // Missing changes
    attributes.add(new Attribute("waiting-changes",
        String.valueOf(getRcvMsgQueueSize())));
    // Age of oldest missing change
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Date of the oldest missing change
    long olderUpdateTime = getOlderUpdateTime();
    if (olderUpdateTime != 0)
    {
@@ -1482,6 +1540,7 @@
      attributes.add(new Attribute("older-change-not-synchronized",
                                 String.valueOf(date.toString())));
    }
    */
    /* get the Server State */
    final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
    Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
    attributes.add(attr);
    // Encryption
    attributes.add(new Attribute("ssl-encryption",
        String.valueOf(session.isEncrypted())));
    // Data generation
    attributes.add(new Attribute("generation-id",
        String.valueOf(generationId)));
@@ -1663,8 +1724,28 @@
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
     remoteLDAPservers = infoMsg.getConnectedServers();
     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
     synchronized(remoteLDAPservers)
     {
       // Removes the existing structures
       for (LightweightServerHandler lsh : remoteLDAPservers)
       {
         lsh.stopHandler();
       }
       remoteLDAPservers.clear();
       // Creates the new structure according to the message received.
       for (String newConnectedServer : newRemoteLDAPservers)
       {
         LightweightServerHandler lsh
         = new LightweightServerHandler(newConnectedServer, this);
         lsh.startHandler();
         remoteLDAPservers.add(lsh);
       }
     }
   }
   /**
@@ -1678,9 +1759,9 @@
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (String server : remoteLDAPservers)
     for (LightweightServerHandler server : remoteLDAPservers)
     {
       if (wantedServer == Short.valueOf(server))
       if (wantedServer == server.getServerId())
       {
         return true;
       }
@@ -1695,9 +1776,9 @@
    * @return boolean True is the replication server has remote LDAP servers
    * connected to it.
    */
   public List<String> getRemoteLDAPServers()
   public boolean hasRemoteLDAPServers()
   {
     return remoteLDAPservers;
     return !remoteLDAPservers.isEmpty();
   }
  /**
@@ -1802,4 +1883,14 @@
  {
    this.generationId = generationId;
  }
  /**
   * Returns the Replication Server Domain to which belongs this server handler.
   *
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain()
  {
    return this.replicationServerDomain;
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -49,6 +49,8 @@
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.MonitorMessage;
import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.loggers.debug.DebugTracer;
@@ -246,6 +248,17 @@
            }
          }
        }
        else if (msg instanceof MonitorRequestMessage)
        {
          MonitorRequestMessage replServerMonitorRequestMsg =
            (MonitorRequestMessage) msg;
          handler.process(replServerMonitorRequestMsg);
        }
        else if (msg instanceof MonitorMessage)
        {
          MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
          handler.process(replServerMonitorMsg);
        }
        else if (msg == null)
        {
          /*
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -126,7 +126,7 @@
            "In " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
            ", writer to " + this.handler.getMonitorInstanceName() +
            " publishes msg=" + update.toString() +
            " publishes msg=[" + update.toString() + "]"+
            " refgenId=" + referenceGenerationId +
            " server=" + handler.getServerId() +
            " generationId=" + handler.getGenerationId());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
@@ -917,6 +917,11 @@
          TRACER.debugInfo("Failed to add entry " + entry.getDN() +
              "Result code = : " + addOp.getResultCode());
        }
        else
        {
          TRACER.debugInfo(entry.getDN() +
              " added " + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -303,4 +303,29 @@
    CN2 = cng.newChangeNumber();
    assertTrue(CN1.compareTo(CN2) != 0 );
  }
  /**
   * Test the difference in seq num between 2 change numbers.
   */
  @Test
  public void changeNumberDiffSeqNum()
         throws Exception
  {
    ChangeNumber CN1;
    ChangeNumber CN2;
    CN1 = new ChangeNumber((long)0, 3, (short)0);
    // 3-1 = 2
    CN2 = new ChangeNumber((long)0, 1, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
    // 3-3 = 0
    CN2 = new ChangeNumber((long)0, 3, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
    // 3-4 == MAXINT (modulo)
    CN2 = new ChangeNumber((long)0, 4, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -30,7 +30,9 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -560,7 +562,7 @@
   * an exception.
   */
  @Test()
  public void WindowProbeTest() throws Exception
  public void windowProbeTest() throws Exception
  {
    WindowProbe msg = new WindowProbe();
    new WindowProbe(msg.getBytes());
@@ -583,6 +585,74 @@
  }
  /**
   * Test MonitorMessage
   */
  @Test()
  public void monitorMessageTest() throws Exception
  {
    short sender = 2;
    short dest = 3;
    // RS State
    ServerState rsState = new ServerState();
    ChangeNumber rscn1 = new ChangeNumber(1, (short) 1, (short) 1);
    ChangeNumber rscn2 = new ChangeNumber(1, (short) 1, (short) 2);
    rsState.update(rscn1);
    rsState.update(rscn2);
    // LS1 state
    ServerState s1 = new ServerState();
    short sid1 = 111;
    ChangeNumber cn1 = new ChangeNumber(1, (short) 1, sid1);
    s1.update(cn1);
    // LS2 state
    ServerState s2 = new ServerState();
    short sid2 = 222;
    Long now = TimeThread.getTime();
    ChangeNumber cn2 = new ChangeNumber(now,
                                       (short) 123, sid2);
    s2.update(cn2);
    MonitorMessage msg =
      new MonitorMessage(sender, dest);
    msg.setReplServerState(rsState);
    msg.setLDAPServerState(sid1, s1, now+1);
    msg.setLDAPServerState(sid2, s2, now+2);
    byte[] b = msg.getBytes();
    MonitorMessage newMsg = new MonitorMessage(b);
    assertEquals(rsState, msg.getReplServerState());
    assertEquals(newMsg.getReplServerState().toString(),
        msg.getReplServerState().toString());
    Iterator<Short> it = newMsg.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      ServerState s = newMsg.getLDAPServerState(sid);
      if (sid == sid1)
      {
        assertEquals(s.toString(), s1.toString(), "");
        assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
      }
      else if (sid == sid2)
      {
        assertEquals(s.toString(), s2.toString());
        assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
      }
      else
      {
        fail("Bad sid");
      }
    }
    assertEquals(newMsg.getsenderID(), msg.getsenderID());
    assertEquals(newMsg.getDestination(), msg.getDestination());
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
New file
@@ -0,0 +1,599 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Tests for the replicationServer code.
 */
public class MonitorTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private static final String baseDnStr = "dc=example,dc=com";
  private static final String baseSnStr = "genidcom";
  private static final int   WINDOW_SIZE = 10;
  private static final int   CHANGELOG_QUEUE_SIZE = 100;
  private static final short server1ID = 1;
  private static final short server2ID = 2;
  private static final short server3ID = 3;
  private static final short server4ID = 4;
  private static final short changelog1ID = 11;
  private static final short changelog2ID = 12;
  private static final short changelog3ID = 13;
  private DN baseDn;
  private ReplicationBroker broker2 = null;
  private ReplicationBroker broker3 = null;
  private ReplicationBroker broker4 = null;
  private ReplicationServer replServer1 = null;
  private ReplicationServer replServer2 = null;
  private ReplicationServer replServer3 = null;
  private boolean emptyOldChanges = true;
  ReplicationDomain replDomain = null;
  SocketSession ssSession = null;
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  private static int[] replServerPort = new int[20];
  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  protected void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  public void setUp() throws Exception
  {
    //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    baseDn = DN.decode(baseDnStr);
    updatedEntries = newLDIFEntries();
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
    connection = InternalClientConnection.getRootConnection();
    // Synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Synchro multi-master
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    // Synchro suffix
    synchroServerEntry = null;
    // Add config entries to the current DS server based on :
    // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
    // Add synchroServerEntry
    // Add replServerEntry
    configureReplication();
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    replServerEntry = null;
  }
  /*
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries()
  {
    String[] entries =
    {
        "dn: " + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
        "dn: ou=People," + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
        "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
        + "telephonenumber: +1 408 555 1212\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
        "dn: cn=Robert Langman,ou=people," + baseDn + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: +1 408 555 1213\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
    };
    return entries;
  }
  /**
   * Creates a new replicationServer.
   * @param changelogId The serverID of the replicationServer to create.
   * @param all         Specifies whether to coonect the created replication
   *                    server to the other replication servers in the test.
   * @return The new created replication server.
   */
  private ReplicationServer createReplicationServer(short changelogId,
      boolean all, String suffix)
  {
    SortedSet<String> servers = null;
    servers = new TreeSet<String>();
    try
    {
      if (changelogId==changelog1ID)
      {
        if (replServer1!=null)
          return replServer1;
      }
      else if (changelogId==changelog2ID)
      {
        if (replServer2!=null)
          return replServer2;
      }
      else if (changelogId==changelog3ID)
      {
        if (replServer3!=null)
          return replServer3;
      }
      if (all)
      {
        servers.add("localhost:" + getChangelogPort(changelog1ID));
        servers.add("localhost:" + getChangelogPort(changelog2ID));
      }
      int chPort = getChangelogPort(changelogId);
      String chDir = "genid"+changelogId+suffix+"Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Create a synchronized suffix in the current server providing the
   * replication Server ID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(short changelogID)
  {
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
      String synchroServerStringDN = synchroPluginStringDN;
      String synchroServerLdif =
        "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: " + baseSnStr + "\n"
        + "ds-cfg-base-dn: " + baseDnStr + "\n"
        + "ds-cfg-replication-server: localhost:"
        + getChangelogPort(changelogID)+"\n"
        + "ds-cfg-server-id: " + server1ID + "\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
      if (synchroServerEntry == null)
      {
        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        configEntryList.add(synchroServerEntry.getDN());
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      }
      if (replDomain != null)
      {
        debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
      }
    }
    catch(Exception e)
    {
      debugInfo("connectToReplServer", e);
      fail("connectToReplServer", e);
    }
  }
  /*
   * Disconnect DS from the replicationServer
   */
  private void disconnectFromReplServer(short changelogID)
  {
    try
    {
      // suffix synchronized
      String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
      synchroPluginStringDN;
      if (synchroServerEntry != null)
      {
        DN synchroServerDN = DN.decode(synchroServerStringDN);
        DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
        assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
        "Unable to delete the synchronized domain");
        synchroServerEntry = null;
        configEntryList.remove(configEntryList.indexOf(synchroServerDN));
      }
    }
    catch(Exception e)
    {
      fail("disconnectFromReplServer", e);
    }
  }
  private int getChangelogPort(short changelogID)
  {
    if (replServerPort[changelogID] == 0)
    {
      try
      {
        // Find  a free port for the replicationServer
        ServerSocket socket = TestCaseUtils.bindFreePort();
        replServerPort[changelogID] = socket.getLocalPort();
        socket.close();
      }
      catch(Exception e)
      {
        fail("Cannot retrieve a free port for replication server."
            + e.getMessage());
      }
    }
    return replServerPort[changelogID];
  }
  private String createEntry(UUID uid)
  {
    String user2dn = "uid=user"+uid+",ou=People," + baseDnStr;
    return new String(
        "dn: "+ user2dn + "\n"
        + "objectClass: top\n" + "objectClass: person\n"
        + "objectClass: organizationalPerson\n"
        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
        + "homePhone: 951-245-7634\n"
        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
        + "mobile: 027-085-0537\n"
        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
        + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
        + "street: 17984 Thirteenth Street\n"
        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n"
        + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n"
        + "userPassword: password\n" + "initials: AA\n");
  }
  static protected ReplicationMessage createAddMsg(short serverId)
  {
    Entry personWithUUIDEntry = null;
    String user1entryUUID;
    String baseUUID = null;
    String user1dn;
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the replicationServer.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, 0);
    user1entryUUID = "33333333-3333-3333-3333-333333333333";
    user1dn = "uid=user1,ou=People," + baseDnStr;
    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
    + "objectClass: top\n" + "objectClass: person\n"
    + "objectClass: organizationalPerson\n"
    + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
    + "homePhone: 951-245-7634\n"
    + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
    + "mobile: 027-085-0537\n"
    + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
    + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
    + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
    + "street: 17984 Thirteenth Street\n"
    + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
    + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
    + "userPassword: password\n" + "initials: AA\n"
    + "entryUUID: " + user1entryUUID + "\n";
    try
    {
      personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
    }
    catch(Exception e)
    {
      fail(e.getMessage());
    }
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID,
        baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    return addMsg;
  }
  @Test(enabled=false)
  public void testMultiRS() throws Exception
  {
    String testCase = "testMultiRS";
    debugInfo("Starting " + testCase);
    ReplicationDomain.clearJEBackend(false,
        "userRoot",
        baseDn.toNormalizedString());
    debugInfo ("Creating 2 RS");
    replServer1 = createReplicationServer(changelog1ID, true, testCase);
    replServer2 = createReplicationServer(changelog2ID, true, testCase);
    replServer3 = createReplicationServer(changelog3ID, true, testCase);
    Thread.sleep(500);
    debugInfo("Connecting DS to replServer1");
    connectServer1ToChangelog(changelog1ID);
    Thread.sleep(1500);
    try
    {
      debugInfo("Connecting broker2 to replServer1");
      broker2 = openReplicationSession(baseDn,
          server2ID, 100, getChangelogPort(changelog1ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    try
    {
      debugInfo("Connecting broker3 to replServer2");
      broker3 = openReplicationSession(baseDn,
          server3ID, 100, getChangelogPort(changelog2ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    try
    {
      debugInfo("Connecting broker4 to replServer2");
      broker4 = openReplicationSession(baseDn,
          server4ID, 100, getChangelogPort(changelog2ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    // Do a bunch of change
    updatedEntries = newLDIFEntries();
    this.addTestEntriesToDB(updatedEntries);
    for (int i = 0; i<200; i++)
    {
      String ent1[] = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent1);
    }
    for (int i=0; i<10; i++)
    {
      broker3.publish(createAddMsg(server3ID));
    }
    searchMonitor();
    debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
    disconnectFromReplServer(changelog1ID);
    debugInfo("Cleaning entries");
    postTest();
    debugInfo("Successfully ending " + testCase);
  }
  /**
   * Disconnect broker and remove entries from the local DB
   * @throws Exception
   */
  protected void postTest()
  {
    debugInfo("Post test cleaning.");
    // Clean brokers
    if (broker2 != null)
      broker2.stop();
    broker2 = null;
    if (broker3 != null)
      broker3.stop();
    broker3 = null;
    if (broker4 != null)
      broker4.stop();
    broker4 = null;
    if (replServer1 != null)
      replServer1.remove();
    if (replServer2 != null)
      replServer2.remove();
    if (replServer3 != null)
      replServer3.remove();
    replServer1 = null;
    replServer2 = null;
    replServer3 = null;
    super.cleanRealEntries();
    try
    {
      ReplicationDomain.clearJEBackend(false,
        replDomain.getBackend().getBackendID(),
        baseDn.toNormalizedString());
    }
    catch (Exception e) {}
  }
  @Test(enabled=true)
  public void MonitorTest() throws Exception
  {
    testMultiRS();
  }
  private static final ByteArrayOutputStream oStream =
    new ByteArrayOutputStream();
  private static final ByteArrayOutputStream eStream =
    new ByteArrayOutputStream();
  private void searchMonitor()
  {
    // test search as directory manager returns content
    String[] args3 =
    {
      "-h", "127.0.0.1",
      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
      "-D", "cn=Directory Manager",
      "-w", "password",
      "-b", "cn=monitor",
      "-s", "sub",
      "(base-dn=*)"
    };
    oStream.reset();
    eStream.reset();
    int retVal =
      LDAPSearch.mainSearch(args3, false, oStream, eStream);
    String entries = oStream.toString();
    debugInfo("Entries:" + entries);
    try
    {
      assertEquals(retVal, 0, "Returned error: " + eStream);
      assertTrue(!entries.equalsIgnoreCase(""), "Returned entries: " + entries);
    }
    catch(Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          stackTraceToSingleLineString(new Exception()));
      fail(e.getMessage());
    }
  }
}