mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
03.41.2008 2942eaa1b7264228c9ca7535aabd206e663581e9
opends/src/messages/messages/replication.properties
@@ -251,4 +251,10 @@
  export-ldif command must be run as a task
NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
  failed: %s
SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
 are missing due to an error in the retrieval process
SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
 are missing due to a processing error : %s
SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
 sending request to get remote monitor data
opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -185,6 +185,40 @@
    }
  }
  /**
   * Computes the difference in number of changes between 2
   * change numbers.
   * @param op1 the first ChangeNumber
   * @param op2 the second ChangeNumber
   * @return the difference
   */
  public static int diffSeqNum(ChangeNumber op1, ChangeNumber op2)
  {
    int totalCount = 0;
    int max = op1.getSeqnum();
    if (op2 != null)
    {
      int current = op2.getSeqnum();
      if (current == max)
      {
      }
      else if (current < max)
      {
        totalCount += max - current;
      }
      else
      {
        totalCount += Integer.MAX_VALUE - (current - max) + 1;
      }
    }
    else
    {
      totalCount += max;
    }
    return totalCount;
  }
  /**
   * check if the current Object is strictly older than ChangeNumber
   * given in parameter.
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -258,7 +258,7 @@
  @Override
  public String toString()
  {
    return ("ADD " + getDn() + " " + getChangeNumber());
    return ("ADD DN=(" + getDn() + ") CN=(" + getChangeNumber() + ")");
  }
  /**
opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
New file
@@ -0,0 +1,397 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import org.opends.server.replication.common.ServerState;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.asn1.ASN1Sequence;
import org.opends.server.protocols.asn1.ASN1Element;
import org.opends.server.replication.common.ChangeNumber;
/**
 * This message is part of the replication protocol.
 * This message is sent by a server to one or several other servers and
 * contain one entry to be sent over the protocol in the context of
 * an import/export over the protocol.
 */
public class MonitorMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = -1900670921496804942L;
  /**
   * FIXME.
   *
   */
  class ServerData
  {
    ServerState state;
    Long approxFirstMissingDate;
  }
  /**
   * FIXME.
   *
   */
  class SubTopoMonitorData
  {
    ServerState replServerState;
    HashMap<Short, ServerData> ldapStates =
      new HashMap<Short, ServerData>();
  }
  SubTopoMonitorData data = new SubTopoMonitorData();;
  /**
   * Creates a new EntryMessage.
   *
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   */
  public MonitorMessage(short sender, short destination)
  {
    super(sender, destination);
  }
  /**
   * FIXME.
   * @param state a.
   */
  public void setReplServerState(ServerState state)
  {
    data.replServerState = state;
  }
  /**
   * FIXME.
   * @param serverId a.
   * @param state a.
   * @param olderUpdateTime a.
   *
   */
  public void setLDAPServerState(short serverId, ServerState state,
      Long olderUpdateTime)
  {
    if (data.ldapStates == null)
    {
      data.ldapStates = new HashMap<Short, ServerData>();
    }
    ServerData sd = new ServerData();
    sd.state = state;
    sd.approxFirstMissingDate = olderUpdateTime;
    data.ldapStates.put(serverId, sd);
  }
  /**
   * FIXME.
   * @param serverId a.
   * @return a.
   */
  public ServerState getLDAPServerState(short serverId)
  {
    return data.ldapStates.get(serverId).state;
  }
  /**
   * FIXME.
   * @param serverId a.
   * @return a.
   */
  public Long getApproxFirstMissingDate(short serverId)
  {
    return data.ldapStates.get(serverId).approxFirstMissingDate;
  }
  /**
   * Creates a new EntryMessage from its encoded form.
   *
   * @param in The byte array containing the encoded form of the message.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMessage(byte[] in) throws DataFormatException
  {
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderIDString = new String(in, pos, length, "UTF-8");
      this.senderID = Short.valueOf(senderIDString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
       /* Read the states : all the remaining bytes but the terminating 0 */
      byte[] encodedS = new byte[in.length-pos-1];
      int i =0;
      while (pos<in.length-1)
      {
        encodedS[i++] = in[pos++];
      }
      try
      {
        ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
        for (ASN1Element el0 : s0.elements())
        {
          ServerState newState = new ServerState();
          short serverId = 0;
          Long outime = (long)0;
          ASN1Sequence s1 = el0.decodeAsSequence();
          for (ASN1Element el1 : s1.elements())
          {
            ASN1OctetString o = el1.decodeAsOctetString();
            String s = o.stringValue();
            ChangeNumber cn = new ChangeNumber(s);
            if ((data.replServerState != null) && (serverId == 0))
            {
              serverId = cn.getServerId();
              outime = cn.getTime();
            }
            else
            {
              newState.update(cn);
            }
          }
          // the first state is the replication state
          if (data.replServerState == null)
          {
            data.replServerState = newState;
          }
          else
          {
            ServerData sd = new ServerData();
            sd.state = newState;
            sd.approxFirstMissingDate = outime;
            data.ldapStates.put(serverId, sd);
          }
        }
      } catch(Exception e)
      {
      }
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length +
                   1 + destinationBytes.length;
      ASN1Sequence stateElementSequence = new ASN1Sequence();
      ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
      // First loop computes the length
      /* Put the serverStates ... */
      stateElementSequence = new ASN1Sequence();
      stateElementList = new ArrayList<ASN1Element>();
      /* first put the Replication Server state */
      ArrayList<ASN1OctetString> cnOctetList =
        data.replServerState.toASN1ArrayList();
      ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
      for (ASN1OctetString soci : cnOctetList)
      {
        cnElementList.add(soci);
      }
      ASN1Sequence cnSequence = new ASN1Sequence(cnElementList);
      stateElementList.add(cnSequence);
      // then the LDAP server data
      Set<Short> servers = data.ldapStates.keySet();
      for (Short sid : servers)
      {
        // State
        ServerState statei = data.ldapStates.get(sid).state;
        // First missing date
        Long outime =  data.ldapStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        // and the olderupdatetime
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
        }
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      stateElementSequence.setElements(stateElementList);
      int seqLen = stateElementSequence.encode().length;
      //
      length += seqLen;
      length += 2;
      // Allocate the array sized from the computed length
      byte[] resultByteArray = new byte[length];
      // Second loop build the array
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
      int pos = 1;
      pos = addByteArray(senderBytes, resultByteArray, pos);
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      /* Put the serverStates ... */
      stateElementSequence = new ASN1Sequence();
      stateElementList = new ArrayList<ASN1Element>();
      /* first put the Replication Server state */
      cnOctetList =
        data.replServerState.toASN1ArrayList();
      cnElementList = new ArrayList<ASN1Element>();
      for (ASN1OctetString soci : cnOctetList)
      {
        cnElementList.add(soci);
      }
      cnSequence = new ASN1Sequence(cnElementList);
      stateElementList.add(cnSequence);
      // then the LDAP server state
      servers = data.ldapStates.keySet();
      for (Short sid : servers)
      {
        ServerState statei = data.ldapStates.get(sid).state;
        Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
        }
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      stateElementSequence.setElements(stateElementList);
      pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
  /**
   * FIXME.
   * @return FIXME.
   */
  public ServerState getReplServerState()
  {
    return data.replServerState;
  }
  /**
   * FIXME.
   * @return a.
   */
  public Iterator<Short> iterator()
  {
    return data.ldapStates.keySet().iterator();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    String stateS = " RState:";
    stateS += "/" + data.replServerState.toString();
    stateS += " LDAPStates:";
    Iterator<ServerData> it = data.ldapStates.values().iterator();
    while (it.hasNext())
    {
      ServerData sd = it.next();
      stateS += "/ state=" + sd.state.toString()
      + " afmd=" + sd.approxFirstMissingDate + "] ";
    }
    String me = this.getClass().getCanonicalName() +
    " sender=" + this.senderID +
    " destination=" + this.destination +
    " states=" + stateS +
    "]";
    return me;
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
New file
@@ -0,0 +1,122 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is part of the replication protocol.
 * FIXME: usage
 */
public class MonitorRequestMessage extends RoutableMessage implements
    Serializable
{
  private static final long serialVersionUID = -2407640479423633234L;
  /**
   * Creates a message.
   *
   * @param sender The sender server of this message.
   * @param destination The server or servers targetted by this message.
   */
  public MonitorRequestMessage(short sender, short destination)
  {
    super(sender, destination);
  }
  /**
   * Creates a new message by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the message,
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   */
  public MonitorRequestMessage(byte[] in) throws DataFormatException
  {
    super();
    try
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Short.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Short.valueOf(destinationString);
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 *      Portions Copyright 2007-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -67,7 +67,7 @@
      /* first byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
        throw new DataFormatException(
        "Input is not a valid changelogInfo Message.");
        "Input is not a valid " + this.getClass().getCanonicalName());
      int pos = 1;
@@ -97,7 +97,7 @@
  /**
   * Creates a new changelogInfo message from a list of the currently
   * Creates a new ReplServerInfo message from a list of the currently
   * connected servers.
   *
   * @param connectedServers The list of currently connected servers ID.
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -55,6 +55,8 @@
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
  static final byte MSG_TYPE_RESET_GENERATION_ID = 17;
  static final byte MSG_TYPE_REPL_SERVER_MONITOR_REQUEST = 18;
  static final byte MSG_TYPE_REPL_SERVER_MONITOR = 19;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -79,6 +81,8 @@
   * MSG_TYPE_WINDOW_PROBE
   * MSG_TYPE_REPL_SERVER_INFO
   * MSG_TYPE_RESET_GENERATION_ID
   * MSG_TYPE_REPL_SERVER_MONITOR_REQUEST
   * MSG_TYPE_REPL_SERVER_MONITOR
   *
   * @return the byte[] representation of this message.
   * @throws UnsupportedEncodingException  When the encoding of the message
@@ -152,6 +156,12 @@
      case MSG_TYPE_REPL_SERVER_INFO:
        msg = new ReplServerInfoMessage(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
        msg = new MonitorRequestMessage(buffer);
      break;
      case MSG_TYPE_REPL_SERVER_MONITOR:
        msg = new MonitorMessage(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
@@ -192,7 +202,7 @@
    while (in[offset++] != 0)
    {
      if (offset >= in.length)
        throw new DataFormatException("byte[] is not a valid modify msg");
        throw new DataFormatException("byte[] is not a valid msg");
      length++;
    }
    return length;
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
New file
@@ -0,0 +1,270 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashSet;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
/**
 * This class defines a server handler dedicated to the remote LDAP servers
 * connected to a remote Replication Server.
 * This class is necessary because we want to provide monitor entries for them
 * and because the monitor API only allows one entry by MonitorProvider instance
 * so that no other class can provide the monitor entry for these objects.
 *
 * One instance of this class is created for each instance of remote LDAP server
 * connected to a remote Replication Server.
 */
public class LightweightServerHandler
  extends MonitorProvider<MonitorProviderCfg>
{
  // The tracer object for the debug logger.
  private static final DebugTracer TRACER = getTracer();
  short serverId;
  ServerHandler replServerHandler;
  ReplicationServerDomain rsDomain;
  DN baseDn;
  /**
   * Creates a new LighweightServerHandler with the provided serverid, connected
   * to the remote Replication Server represented by replServerHandler.
   *
   * @param serverId The serverId of this remote LDAP server.
   * @param replServerHandler The server handler of the Replication Server to
   * which this LDAP server is remotely connected.
   */
  public LightweightServerHandler(String serverId,
      ServerHandler replServerHandler)
  {
    super("Server Handler");
    this.serverId = Short.valueOf(serverId);
    this.replServerHandler = replServerHandler;
    this.rsDomain = replServerHandler.getDomain();
    this.baseDn = rsDomain.getBaseDn();
    if (debugEnabled())
      TRACER.debugInfo(
        "In " +
  replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
        " LWSH for remote server " + this.serverId +
        " connected to:" + this.replServerHandler.getMonitorInstanceName() +
        " ()");
}
  /**
   * Get the serverID associated with this LDAP server.
   * @return The serverId.
   */
  public short getServerId()
  {
    return Short.valueOf(serverId);
  }
  /**
   * Stop this server handler processing.
   */
  public void startHandler()
  {
    if (debugEnabled())
      TRACER.debugInfo(
      "In " +
replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
      " LWSH for remote server " + this.serverId +
      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
          " start");
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
    DirectoryServer.registerMonitorProvider(this);
  }
  /**
   * Stop this server handler processing.
   */
  public void stopHandler()
  {
    if (debugEnabled())
      TRACER.debugInfo(
      "In " +
replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
      " LWSH for remote server " + this.serverId +
      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
          " stop");
    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
                          throws ConfigException,InitializationException
  {
    // Nothing to do for now
  }
  /**
   * Retrieves the name of this monitor provider.  It should be unique among all
   * monitor providers, including all instances of the same monitor provider.
   *
   * @return  The name of this monitor provider.
   */
  @Override
  public String getMonitorInstanceName()
  {
    String serverURL=""; // FIXME
    String str = baseDn.toString() + " " + serverURL + " "
       + String.valueOf(serverId);
    return "Undirect LDAP Server " + str;
  }
  /**
   * Retrieves the length of time in milliseconds that should elapse between
   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
   * return value indicates that the <CODE>updateMonitorData()</CODE> method
   * should not be periodically invoked.
   *
   * @return  The length of time in milliseconds that should elapse between
   *          calls to the <CODE>updateMonitorData()</CODE> method.
   */
  @Override
  public long getUpdateInterval()
  {
    /* we don't wont to do polling on this monitor */
    return 0;
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
   * be made.
   */
  @Override
  public void updateMonitorData()
  {
    // As long as getUpdateInterval() returns 0, this will never get called
  }
  /**
   * Retrieves a set of attributes containing monitor data that should be
   * returned to the client if the corresponding monitor entry is requested.
   *
   * @return  A set of attributes containing monitor data that should be
   *          returned to the client if the corresponding monitor entry is
   *          requested.
   */
  @Override
  public ArrayList<Attribute> getMonitorData()
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " +
          this.replServerHandler.getDomain().getReplicationServer().
          getMonitorInstanceName()+
          " LWSH for remote server " + this.serverId +
          " connected to:" + this.replServerHandler.getMonitorInstanceName() +
      " getMonitor data");
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(new Attribute("server-id",
        String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
        rsDomain.getBaseDn().toNormalizedString()));
    attributes.add(new Attribute("connected-to",
        replServerHandler.getMonitorInstanceName()));
    // Retrieves the topology counters
    try
    {
      rsDomain.retrievesRemoteMonitorData();
      // Compute the latency for the current SH
      ServerState remoteState = rsDomain.getServerState(serverId);
      if (remoteState == null)
      {
        remoteState = new ServerState();
      }
      /* get the Server State */
      final String ATTR_SERVER_STATE = "server-state";
      AttributeType type =
        DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
      LinkedHashSet<AttributeValue> values =
        new LinkedHashSet<AttributeValue>();
      for (String str : remoteState.toStringSet())
      {
        values.add(new AttributeValue(type,str));
      }
      Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
      attributes.add(attr);
      // add the latency attribute to our monitor data
      // Compute the latency for the current SH
      int missingChanges = rsDomain.getMissingChanges(remoteState);
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
      // Add the oldest missing update
      Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    catch(Exception e)
    {
      // We failed retrieving the remote monitor data.
      attributes.add(new Attribute("error",
        stackTraceToSingleLineString(e)));
    }
    return attributes;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.MonitorMessage;
import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
/**
@@ -118,6 +128,34 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
  private long remoteMonitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /* The date of the last time they have been elaborated */
  private long validityDate = 0;
  // For each LDAP server, its server state
  private HashMap<Short, ServerState> LDAPStates =
    new HashMap<Short, ServerState>();
  // For each LDAP server, the last CN it published
  private HashMap<Short, ChangeNumber> maxCNs =
    new HashMap<Short, ChangeNumber>();
  // For each LDAP server, an approximation of the date of the first missing
  // change
  private HashMap<Short, Long> approxFirstMissingDate =
    new HashMap<Short, Long>();
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
@@ -352,7 +390,7 @@
        }
        else
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          if (rsh.hasRemoteLDAPServers())
          {
            lDAPServersConnectedInTheTopology = true;
@@ -636,7 +674,7 @@
        // server connected
        for (ServerHandler rsh : replicationServers.values())
        {
          if (!rsh.getRemoteLDAPServers().isEmpty())
          if (rsh.hasRemoteLDAPServers())
          {
            servers.add(rsh);
          }
@@ -693,15 +731,58 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // A replication server is not expected to be the destination
    // of a routable message except for an error message.
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (msg.getDestination() == this.replicationServer.getServerId())
    {
      if (msg instanceof ErrorMessage)
      {
        ErrorMessage errorMsg = (ErrorMessage)msg;
        logError(ERR_ERROR_MSG_RECEIVED.get(
                   errorMsg.getDetails()));
            errorMsg.getDetails()));
      }
      else if (msg instanceof MonitorRequestMessage)
      {
        MonitorRequestMessage replServerMonitorRequestMsg =
          (MonitorRequestMessage) msg;
        MonitorMessage monitorMsg =
          new MonitorMessage(
              replServerMonitorRequestMsg.getDestination(),
              replServerMonitorRequestMsg.getsenderID());
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerState(this.getDbServerState());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (ServerHandler lsh : this.connectedServers.values())
        {
          monitorMsg.setLDAPServerState(
              lsh.getServerId(),
              lsh.getServerState(),
              lsh.getApproxFirstMissingDate());
        }
        try
        {
          senderHandler.send(monitorMsg);
        }
        catch(Exception e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
              Short.toString((msg.getDestination()))));
        }
      }
      else if (msg instanceof MonitorMessage)
      {
        MonitorMessage monitorMsg =
          (MonitorMessage) msg;
        receivesMonitorDataResponse(monitorMsg);
      }
      else
      {
@@ -1156,4 +1237,288 @@
    {
      return replicationServer;
    }
    /*
     * Monitor Data generation
     */
    /**
     * Retrieves the remote monitor data.
     *
     * @throws DirectoryException When an error occurs.
     */
    protected void retrievesRemoteMonitorData()
      throws DirectoryException
    {
      if (validityDate > TimeThread.getTime())
      {
        // The current data are still valid. No need to renew them.
        return;
      }
      // Clean
      this.LDAPStates.clear();
      this.maxCNs.clear();
      // Init the maxCNs of our direct LDAP servers from our own dbstate
      for (ServerHandler rs : connectedServers.values())
      {
        short serverID = rs.getServerId();
        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
        if (cn == null)
        {
          // we have nothing in db for that server
          cn = new ChangeNumber(0, 0 , serverID);
        }
        this.maxCNs.put(serverID, cn);
      }
      ServerState replServerState = this.getDbServerState();
      Iterator<Short> it = replServerState.iterator();
      while (it.hasNext())
      {
        short sid = it.next();
        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
        ChangeNumber maxCN = this.maxCNs.get(sid);
        if ((maxCN != null) && (receivedCN.newer(maxCN)))
        {
          // We found a newer one
          this.maxCNs.remove(sid);
          this.maxCNs.put(sid, receivedCN);
        }
      }
      // Send Request to the other Replication Servers
      if (remoteMonitorResponsesSemaphore == null)
      {
        remoteMonitorResponsesSemaphore = new Semaphore(
            replicationServers.size() -1);
        sendMonitorDataRequest();
        // Wait reponses from them or timeout
        waitMonitorDataResponses(replicationServers.size());
      }
      else
      {
        // The processing of renewing the monitor cache is already running
        // We'll make it sleeping until the end
        while (remoteMonitorResponsesSemaphore!=null)
        {
          waitMonitorDataResponses(1);
        }
      }
      // Now we have the expected answers of an error occured
      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
      if (debugEnabled())
      {
        debugMonitorData();
      }
    }
    private void debugMonitorData()
    {
      String mds = " Monitor data=";
      Iterator<Short> ite = LDAPStates.keySet().iterator();
      while (ite.hasNext())
      {
        Short sid = ite.next();
        ServerState ss = LDAPStates.get(sid);
        mds += " LDAPState(" + sid + ")=" + ss.toString();
      }
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber cn = maxCNs.get(sid);
        mds += " maxCNs(" + sid + ")=" + cn.toString();
      }
      mds += "--";
      TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          mds);
    }
    /**
     * Sends a MonitorRequest message to all connected RS.
     * @throws DirectoryException when a problem occurs.
     */
    protected void sendMonitorDataRequest()
      throws DirectoryException
    {
      try
      {
        for (ServerHandler rs : replicationServers.values())
        {
          MonitorRequestMessage msg = new
            MonitorRequestMessage(this.replicationServer.getServerId(),
              rs.getServerId());
          rs.send(msg);
        }
      }
      catch(Exception e)
      {
        Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
        logError(message);
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
    }
    /**
     * Wait for the expected count of received MonitorMessage.
     * @param expectedResponses The number of expected answers.
     * @throws DirectoryException When an error occurs.
     */
    protected void waitMonitorDataResponses(int expectedResponses)
      throws DirectoryException
    {
      try
      {
        boolean allPermitsAcquired =
          remoteMonitorResponsesSemaphore.tryAcquire(
              expectedResponses,
              (long) 500, TimeUnit.MILLISECONDS);
        if (!allPermitsAcquired)
        {
          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
        }
        else
        {
          if (debugEnabled())
            TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Successfully received all " + replicationServers.size()
            + " expected monitor messages");
        }
      }
      catch(Exception e)
      {
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
      }
      finally
      {
        remoteMonitorResponsesSemaphore = null;
      }
    }
    /**
     * Processes a Monitor message receives from a remote Replication Server
     * and stores the data received.
     *
     * @param msg The message to be processed.
     */
    public void receivesMonitorDataResponse(MonitorMessage msg)
    {
      if (remoteMonitorResponsesSemaphore == null)
      {
        // Ignoring the remote monitor data because an error occured previously
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN accross the RSes
        ServerState replServerState = msg.getReplServerState();
        Iterator<Short> it = replServerState.iterator();
        while (it.hasNext())
        {
          short sid = it.next();
          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
          ChangeNumber maxCN = this.maxCNs.get(sid);
          if (receivedCN.newer(maxCN))
          {
            // We found a newer one
            this.maxCNs.remove(sid);
            this.maxCNs.put(sid, receivedCN);
          }
        }
        // Store the LDAP servers states
        Iterator<Short> sidIterator = msg.iterator();
        while (sidIterator.hasNext())
        {
          short sid = sidIterator.next();
          ServerState ss = msg.getLDAPServerState(sid);
          this.LDAPStates.put(sid, ss);
          this.approxFirstMissingDate.put(sid,
              msg.getApproxFirstMissingDate(sid));
        }
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        remoteMonitorResponsesSemaphore.release();
      }
      catch (Exception e)
      {
        // If an exception occurs while processing one of the expected message,
        // the processing is aborted and the waiting thread is awoke.
        remoteMonitorResponsesSemaphore.notifyAll();
      }
    }
    /**
     * Get the state of the LDAP server with the provided serverId.
     * @param serverId The server ID.
     * @return The server state.
     */
    public ServerState getServerState(short serverId)
    {
      return LDAPStates.get(serverId);
    }
    /**
     * Get the highest know change number of the LDAP server with the provided
     * serverId.
     * @param serverId The server ID.
     * @return The highest change number.
     */
    public ChangeNumber getMaxCN(short serverId)
    {
      return maxCNs.get(serverId);
    }
    /**
     * Get an approximation of the date of the oldest missing changes.
     * serverId.
     * @param serverId The server ID.
     * @return The approximation of the date of the oldest missing change.
     */
    public Long getApproxFirstMissingDate(short serverId)
    {
      return approxFirstMissingDate.get(serverId);
    }
    /**
     * Get the number of missing change for the server with the provided state.
     * @param state The provided server state.
     * @return The number of missing changes.
     */
    public int getMissingChanges(ServerState state)
    {
      // Traverse the max Cn transmitted by each server
      // For each server, get the highest CN know from the current server
      // Sum the difference betwenn the max and the last
      int missingChanges = 0;
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber maxCN = maxCNs.get(sid);
        ChangeNumber last = state.getMaxChangeNumber(sid);
        if (last == null)
        {
          last = new ChangeNumber(0,0, sid);
        }
        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
        missingChanges += missingChangesFromSID;
      }
      return missingChanges;
    }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -62,9 +36,9 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -150,11 +124,12 @@
  /**
   * When this Handler is connected to a changelog server this collection
   * will contain the list of LDAP servers connected to the remote changelog
   * server.
   * When this Handler is connected to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   */
  private List<String> remoteLDAPservers = new ArrayList<String>();
  private List<LightweightServerHandler>
     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
       ServerState dbState = replicationServerDomain.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
         if (currentChange != null)
         {
           int current = currentChange.getSeqnum();
           if (current == max)
           {
           }
           else if (current < max)
           {
             totalCount += max - current;
           }
           else
           {
             totalCount += Integer.MAX_VALUE - (current - max) + 1;
           }
         }
         else
         {
           totalCount += max;
         }
         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
             serverState.getMaxChangeNumber(id));
       }
       return totalCount;
     }
@@ -858,7 +814,7 @@
  }
  /**
   * Get an approximation of the delay by looking at the age of the odest
   * Get an approximation of the delay by looking at the age of the oldest
   * message that has not been sent to this server.
   * This is an approximation because the age is calculated using the
   * clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
   * @return The age if the older change has not yet been replicated
   *         to the server handled by this ServerHandler.
   */
  public Long getApproxFirstMissingDate()
  {
    // Get the older CN received
    // From it, get the next sequence number
    // Get the CN for the next sequence number
    // If not present in the local RS db,
    // then approximate with the older update time
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return null;
    ReplicationIterator ri =
      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
    if (ri != null)
    {
      if (ri.next())
      {
        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
        return firstMissingChange.getTime();
      }
    }
    return olderUpdateCN.getTime();
  }
  /**
   * Get the older update time for that server.
   * @return The older update time.
   */
  public long getOlderUpdateTime()
  {
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return 0;
    return  olderUpdateCN.getTime();
  }
  /**
   * Get the older Change Number for that server.
   * @return The older change number.
   */
  public ChangeNumber getOlderUpdateCN()
  {
    synchronized (msgQueue)
    {
      if (isFollowing())
      {
        if (msgQueue.isEmpty())
          return 0;
          return null;
        UpdateMessage msg = msgQueue.first();
        return msg.getChangeNumber().getTime();
        return msg.getChangeNumber();
      }
      else
      {
        if (lateQueue.isEmpty())
          return 0;
          return null;
        UpdateMessage msg = lateQueue.first();
        return msg.getChangeNumber().getTime();
        return msg.getChangeNumber();
      }
    }
  }
@@ -1190,6 +1186,16 @@
  }
  /**
   * Get the state of this server.
   *
   * @return ServerState the state for this server..
   */
  public ServerState getServerState()
  {
    return serverState;
  }
  /**
   * Stop this server handler processing.
   */
  public void stopHandler()
@@ -1397,7 +1403,7 @@
                 " " + serverURL + " " + String.valueOf(serverId);
    if (serverIsLDAPserver)
      return "Remote LDAP Server " + str;
      return "Direct LDAP Server " + str;
    else
      return "Remote Repl Server " + str;
  }
@@ -1445,28 +1451,68 @@
  {
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    if (serverIsLDAPserver)
    {
      attributes.add(new Attribute("LDAP-Server", serverURL));
      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
          getReplicationServer().getMonitorInstanceName()));
      // Add the oldest missing update
      Long olderUpdateTime = this.getApproxFirstMissingDate();
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    else
    {
      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
    }
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
                                 baseDn.toString()));
    attributes.add(new Attribute("waiting-changes",
                                 String.valueOf(getRcvMsgQueueSize())));
    attributes.add(new Attribute("max-waiting-changes",
                                 String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-waiting-acks",
                                 String.valueOf(getWaitingAckSize())));
    // Update stats
    // Retrieves the topology counters
    if (serverIsLDAPserver)
    {
      try
      {
        replicationServerDomain.retrievesRemoteMonitorData();
      }
      catch(Exception e)
      {
        // FIXME: We failed retrieving the remote monitor data
      }
      // Compute the latency for the current SH
      int missingChanges =
        replicationServerDomain.getMissingChanges(serverState);
      // add the latency attribute to our monitor data
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
    }
    // Deprecated
    // attributes.add(new Attribute("max-waiting-changes",
    //                              String.valueOf(maxQueueSize)));
    attributes.add(new Attribute("update-sent",
                                 String.valueOf(getOutCount())));
    attributes.add(new Attribute("update-received",
                                 String.valueOf(getInCount())));
    // Deprecated as long as assured is not exposed
    attributes.add(new Attribute("update-waiting-acks",
        String.valueOf(getWaitingAckSize())));
    attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
    attributes.add(new Attribute("ack-received",
                                 String.valueOf(getInAckCount())));
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Window stats
    attributes.add(new Attribute("max-send-window",
                                 String.valueOf(sendWindowSize)));
    attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
                                 String.valueOf(maxRcvWindow)));
    attributes.add(new Attribute("current-rcv-window",
                                 String.valueOf(rcvWindow)));
    /*
     * FIXME:PGB DEPRECATED
     *
    // Missing changes
    attributes.add(new Attribute("waiting-changes",
        String.valueOf(getRcvMsgQueueSize())));
    // Age of oldest missing change
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Date of the oldest missing change
    long olderUpdateTime = getOlderUpdateTime();
    if (olderUpdateTime != 0)
    {
@@ -1482,6 +1540,7 @@
      attributes.add(new Attribute("older-change-not-synchronized",
                                 String.valueOf(date.toString())));
    }
    */
    /* get the Server State */
    final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
    Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
    attributes.add(attr);
    // Encryption
    attributes.add(new Attribute("ssl-encryption",
        String.valueOf(session.isEncrypted())));
    // Data generation
    attributes.add(new Attribute("generation-id",
        String.valueOf(generationId)));
@@ -1663,8 +1724,28 @@
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
     remoteLDAPservers = infoMsg.getConnectedServers();
     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
     synchronized(remoteLDAPservers)
     {
       // Removes the existing structures
       for (LightweightServerHandler lsh : remoteLDAPservers)
       {
         lsh.stopHandler();
       }
       remoteLDAPservers.clear();
       // Creates the new structure according to the message received.
       for (String newConnectedServer : newRemoteLDAPservers)
       {
         LightweightServerHandler lsh
         = new LightweightServerHandler(newConnectedServer, this);
         lsh.startHandler();
         remoteLDAPservers.add(lsh);
       }
     }
   }
   /**
@@ -1678,9 +1759,9 @@
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (String server : remoteLDAPservers)
     for (LightweightServerHandler server : remoteLDAPservers)
     {
       if (wantedServer == Short.valueOf(server))
       if (wantedServer == server.getServerId())
       {
         return true;
       }
@@ -1695,9 +1776,9 @@
    * @return boolean True is the replication server has remote LDAP servers
    * connected to it.
    */
   public List<String> getRemoteLDAPServers()
   public boolean hasRemoteLDAPServers()
   {
     return remoteLDAPservers;
     return !remoteLDAPservers.isEmpty();
   }
  /**
@@ -1802,4 +1883,14 @@
  {
    this.generationId = generationId;
  }
  /**
   * Returns the Replication Server Domain to which belongs this server handler.
   *
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain()
  {
    return this.replicationServerDomain;
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -49,6 +49,8 @@
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.ReplServerInfoMessage;
import org.opends.server.replication.protocol.MonitorMessage;
import org.opends.server.replication.protocol.MonitorRequestMessage;
import org.opends.server.loggers.debug.DebugTracer;
@@ -246,6 +248,17 @@
            }
          }
        }
        else if (msg instanceof MonitorRequestMessage)
        {
          MonitorRequestMessage replServerMonitorRequestMsg =
            (MonitorRequestMessage) msg;
          handler.process(replServerMonitorRequestMsg);
        }
        else if (msg instanceof MonitorMessage)
        {
          MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
          handler.process(replServerMonitorMsg);
        }
        else if (msg == null)
        {
          /*
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -126,7 +126,7 @@
            "In " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
            ", writer to " + this.handler.getMonitorInstanceName() +
            " publishes msg=" + update.toString() +
            " publishes msg=[" + update.toString() + "]"+
            " refgenId=" + referenceGenerationId +
            " server=" + handler.getServerId() +
            " generationId=" + handler.getGenerationId());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
@@ -917,6 +917,11 @@
          TRACER.debugInfo("Failed to add entry " + entry.getDN() +
              "Result code = : " + addOp.getResultCode());
        }
        else
        {
          TRACER.debugInfo(entry.getDN() +
              " added " + addOp.getResultCode());
        }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -303,4 +303,29 @@
    CN2 = cng.newChangeNumber();
    assertTrue(CN1.compareTo(CN2) != 0 );
  }
  /**
   * Test the difference in seq num between 2 change numbers.
   */
  @Test
  public void changeNumberDiffSeqNum()
         throws Exception
  {
    ChangeNumber CN1;
    ChangeNumber CN2;
    CN1 = new ChangeNumber((long)0, 3, (short)0);
    // 3-1 = 2
    CN2 = new ChangeNumber((long)0, 1, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
    // 3-3 = 0
    CN2 = new ChangeNumber((long)0, 3, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
    // 3-4 == MAXINT (modulo)
    CN2 = new ChangeNumber((long)0, 4, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -30,7 +30,9 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -560,7 +562,7 @@
   * an exception.
   */
  @Test()
  public void WindowProbeTest() throws Exception
  public void windowProbeTest() throws Exception
  {
    WindowProbe msg = new WindowProbe();
    new WindowProbe(msg.getBytes());
@@ -583,6 +585,74 @@
  }
  /**
   * Test MonitorMessage
   */
  @Test()
  public void monitorMessageTest() throws Exception
  {
    short sender = 2;
    short dest = 3;
    // RS State
    ServerState rsState = new ServerState();
    ChangeNumber rscn1 = new ChangeNumber(1, (short) 1, (short) 1);
    ChangeNumber rscn2 = new ChangeNumber(1, (short) 1, (short) 2);
    rsState.update(rscn1);
    rsState.update(rscn2);
    // LS1 state
    ServerState s1 = new ServerState();
    short sid1 = 111;
    ChangeNumber cn1 = new ChangeNumber(1, (short) 1, sid1);
    s1.update(cn1);
    // LS2 state
    ServerState s2 = new ServerState();
    short sid2 = 222;
    Long now = TimeThread.getTime();
    ChangeNumber cn2 = new ChangeNumber(now,
                                       (short) 123, sid2);
    s2.update(cn2);
    MonitorMessage msg =
      new MonitorMessage(sender, dest);
    msg.setReplServerState(rsState);
    msg.setLDAPServerState(sid1, s1, now+1);
    msg.setLDAPServerState(sid2, s2, now+2);
    byte[] b = msg.getBytes();
    MonitorMessage newMsg = new MonitorMessage(b);
    assertEquals(rsState, msg.getReplServerState());
    assertEquals(newMsg.getReplServerState().toString(),
        msg.getReplServerState().toString());
    Iterator<Short> it = newMsg.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      ServerState s = newMsg.getLDAPServerState(sid);
      if (sid == sid1)
      {
        assertEquals(s.toString(), s1.toString(), "");
        assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
      }
      else if (sid == sid2)
      {
        assertEquals(s.toString(), s2.toString());
        assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
      }
      else
      {
        fail("Bad sid");
      }
    }
    assertEquals(newMsg.getsenderID(), msg.getsenderID());
    assertEquals(newMsg.getDestination(), msg.getDestination());
  }
  /**
   * Test that EntryMessage encoding and decoding works
   * by checking that : msg == new EntryMessageTest(msg.getBytes()).
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
New file
@@ -0,0 +1,599 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Tests for the replicationServer code.
 */
public class MonitorTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private static final String baseDnStr = "dc=example,dc=com";
  private static final String baseSnStr = "genidcom";
  private static final int   WINDOW_SIZE = 10;
  private static final int   CHANGELOG_QUEUE_SIZE = 100;
  private static final short server1ID = 1;
  private static final short server2ID = 2;
  private static final short server3ID = 3;
  private static final short server4ID = 4;
  private static final short changelog1ID = 11;
  private static final short changelog2ID = 12;
  private static final short changelog3ID = 13;
  private DN baseDn;
  private ReplicationBroker broker2 = null;
  private ReplicationBroker broker3 = null;
  private ReplicationBroker broker4 = null;
  private ReplicationServer replServer1 = null;
  private ReplicationServer replServer2 = null;
  private ReplicationServer replServer3 = null;
  private boolean emptyOldChanges = true;
  ReplicationDomain replDomain = null;
  SocketSession ssSession = null;
  boolean ssShutdownRequested = false;
  protected String[] updatedEntries;
  private static int[] replServerPort = new int[20];
  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  protected void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  public void setUp() throws Exception
  {
    //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    baseDn = DN.decode(baseDnStr);
    updatedEntries = newLDIFEntries();
    // Create an internal connection in order to provide operations
    // to DS to populate the db -
    connection = InternalClientConnection.getRootConnection();
    // Synchro provider
    String synchroStringDN = "cn=Synchronization Providers,cn=config";
    // Synchro multi-master
    synchroPluginStringDN = "cn=Multimaster Synchronization, "
      + synchroStringDN;
    // Synchro suffix
    synchroServerEntry = null;
    // Add config entries to the current DS server based on :
    // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
    // Add synchroServerEntry
    // Add replServerEntry
    configureReplication();
    // Change log
    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
    + "ds-cfg-changelog-server-id: 1\n"
    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
    replServerEntry = null;
  }
  /*
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries()
  {
    String[] entries =
    {
        "dn: " + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
        + "\n",
        "dn: ou=People," + baseDn + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
        + "\n",
        "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Fiona Jensen\n"
        + "sn: Jensen\n"
        + "uid: fiona\n"
        + "telephonenumber: +1 408 555 1212\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
        + "\n",
        "dn: cn=Robert Langman,ou=people," + baseDn + "\n"
        + "objectclass: top\n"
        + "objectclass: person\n"
        + "objectclass: organizationalPerson\n"
        + "objectclass: inetOrgPerson\n"
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: +1 408 555 1213\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
    };
    return entries;
  }
  /**
   * Creates a new replicationServer.
   * @param changelogId The serverID of the replicationServer to create.
   * @param all         Specifies whether to coonect the created replication
   *                    server to the other replication servers in the test.
   * @return The new created replication server.
   */
  private ReplicationServer createReplicationServer(short changelogId,
      boolean all, String suffix)
  {
    SortedSet<String> servers = null;
    servers = new TreeSet<String>();
    try
    {
      if (changelogId==changelog1ID)
      {
        if (replServer1!=null)
          return replServer1;
      }
      else if (changelogId==changelog2ID)
      {
        if (replServer2!=null)
          return replServer2;
      }
      else if (changelogId==changelog3ID)
      {
        if (replServer3!=null)
          return replServer3;
      }
      if (all)
      {
        servers.add("localhost:" + getChangelogPort(changelog1ID));
        servers.add("localhost:" + getChangelogPort(changelog2ID));
      }
      int chPort = getChangelogPort(changelogId);
      String chDir = "genid"+changelogId+suffix+"Db";
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
  }
  /**
   * Create a synchronized suffix in the current server providing the
   * replication Server ID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(short changelogID)
  {
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
      String synchroServerStringDN = synchroPluginStringDN;
      String synchroServerLdif =
        "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-replication-domain\n"
        + "cn: " + baseSnStr + "\n"
        + "ds-cfg-base-dn: " + baseDnStr + "\n"
        + "ds-cfg-replication-server: localhost:"
        + getChangelogPort(changelogID)+"\n"
        + "ds-cfg-server-id: " + server1ID + "\n"
        + "ds-cfg-receive-status: true\n"
        + "ds-cfg-window-size: " + WINDOW_SIZE;
      if (synchroServerEntry == null)
      {
        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
        configEntryList.add(synchroServerEntry.getDN());
        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      }
      if (replDomain != null)
      {
        debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
      }
    }
    catch(Exception e)
    {
      debugInfo("connectToReplServer", e);
      fail("connectToReplServer", e);
    }
  }
  /*
   * Disconnect DS from the replicationServer
   */
  private void disconnectFromReplServer(short changelogID)
  {
    try
    {
      // suffix synchronized
      String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
      synchroPluginStringDN;
      if (synchroServerEntry != null)
      {
        DN synchroServerDN = DN.decode(synchroServerStringDN);
        DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
        assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
        "Unable to delete the synchronized domain");
        synchroServerEntry = null;
        configEntryList.remove(configEntryList.indexOf(synchroServerDN));
      }
    }
    catch(Exception e)
    {
      fail("disconnectFromReplServer", e);
    }
  }
  private int getChangelogPort(short changelogID)
  {
    if (replServerPort[changelogID] == 0)
    {
      try
      {
        // Find  a free port for the replicationServer
        ServerSocket socket = TestCaseUtils.bindFreePort();
        replServerPort[changelogID] = socket.getLocalPort();
        socket.close();
      }
      catch(Exception e)
      {
        fail("Cannot retrieve a free port for replication server."
            + e.getMessage());
      }
    }
    return replServerPort[changelogID];
  }
  private String createEntry(UUID uid)
  {
    String user2dn = "uid=user"+uid+",ou=People," + baseDnStr;
    return new String(
        "dn: "+ user2dn + "\n"
        + "objectClass: top\n" + "objectClass: person\n"
        + "objectClass: organizationalPerson\n"
        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
        + "homePhone: 951-245-7634\n"
        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
        + "mobile: 027-085-0537\n"
        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
        + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
        + "street: 17984 Thirteenth Street\n"
        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n"
        + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n"
        + "userPassword: password\n" + "initials: AA\n");
  }
  static protected ReplicationMessage createAddMsg(short serverId)
  {
    Entry personWithUUIDEntry = null;
    String user1entryUUID;
    String baseUUID = null;
    String user1dn;
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the replicationServer.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, 0);
    user1entryUUID = "33333333-3333-3333-3333-333333333333";
    user1dn = "uid=user1,ou=People," + baseDnStr;
    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
    + "objectClass: top\n" + "objectClass: person\n"
    + "objectClass: organizationalPerson\n"
    + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
    + "homePhone: 951-245-7634\n"
    + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
    + "mobile: 027-085-0537\n"
    + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
    + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
    + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
    + "street: 17984 Thirteenth Street\n"
    + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
    + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
    + "userPassword: password\n" + "initials: AA\n"
    + "entryUUID: " + user1entryUUID + "\n";
    try
    {
      personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
    }
    catch(Exception e)
    {
      fail(e.getMessage());
    }
    // Create and publish an update message to add an entry.
    AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
        personWithUUIDEntry.getDN().toString(),
        user1entryUUID,
        baseUUID,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
    return addMsg;
  }
  @Test(enabled=false)
  public void testMultiRS() throws Exception
  {
    String testCase = "testMultiRS";
    debugInfo("Starting " + testCase);
    ReplicationDomain.clearJEBackend(false,
        "userRoot",
        baseDn.toNormalizedString());
    debugInfo ("Creating 2 RS");
    replServer1 = createReplicationServer(changelog1ID, true, testCase);
    replServer2 = createReplicationServer(changelog2ID, true, testCase);
    replServer3 = createReplicationServer(changelog3ID, true, testCase);
    Thread.sleep(500);
    debugInfo("Connecting DS to replServer1");
    connectServer1ToChangelog(changelog1ID);
    Thread.sleep(1500);
    try
    {
      debugInfo("Connecting broker2 to replServer1");
      broker2 = openReplicationSession(baseDn,
          server2ID, 100, getChangelogPort(changelog1ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    try
    {
      debugInfo("Connecting broker3 to replServer2");
      broker3 = openReplicationSession(baseDn,
          server3ID, 100, getChangelogPort(changelog2ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    try
    {
      debugInfo("Connecting broker4 to replServer2");
      broker4 = openReplicationSession(baseDn,
          server4ID, 100, getChangelogPort(changelog2ID),
          1000, !emptyOldChanges);
      Thread.sleep(1000);
    }
    catch(SocketException se)
    {
      fail("Broker connection is expected to be accepted.");
    }
    // Do a bunch of change
    updatedEntries = newLDIFEntries();
    this.addTestEntriesToDB(updatedEntries);
    for (int i = 0; i<200; i++)
    {
      String ent1[] = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent1);
    }
    for (int i=0; i<10; i++)
    {
      broker3.publish(createAddMsg(server3ID));
    }
    searchMonitor();
    debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
    disconnectFromReplServer(changelog1ID);
    debugInfo("Cleaning entries");
    postTest();
    debugInfo("Successfully ending " + testCase);
  }
  /**
   * Disconnect broker and remove entries from the local DB
   * @throws Exception
   */
  protected void postTest()
  {
    debugInfo("Post test cleaning.");
    // Clean brokers
    if (broker2 != null)
      broker2.stop();
    broker2 = null;
    if (broker3 != null)
      broker3.stop();
    broker3 = null;
    if (broker4 != null)
      broker4.stop();
    broker4 = null;
    if (replServer1 != null)
      replServer1.remove();
    if (replServer2 != null)
      replServer2.remove();
    if (replServer3 != null)
      replServer3.remove();
    replServer1 = null;
    replServer2 = null;
    replServer3 = null;
    super.cleanRealEntries();
    try
    {
      ReplicationDomain.clearJEBackend(false,
        replDomain.getBackend().getBackendID(),
        baseDn.toNormalizedString());
    }
    catch (Exception e) {}
  }
  @Test(enabled=true)
  public void MonitorTest() throws Exception
  {
    testMultiRS();
  }
  private static final ByteArrayOutputStream oStream =
    new ByteArrayOutputStream();
  private static final ByteArrayOutputStream eStream =
    new ByteArrayOutputStream();
  private void searchMonitor()
  {
    // test search as directory manager returns content
    String[] args3 =
    {
      "-h", "127.0.0.1",
      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
      "-D", "cn=Directory Manager",
      "-w", "password",
      "-b", "cn=monitor",
      "-s", "sub",
      "(base-dn=*)"
    };
    oStream.reset();
    eStream.reset();
    int retVal =
      LDAPSearch.mainSearch(args3, false, oStream, eStream);
    String entries = oStream.toString();
    debugInfo("Entries:" + entries);
    try
    {
      assertEquals(retVal, 0, "Returned error: " + eStream);
      assertTrue(!entries.equalsIgnoreCase(""), "Returned entries: " + entries);
    }
    catch(Exception e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          stackTraceToSingleLineString(new Exception()));
      fail(e.getMessage());
    }
  }
}