mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
01.21.2008 40e2acfd1e9676f3b63385b15075bf1395d4543e
Fix 2598 - fixes for global replication monitoring
1 files added
13 files modified
1402 ■■■■ changed files
opends/src/messages/messages/replication.properties 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java 199 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java 35 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java 50 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitorData.java 329 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 400 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 294 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 2 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 47 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 4 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -252,7 +252,8 @@
NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
  failed: %s
SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
 are missing due to an error in the retrieval process
 are missing due to an error in the retrieval process. Potentially a server \
 is too slow to provide its monitoring data over the protocol
SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
 are missing due to a processing error : %s
SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -339,4 +339,23 @@
  {
    return list.isEmpty();
  }
  /**
   * Make a duplicate of this state.
   * @return The duplicate of this state.
   */
  public ServerState duplicate()
  {
    ServerState newState = new ServerState();
    synchronized (this)
    {
      for (Short key  : list.keySet())
      {
        ChangeNumber change = list.get(key);
        Short id =  change.getServerId();
        newState.list.put(id,change);
      }
    }
    return newState;
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -433,7 +433,7 @@
      ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
        halfRcvWindow * 2, heartbeatInterval, state,
        protocolVersion, generationId, isSslEncryption);
        protocolVersion, generationId, isSslEncryption, !keepConnection);
      session.publish(msg);
      /*
opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
@@ -65,15 +65,20 @@
  }
  /**
   * Data structure to manage the state of the replication server
   * and the state informations for the LDAP servers connected.
   * Data structure to manage the state of this replication server
   * and the state informations for the servers connected to it.
   *
   */
  class SubTopoMonitorData
  {
    ServerState replServerState;
    // This replication server DbState
    ServerState replServerDbState;
    // The data related to the LDAP servers connected to this RS
    HashMap<Short, ServerData> ldapStates =
      new HashMap<Short, ServerData>();
    // The data related to the RS servers connected to this RS
    HashMap<Short, ServerData> rsStates =
      new HashMap<Short, ServerData>();
  }
  SubTopoMonitorData data = new SubTopoMonitorData();;
@@ -93,9 +98,9 @@
   * Sets the state of the replication server.
   * @param state The state.
   */
  public void setReplServerState(ServerState state)
  public void setReplServerDbState(ServerState state)
  {
    data.replServerState = state;
    data.replServerDbState = state;
  }
  /**
@@ -103,20 +108,27 @@
   * @param serverId The serverID.
   * @param state The server state.
   * @param approxFirstMissingDate  The approximation of the date
   * of the older missing change.
   *
   * of the older missing change. null when none.
   * @param isLDAP Specifies whether the server is a LS or a RS
   */
  public void setLDAPServerState(short serverId, ServerState state,
      Long approxFirstMissingDate)
  public void setServerState(short serverId, ServerState state,
      Long approxFirstMissingDate, boolean isLDAP)
  {
    if (data.ldapStates == null)
    {
      data.ldapStates = new HashMap<Short, ServerData>();
    }
    if (data.rsStates == null)
    {
      data.rsStates = new HashMap<Short, ServerData>();
    }
    ServerData sd = new ServerData();
    sd.state = state;
    sd.approxFirstMissingDate = approxFirstMissingDate;
    data.ldapStates.put(serverId, sd);
    if (isLDAP)
      data.ldapStates.put(serverId, sd);
    else
      data.rsStates.put(serverId, sd);
  }
  /**
@@ -130,16 +142,37 @@
  }
  /**
   * Get the server state for the RS server with the provided serverId.
   * @param serverId The provided serverId.
   * @return The state.
   */
  public ServerState getRSServerState(short serverId)
  {
    return data.rsStates.get(serverId).state;
  }
  /**
   * Get the approximation of the date of the older missing change for the
   * LDAP Server with the provided server Id.
   * @param serverId The provided serverId.
   * @return The approximated state.
   */
  public Long getApproxFirstMissingDate(short serverId)
  public Long getLDAPApproxFirstMissingDate(short serverId)
  {
    return data.ldapStates.get(serverId).approxFirstMissingDate;
  }
  /**
   * Get the approximation of the date of the older missing change for the
   * RS Server with the provided server Id.
   * @param serverId The provided serverId.
   * @return The approximated state.
   */
  public Long getRSApproxFirstMissingDate(short serverId)
  {
    return data.rsStates.get(serverId).approxFirstMissingDate;
  }
  /**
   * Creates a new EntryMessage from its encoded form.
@@ -182,39 +215,51 @@
      try
      {
        ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
        // loop on the servers
        for (ASN1Element el0 : s0.elements())
        {
          ServerState newState = new ServerState();
          short serverId = 0;
          Long outime = (long)0;
          boolean isLDAPServer = false;
          ASN1Sequence s1 = el0.decodeAsSequence();
          // loop on the list of CN of the state
          for (ASN1Element el1 : s1.elements())
          {
            ASN1OctetString o = el1.decodeAsOctetString();
            String s = o.stringValue();
            ChangeNumber cn = new ChangeNumber(s);
            if ((data.replServerState != null) && (serverId == 0))
            if ((data.replServerDbState != null) && (serverId == 0))
            {
              // we are on the first CN that is a fake CN to store the serverId
              // and the older update time
              serverId = cn.getServerId();
              outime = cn.getTime();
              isLDAPServer = (cn.getSeqnum()>0);
            }
            else
            {
              // we are on a normal CN
              newState.update(cn);
            }
          }
          // the first state is the replication state
          if (data.replServerState == null)
          if (data.replServerDbState == null)
          {
            data.replServerState = newState;
            // the first state is the replication state
            data.replServerDbState = newState;
          }
          else
          {
            // the next states are the server states
            ServerData sd = new ServerData();
            sd.state = newState;
            sd.approxFirstMissingDate = outime;
            data.ldapStates.put(serverId, sd);
            if (isLDAPServer)
              data.ldapStates.put(serverId, sd);
            else
              data.rsStates.put(serverId, sd);
          }
        }
      } catch(Exception e)
@@ -245,14 +290,17 @@
      ASN1Sequence stateElementSequence = new ASN1Sequence();
      ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
      // First loop computes the length
      /**
       * First loop computes the length
       */
      /* Put the serverStates ... */
      stateElementSequence = new ASN1Sequence();
      stateElementList = new ArrayList<ASN1Element>();
      /* first put the Replication Server state */
      ArrayList<ASN1OctetString> cnOctetList =
        data.replServerState.toASN1ArrayList();
        data.replServerDbState.toASN1ArrayList();
      ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
      for (ASN1OctetString soci : cnOctetList)
      {
@@ -288,6 +336,35 @@
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      // then the rs server data
      servers = data.rsStates.keySet();
      for (Short sid : servers)
      {
        // State
        ServerState statei = data.rsStates.get(sid).state;
        // First missing date
        Long outime =  data.rsStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        // and the olderupdatetime
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
        }
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      stateElementSequence.setElements(stateElementList);
      int seqLen = stateElementSequence.encode().length;
@@ -298,7 +375,9 @@
      // Allocate the array sized from the computed length
      byte[] resultByteArray = new byte[length];
      // Second loop build the array
      /**
       * Second loop really builds the array
       */
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
@@ -313,7 +392,7 @@
      /* first put the Replication Server state */
      cnOctetList =
        data.replServerState.toASN1ArrayList();
        data.replServerDbState.toASN1ArrayList();
      cnElementList = new ArrayList<ASN1Element>();
      for (ASN1OctetString soci : cnOctetList)
      {
@@ -322,7 +401,7 @@
      cnSequence = new ASN1Sequence(cnElementList);
      stateElementList.add(cnSequence);
      // then the LDAP server state
      // then the LDAP server datas
      servers = data.ldapStates.keySet();
      for (Short sid : servers)
      {
@@ -334,10 +413,10 @@
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        ChangeNumber cn = new ChangeNumber(outime,1,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers
        // the changenumbers that make the state
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
@@ -346,6 +425,33 @@
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      // then the RS server datas
      servers = data.rsStates.keySet();
      for (Short sid : servers)
      {
        ServerState statei = data.rsStates.get(sid).state;
        Long outime = data.rsStates.get(sid).approxFirstMissingDate;
        // retrieves the change numbers as an arrayList of ANSN1OctetString
        cnOctetList = statei.toASN1ArrayList();
        cnElementList = new ArrayList<ASN1Element>();
        // a fake changenumber helps storing the LDAP server ID
        ChangeNumber cn = new ChangeNumber(outime,0,sid);
        cnElementList.add(new ASN1OctetString(cn.toString()));
        // the changenumbers that make the state
        for (ASN1OctetString soci : cnOctetList)
        {
          cnElementList.add(soci);
        }
        cnSequence = new ASN1Sequence(cnElementList);
        stateElementList.add(cnSequence);
      }
      stateElementSequence.setElements(stateElementList);
      pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
@@ -361,41 +467,62 @@
   * Get the state of the replication server that sent this message.
   * @return The state.
   */
  public ServerState getReplServerState()
  public ServerState getReplServerDbState()
  {
    return data.replServerState;
    return data.replServerDbState;
  }
  /**
   * Returns an iterator on the serverId of the connected LDAP servers.
   * @return The iterator.
   */
  public Iterator<Short> iterator()
  public Iterator<Short> ldapIterator()
  {
    return data.ldapStates.keySet().iterator();
  }
  /**
   * Returns an iterator on the serverId of the connected RS servers.
   * @return The iterator.
   */
  public Iterator<Short> rsIterator()
  {
    return data.rsStates.keySet().iterator();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    String stateS = " RState:";
    stateS += "/" + data.replServerState.toString();
    stateS += " LDAPStates:";
    Iterator<ServerData> it = data.ldapStates.values().iterator();
    while (it.hasNext())
    String stateS = "\nRState:[";
    stateS += data.replServerDbState.toString();
    stateS += "]";
    stateS += "\nLDAPStates:[";
    for (Short sid : data.ldapStates.keySet())
    {
      ServerData sd = it.next();
      stateS += "/ state=" + sd.state.toString()
      + " afmd=" + sd.approxFirstMissingDate + "] ";
      ServerData sd = data.ldapStates.get(sid);
      stateS +=
               "\n[LSstate("+ sid + ")=" +
                sd.state.toString() + "]" +
                " afmd=" + sd.approxFirstMissingDate + "]";
    }
    stateS += "\nRSStates:[";
    for (Short sid : data.rsStates.keySet())
    {
      ServerData sd = data.rsStates.get(sid);
      stateS +=
               "\n[RSState("+ sid + ")=" +
               sd.state.toString() + "]" +
               " afmd=" + sd.approxFirstMissingDate + "]";
    }
    String me = this.getClass().getCanonicalName() +
    " sender=" + this.senderID +
    "[ sender=" + this.senderID +
    " destination=" + this.destination +
    " states=" + stateS +
    " data=[" + stateS + "]" +
    "]";
    return me;
  }
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -54,6 +54,7 @@
  private int maxReceiveDelay;
  private int maxSendDelay;
  private int windowSize;
  private boolean handshakeOnly;
  private ServerState serverState = null;
  /**
@@ -87,6 +88,8 @@
   * @param generationId The generationId for this server.
   * @param sslEncryption Whether to continue using SSL to encrypt messages
   *                      after the start messages have been exchanged.
   * @param handshakeOnly Whether this message is only to get an handshake
   *                      with the server or not.
   */
  public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                            int maxReceiveQueue, int maxSendDelay,
@@ -95,7 +98,8 @@
                            ServerState serverState,
                            short protocolVersion,
                            long generationId,
                            boolean sslEncryption)
                            boolean sslEncryption,
                            boolean handshakeOnly)
  {
    super(protocolVersion, generationId);
@@ -109,6 +113,7 @@
    this.heartbeatInterval = heartbeatInterval;
    this.sslEncryption = sslEncryption;
    this.serverState = serverState;
    this.handshakeOnly = handshakeOnly;
    try
    {
@@ -209,10 +214,19 @@
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the handshakeOnly flag
       */
      length = getNextLength(in, pos);
      handshakeOnly = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
      * read the ServerState
      */
      serverState = new ServerState(in, pos, in.length-1);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
@@ -322,6 +336,8 @@
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      byte[] byteHandshakeOnly =
        String.valueOf(handshakeOnly).getBytes("UTF-8");
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 +
@@ -332,6 +348,7 @@
                   byteWindowSize.length + 1 +
                   byteHeartbeatInterval.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteHandshakeOnly.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
@@ -358,6 +375,8 @@
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      pos = addByteArray(byteHandshakeOnly, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
@@ -401,4 +420,16 @@
  {
    return sslEncryption;
  }
  /**
   * Get the SSL encryption value for the ldap server that created the
   * message.
   *
   * @return The SSL encryption value for the ldap server that created the
   *         message.
   */
  public boolean isHandshakeOnly()
  {
    return handshakeOnly;
  }
}
opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -200,16 +200,6 @@
  @Override
  public ArrayList<Attribute> getMonitorData()
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In " +
          this.replServerHandler.getDomain().getReplicationServer().
          getMonitorInstanceName()+
          " LWSH for remote server " + this.serverId +
          " connected to:" + this.replServerHandler.getMonitorInstanceName() +
      " getMonitor data");
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(new Attribute("server-id",
@@ -220,12 +210,12 @@
        replServerHandler.getMonitorInstanceName()));
    // Retrieves the topology counters
    MonitorData md;
    try
    {
      rsDomain.retrievesRemoteMonitorData();
      md = rsDomain.getMonitorData();
      // Compute the latency for the current SH
      ServerState remoteState = rsDomain.getServerState(serverId);
      ServerState remoteState = md.getLDAPServerState(serverId);
      if (remoteState == null)
      {
        remoteState = new ServerState();
@@ -241,29 +231,39 @@
      {
        values.add(new AttributeValue(type,str));
      }
      if (values.size() == 0)
      {
        values.add(new AttributeValue(type,"unknown"));
      }
      Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
      attributes.add(attr);
      // add the latency attribute to our monitor data
      // Compute the latency for the current SH
      int missingChanges = rsDomain.getMissingChanges(remoteState);
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
      // Add the oldest missing update
      Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
      if (olderUpdateTime != null)
      // Oldest missing update
      Long approxFirstMissingDate=md.getApproxFirstMissingDate(serverId);
      if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
      {
        Date date = new Date(olderUpdateTime);
        Date date = new Date(approxFirstMissingDate);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
        attributes.add(
          new Attribute("approx-older-change-not-synchronized-millis",
          String.valueOf(olderUpdateTime)));
            new Attribute("approx-older-change-not-synchronized-millis",
            String.valueOf(approxFirstMissingDate)));
      }
      // Missing changes
      long missingChanges = md.getMissingChanges(serverId);
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
      // Replication delay
      long delay = md.getApproxDelay(serverId);
      attributes.add(new Attribute("approximate-delay",
          String.valueOf(delay)));
    }
    catch(Exception e)
    {
      // TODO: improve the log
      // We failed retrieving the remote monitor data.
      attributes.add(new Attribute("error",
        stackTraceToSingleLineString(e)));
opends/src/server/org/opends/server/replication/server/MonitorData.java
New file
@@ -0,0 +1,329 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.util.TimeThread;
/**
 * This class defines the Monitor Data that are consolidated across the
 * whole replication topology.
 */
public class MonitorData
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   *
   * - For each server, the max (most recent) CN produced
   *
   * - For each server, its state i.e. the last processed from of each
   *   other LDAP server.
   *   The change latency (missing changes) will be
   *   the difference between the max above and the state here
   *
   * - For each server, the date of the first missing change.
   *   The time latency (delay) will be the difference between now and the
   *   date of the first missing change.
   */
  /* The date of the last time they have been elaborated */
  private long buildDate = 0;
  // For each LDAP server, its server state
  private ConcurrentHashMap<Short, ServerState> LDAPStates =
    new ConcurrentHashMap<Short, ServerState>();
  // For each LDAP server, the last(max) CN it published
  private ConcurrentHashMap<Short, ChangeNumber> maxCNs =
    new ConcurrentHashMap<Short, ChangeNumber>();
  // For each LDAP server, an approximation of the date of the first missing
  // change
  private ConcurrentHashMap<Short, Long> fmd =
    new ConcurrentHashMap<Short, Long>();
  private ConcurrentHashMap<Short, Long> missingChanges =
    new ConcurrentHashMap<Short, Long>();
  // For each RS server, an approximation of the date of the first missing
  // change
  private ConcurrentHashMap<Short, Long> fmRSDate =
    new ConcurrentHashMap<Short, Long>();
  /**
   * Get an approximation of the latency delay of the replication.
   * @param serverId The server ID.
   * @return The delay
   */
  public long getApproxDelay(short serverId)
  {
    Long afmd = fmd.get(serverId);
    if ((afmd != null) && (afmd>0))
      return ((this.getBuildDate() - afmd)/1000);
    else
      return 0;
  }
  /**
   * Get an approximation of the date of the first missing update.
   * @param serverId The server ID.
   * @return The date.
   */
  public long getApproxFirstMissingDate(short serverId)
  {
    Long res;
    if ((res = fmd.get(serverId)) != null)
      return res;
    return 0;
  }
  /**
   * Get the number of missing changes.
   * @param serverId The server ID.
   * @return The number of missing changes.
   */
  public long getMissingChanges(short serverId)
  {
    Long res = missingChanges.get(serverId);
    if (res==null)
      return 0;
    else
      return res;
  }
  /**
   * Build the monitor data that are computed from the collected ones.
   */
  public void completeComputing()
  {
    String mds = "";
    // Computes the missing changes counters
    // For each LSi ,
    //   Regarding each other LSj
    //    Sum the difference : max(LSj) - state(LSi)
    Iterator<Short> lsiStateItr = this.LDAPStates.keySet().iterator();
    while (lsiStateItr.hasNext())
    {
      Short lsiSid = lsiStateItr.next();
      ServerState lsiState = this.LDAPStates.get(lsiSid);
      Long lsiMissingChanges = (long)0;
      if (lsiState != null)
      {
        Iterator<Short> lsjMaxItr = this.maxCNs.keySet().iterator();
        while (lsjMaxItr.hasNext())
        {
          Short lsjSid = lsjMaxItr.next();
          ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid);
          ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid);
          int missingChangesLsiLsj =
            ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
          mds +=
            "+ diff("+lsjMaxCN+"-"
                     +lsiLastCN+")="+missingChangesLsiLsj;
          lsiMissingChanges += missingChangesLsiLsj;
        }
      }
      mds += "=" + lsiMissingChanges;
      this.missingChanges.put(lsiSid,lsiMissingChanges);
      if (debugEnabled())
        TRACER.debugInfo(
          "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
    }
    this.setBuildDate(TimeThread.getTime());
  }
  /**
   * Returns a <code>String</code> object representing this
   * object's value.
   * @return  a string representation of the value of this object in
   */
  public String toString()
  {
    String mds = "Monitor data=\n";
    mds+= "Build date=" + this.getBuildDate();
    // RS data
    Iterator<Short> rsite = fmRSDate.keySet().iterator();
    while (rsite.hasNext())
    {
      Short sid = rsite.next();
      mds += "\nRSData(" + sid + ")=\t "+ "afmd=" + fmRSDate.get(sid);
    }
    // maxCNs
    Iterator<Short> itc = maxCNs.keySet().iterator();
    while (itc.hasNext())
    {
      Short sid = itc.next();
      ChangeNumber cn = maxCNs.get(sid);
      mds += "\nmaxCNs(" + sid + ")= " + cn.toString();
    }
    // LDAP data
    Iterator<Short> lsite = LDAPStates.keySet().iterator();
    while (lsite.hasNext())
    {
      Short sid = lsite.next();
      ServerState ss = LDAPStates.get(sid);
      mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
      + "] afmd=" + this.getApproxFirstMissingDate(sid);
      if (getBuildDate()>0)
      {
        mds += " missingDelay=" + this.getApproxDelay(sid);
      }
      mds +=" missingCount=" + missingChanges.get(sid);
    }
    //
    mds += "--";
    return mds;
  }
  /**
   * Sets the build date of the data.
   * @param buildDate The date.
   */
  public void setBuildDate(long buildDate)
  {
    this.buildDate = buildDate;
  }
  /**
   * Returns the build date of the data.
   * @return The date.
   */
  public long getBuildDate()
  {
    return buildDate;
  }
  /**
   * From a provided state, sets the max CN of the monitor data.
   * @param state the provided state.
   */
  public void setMaxCNs(ServerState state)
  {
    Iterator<Short> it = state.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      ChangeNumber newCN = state.getMaxChangeNumber(sid);
      setMaxCN(sid, newCN);
    }
  }
  /**
   * For the provided serverId, sets the provided CN as the max if
   * it is newer than the current max.
   * @param serverId the provided serverId
   * @param newCN the provided new CN
   */
  public void setMaxCN(short serverId, ChangeNumber newCN)
  {
    if (newCN==null) return;
    ChangeNumber currentMaxCN = maxCNs.get(serverId);
    if (currentMaxCN == null)
    {
      maxCNs.put(serverId, newCN);
    }
    else
    {
      if (newCN.newer(currentMaxCN))
        maxCNs.replace(serverId, newCN);
    }
  }
  /**
   * Get the highest know change number of the LDAP server with the provided
   * serverId.
   * @param serverId The server ID.
   * @return The highest change number.
   */
  public ChangeNumber getMaxCN(short serverId)
  {
    return maxCNs.get(serverId);
  }
  /**
   * Get the state of the LDAP server with the provided serverId.
   * @param serverId The server ID.
   * @return The server state.
   */
  public ServerState getLDAPServerState(short serverId)
  {
    return LDAPStates.get(serverId);
  }
  /**
   * Set the state of the LDAP server with the provided serverId.
   * @param serverId The server ID.
   * @param state The server state.
   */
  public void setLDAPServerState(short serverId, ServerState state)
  {
    LDAPStates.put(serverId, state);
  }
  /**
   * Set the state of the LDAP server with the provided serverId.
   * @param serverId The server ID.
   * @param newFmd The first missing date.
   */
  public void setFirstMissingDate(short serverId, Long newFmd)
  {
    if (newFmd==null) return;
    Long currentfmd = fmd.get(serverId);
    if (currentfmd==null)
    {
      fmd.put(serverId, newFmd);
    }
    else
    {
      if ((newFmd!=0) && (newFmd<currentfmd))
        fmd.replace(serverId, newFmd);
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -129,8 +128,8 @@
  /* Monitor data management */
  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
  private long remoteMonitorDataLifeTime = 500;
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
@@ -139,21 +138,11 @@
   */
  Semaphore remoteMonitorResponsesSemaphore;
  /* The date of the last time they have been elaborated */
  private long validityDate = 0;
  // For each LDAP server, its server state
  private HashMap<Short, ServerState> LDAPStates =
    new HashMap<Short, ServerState>();
  // For each LDAP server, the last CN it published
  private HashMap<Short, ChangeNumber> maxCNs =
    new HashMap<Short, ChangeNumber>();
  // For each LDAP server, an approximation of the date of the first missing
  // change
  private HashMap<Short, Long> approxFirstMissingDate =
    new HashMap<Short, Long>();
  /**
   * The monitor data consolidated over the topology.
   */
  private  MonitorData monitorData = new MonitorData();
  private  MonitorData wrkMonitorData;
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -166,13 +155,7 @@
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " Created Cache for " + baseDn + " " +
        stackTraceToSingleLineString(new Exception()));
}
  }
  /**
   * Add an update that has been received to the list of
@@ -366,6 +349,10 @@
        {
          replicationServers.remove(handler.getServerId());
          handler.stopHandler();
          // Update the remote replication servers with our list
          // of connected LDAP servers
          sendReplServerInfo();
        }
      }
      else
@@ -374,12 +361,12 @@
        {
          connectedServers.remove(handler.getServerId());
          handler.stopHandler();
          // Update the remote replication servers with our list
          // of connected LDAP servers
          sendReplServerInfo();
        }
      }
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
  }
  /**
@@ -578,7 +565,8 @@
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @return the created ReplicationIterator.
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getChangelogIterator(short serverId,
                    ChangeNumber changeNumber)
@@ -591,7 +579,8 @@
    {
      return handler.generateIterator(changeNumber);
    }
    catch (Exception e) {
    catch (Exception e)
    {
     return null;
    }
  }
@@ -759,6 +748,7 @@
   */
  public void process(RoutableMessage msg, ServerHandler senderHandler)
  {
    // Test the message for which a ReplicationServer is expected
    // to be the destination
    if (msg.getDestination() == this.replicationServer.getServerId())
@@ -779,20 +769,33 @@
              replServerMonitorRequestMsg.getDestination(),
              replServerMonitorRequestMsg.getsenderID());
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerState(this.getDbServerState());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (ServerHandler lsh : this.connectedServers.values())
        {
          monitorMsg.setLDAPServerState(
          monitorMsg.setServerState(
              lsh.getServerId(),
              lsh.getServerState(),
              lsh.getApproxFirstMissingDate());
              lsh.getApproxFirstMissingDate(),
              true);
        }
        // Same for the connected RS
        for (ServerHandler rsh : this.replicationServers.values())
        {
          monitorMsg.setServerState(
              rsh.getServerId(),
              rsh.getServerState(),
              rsh.getApproxFirstMissingDate(),
              false);
        }
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerDbState(this.getDbServerState());
        try
        {
          senderHandler.send(monitorMsg);
@@ -1305,118 +1308,135 @@
      }
    }
    /*
    /* =======================
     * Monitor Data generation
     * =======================
     */
    /**
     * Retrieves the remote monitor data.
     *
     * Retrieves the global monitor data.
     * @return The monitor data.
     * @throws DirectoryException When an error occurs.
     */
    protected void retrievesRemoteMonitorData()
    synchronized protected MonitorData getMonitorData()
      throws DirectoryException
    {
      if (validityDate > TimeThread.getTime())
      if (monitorData.getBuildDate() + monitorDataLifeTime
          > TimeThread.getTime())
      {
        // The current data are still valid. No need to renew them.
        return;
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
       // The current data are still valid. No need to renew them.
        // FIXME
        return null;
      }
      // Clean
      this.LDAPStates.clear();
      this.maxCNs.clear();
      // Init the maxCNs of our direct LDAP servers from our own dbstate
      for (ServerHandler rs : connectedServers.values())
      wrkMonitorData = new MonitorData();
      synchronized(wrkMonitorData)
      {
        short serverID = rs.getServerId();
        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
        if (cn == null)
        {
          // we have nothing in db for that server
          cn = new ChangeNumber(0, 0 , serverID);
        }
        this.maxCNs.put(serverID, cn);
      }
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn + " Computing monitor data ");
      ServerState replServerState = this.getDbServerState();
      Iterator<Short> it = replServerState.iterator();
      while (it.hasNext())
      {
        short sid = it.next();
        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
        ChangeNumber maxCN = this.maxCNs.get(sid);
        if ((maxCN != null) && (receivedCN.newer(maxCN)))
        // Let's process our directly connected LSes
        // - in the ServerHandler for a given LS1, the stored state contains :
        //   - the max CN produced by LS1
        //   - the last CN consumed by LS1 from LS2..n
        // - in the RSdomain/dbHandler, the built-in state contains :
        //   - the max CN produced by each server
        // So for a given LS connected we can take the state and the max from
        // the LS/state.
        for (ServerHandler directlsh : connectedServers.values())
        {
          // We found a newer one
          this.maxCNs.remove(sid);
          this.maxCNs.put(sid, receivedCN);
          short serverID = directlsh.getServerId();
          // the state comes from the state stored in the SH
          ServerState directlshState = directlsh.getServerState().duplicate();
          // the max CN sent by that LS also comes from the SH
          ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
          if (maxcn == null)
          {
            // This directly connected LS has never produced any change
            maxcn = new ChangeNumber(0, 0 , serverID);
          }
          wrkMonitorData.setMaxCN(serverID, maxcn);
          wrkMonitorData.setLDAPServerState(serverID, directlshState);
          wrkMonitorData.setFirstMissingDate(serverID, directlsh.
                                             getApproxFirstMissingDate());
        }
        // Then initialize the max CN for the LS that produced something
        // - from our own local db state
        // - whatever they are directly or undirectly connected
        ServerState dbServerState = getDbServerState();
        Iterator<Short> it = dbServerState.iterator();
        while (it.hasNext())
        {
          short sid = it.next();
          ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
          wrkMonitorData.setMaxCN(sid, storedCN);
        }
        // Now we have used all available local informations
        // and we need the remote ones.
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            " baseDn=" + baseDn + " Local monitor data: " +
            wrkMonitorData.toString());
      }
      // Send Request to the other Replication Servers
      if (remoteMonitorResponsesSemaphore == null)
      {
        remoteMonitorResponsesSemaphore = new Semaphore(
            replicationServers.size() -1);
        sendMonitorDataRequest();
        remoteMonitorResponsesSemaphore = new Semaphore(0);
        short requestCnt = sendMonitorDataRequest();
        // Wait reponses from them or timeout
        waitMonitorDataResponses(replicationServers.size());
        waitMonitorDataResponses(requestCnt);
      }
      else
      {
        // The processing of renewing the monitor cache is already running
        // We'll make it sleeping until the end
        // TODO: unit test for this case.
        while (remoteMonitorResponsesSemaphore!=null)
        {
          waitMonitorDataResponses(1);
        }
      }
      // Now we have the expected answers of an error occured
      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
      wrkMonitorData.completeComputing();
      if (debugEnabled())
      // Store the new computed data as the reference
      synchronized(monitorData)
      {
        debugMonitorData();
      }
    }
    private void debugMonitorData()
    {
      String mds = " Monitor data=";
      Iterator<Short> ite = LDAPStates.keySet().iterator();
      while (ite.hasNext())
      {
        Short sid = ite.next();
        ServerState ss = LDAPStates.get(sid);
        mds += " LDAPState(" + sid + ")=" + ss.toString();
      }
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber cn = maxCNs.get(sid);
        mds += " maxCNs(" + sid + ")=" + cn.toString();
      }
      mds += "--";
      TRACER.debugInfo(
        // Now we have the expected answers or an error occured
        monitorData = wrkMonitorData;
        wrkMonitorData = null;
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          mds);
          " baseDn=" + baseDn + " *** Computed MonitorData: " +
          monitorData.toString());
      }
      return monitorData;
    }
    /**
     * Sends a MonitorRequest message to all connected RS.
     * @return the number of requests sent.
     * @throws DirectoryException when a problem occurs.
     */
    protected void sendMonitorDataRequest()
    protected short sendMonitorDataRequest()
      throws DirectoryException
    {
      short sent=0;
      try
      {
        for (ServerHandler rs : replicationServers.values())
@@ -1425,6 +1445,7 @@
            MonitorRequestMessage(this.replicationServer.getServerId(),
              rs.getServerId());
          rs.send(msg);
          sent++;
        }
      }
      catch(Exception e)
@@ -1434,6 +1455,7 @@
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
      return sent;
    }
    /**
@@ -1446,21 +1468,30 @@
    {
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDn=" + baseDn +
          " waiting for " + expectedResponses
          + " expected monitor messages");
        boolean allPermitsAcquired =
          remoteMonitorResponsesSemaphore.tryAcquire(
              expectedResponses,
              (long) 500, TimeUnit.MILLISECONDS);
              (long) 5000, TimeUnit.MILLISECONDS);
        if (!allPermitsAcquired)
        {
          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
          // FIXME let's go on in best effort even with limited data received.
        }
        else
        {
          if (debugEnabled())
            TRACER.debugInfo(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Successfully received all " + replicationServers.size()
            " baseDn=" + baseDn +
            " Successfully received all " + expectedResponses
            + " expected monitor messages");
        }
      }
@@ -1482,48 +1513,94 @@
     */
    public void receivesMonitorDataResponse(MonitorMessage msg)
    {
      if (debugEnabled())
        TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        "Receiving " + msg + " from " + msg.getsenderID() +
        remoteMonitorResponsesSemaphore);
      if (remoteMonitorResponsesSemaphore == null)
      {
        // Ignoring the remote monitor data because an error occured previously
        // FIXME
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
            "In " + this.replicationServer.getMonitorInstanceName() +
            "Receiving " + msg + " from " + msg.getsenderID() +
            " remoteMonitorResponsesSemaphore should not be null"));
        // Ignoring the remote monitor data because an error occured
        // previously
        return;
      }
      try
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN accross the RSes
        ServerState replServerState = msg.getReplServerState();
        Iterator<Short> it = replServerState.iterator();
        while (it.hasNext())
        synchronized(wrkMonitorData)
        {
          short sid = it.next();
          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
          ChangeNumber maxCN = this.maxCNs.get(sid);
          if (receivedCN.newer(maxCN))
          {
            // We found a newer one
            this.maxCNs.remove(sid);
            this.maxCNs.put(sid, receivedCN);
          }
        }
          // Here is the RS state : list <serverID, lastChangeNumber>
          // For each LDAP Server, we keep the max CN accross the RSes
          ServerState replServerState = msg.getReplServerDbState();
          wrkMonitorData.setMaxCNs(replServerState);
        // Store the LDAP servers states
        Iterator<Short> sidIterator = msg.iterator();
        while (sidIterator.hasNext())
        {
          short sid = sidIterator.next();
          ServerState ss = msg.getLDAPServerState(sid);
          this.LDAPStates.put(sid, ss);
          this.approxFirstMissingDate.put(sid,
              msg.getApproxFirstMissingDate(sid));
          // Store the remote LDAP servers states
          Iterator<Short> lsidIterator = msg.ldapIterator();
          while (lsidIterator.hasNext())
          {
            short sid = lsidIterator.next();
            wrkMonitorData.setLDAPServerState(sid,
                msg.getLDAPServerState(sid).duplicate());
            wrkMonitorData.setFirstMissingDate(sid,
                msg.getLDAPApproxFirstMissingDate(sid));
          }
          // Process the latency reported by the remote RSi on its connections
          // to the other RSes
          Iterator<Short> rsidIterator = msg.rsIterator();
          while (rsidIterator.hasNext())
          {
            short rsid = rsidIterator.next();
            if (rsid == replicationServer.getServerId())
            {
              // this is the latency of the remote RSi regarding the current RS
              // let's update the fmd of my connected LS
              for (ServerHandler connectedlsh : connectedServers.values())
              {
                short connectedlsid = connectedlsh.getServerId();
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
              }
            }
            else
            {
              // this is the latency of the remote RSi regarding another RSj
              // let's update the latency of the LSes connected to RSj
              ServerHandler rsjHdr = replicationServers.get(rsid);
              for(short remotelsid : rsjHdr.getConnectedServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
              }
            }
          }
          if (debugEnabled())
          {
            if (debugEnabled())
              TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              " baseDn=" + baseDn +
              " Processed msg from " + msg.getsenderID() +
              " New monitor data: " + wrkMonitorData.toString());
          }
        }
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        remoteMonitorResponsesSemaphore.release();
      }
      catch (Exception e)
      {
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
            stackTraceToSingleLineString(e)));
        // If an exception occurs while processing one of the expected message,
        // the processing is aborted and the waiting thread is awoke.
        remoteMonitorResponsesSemaphore.notifyAll();
@@ -1531,65 +1608,6 @@
    }
    /**
     * Get the state of the LDAP server with the provided serverId.
     * @param serverId The server ID.
     * @return The server state.
     */
    public ServerState getServerState(short serverId)
    {
      return LDAPStates.get(serverId);
    }
    /**
     * Get the highest know change number of the LDAP server with the provided
     * serverId.
     * @param serverId The server ID.
     * @return The highest change number.
     */
    public ChangeNumber getMaxCN(short serverId)
    {
      return maxCNs.get(serverId);
    }
    /**
     * Get an approximation of the date of the oldest missing changes.
     * serverId.
     * @param serverId The server ID.
     * @return The approximation of the date of the oldest missing change.
     */
    public Long getApproxFirstMissingDate(short serverId)
    {
      return approxFirstMissingDate.get(serverId);
    }
    /**
     * Get the number of missing change for the server with the provided state.
     * @param state The provided server state.
     * @return The number of missing changes.
     */
    public int getMissingChanges(ServerState state)
    {
      // Traverse the max Cn transmitted by each server
      // For each server, get the highest CN know from the current server
      // Sum the difference betwenn the max and the last
      int missingChanges = 0;
      Iterator<Short> itc = maxCNs.keySet().iterator();
      while (itc.hasNext())
      {
        Short sid = itc.next();
        ChangeNumber maxCN = maxCNs.get(sid);
        ChangeNumber last = state.getMaxChangeNumber(sid);
        if (last == null)
        {
          last = new ChangeNumber(0,0, sid);
        }
        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
        missingChanges += missingChangesFromSID;
      }
      return missingChanges;
    }
    /**
     * Set the purge delay on all the db Handlers for this Domain
     * of Replicaiton.
     *
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -124,12 +125,12 @@
  /**
   * When this Handler is connected to a remote replication server
   * When this Handler is related to a remote replication server
   * this collection will contain as many elements as there are
   * LDAP servers connected to the remote replication server.
   */
  private List<LightweightServerHandler>
     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
  private Map<Short, LightweightServerHandler> connectedServers =
    new ConcurrentHashMap<Short, LightweightServerHandler>();
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -200,6 +201,8 @@
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
    long localGenerationId = -1;
    boolean handshakeOnly = false;
    try
    {
      if (baseDn != null)
@@ -244,6 +247,8 @@
        maxSendQueue = receivedMsg.getMaxSendQueue();
        heartbeatInterval = receivedMsg.getHeartbeatInterval();
        handshakeOnly = receivedMsg.isHandshakeOnly();
        // The session initiator decides whether to use SSL.
        sslEncryption = receivedMsg.getSSLEncryption();
@@ -524,60 +529,70 @@
      replicationServerDomain = replicationServer.
              getReplicationServerDomain(this.baseDn,true);
      boolean started;
      if (serverIsLDAPserver)
      if (!handshakeOnly)
      {
        started = replicationServerDomain.startServer(this);
      }
      else
      {
        started = replicationServerDomain.startReplicationServer(this);
      }
      if (started)
      {
        // sendWindow MUST be created before starting the writer
        sendWindow = new Semaphore(sendWindowSize);
        writer = new ServerWriter(session, serverId,
                this, replicationServerDomain);
        reader = new ServerReader(session, serverId,
                this, replicationServerDomain);
        reader.start();
        writer.start();
        // Create a thread to send heartbeat messages.
        if (heartbeatInterval > 0)
        boolean started;
        if (serverIsLDAPserver)
        {
          heartbeatThread = new HeartbeatThread(
              "replication Heartbeat to " + serverURL +
              " for " + this.baseDn,
              session, heartbeatInterval/3);
          heartbeatThread.start();
          started = replicationServerDomain.startServer(this);
        }
        else
        {
          started = replicationServerDomain.startReplicationServer(this);
        }
        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
        DirectoryServer.registerMonitorProvider(this);
      }
      else
      {
        // the connection is not valid, close it.
        try
        if (started)
        {
          if (debugEnabled())
          // sendWindow MUST be created before starting the writer
          sendWindow = new Semaphore(sendWindowSize);
          writer = new ServerWriter(session, serverId,
              this, replicationServerDomain);
          reader = new ServerReader(session, serverId,
              this, replicationServerDomain);
          reader.start();
          writer.start();
          // Create a thread to send heartbeat messages.
          if (heartbeatInterval > 0)
          {
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS failed to start locally " +
              " the connection from serverID="+serverId);
            heartbeatThread = new HeartbeatThread(
                "replication Heartbeat to " + serverURL +
                " for " + this.baseDn,
                session, heartbeatInterval/3);
            heartbeatThread.start();
          }
          session.close();
        } catch (IOException e1)
        {
          // ignore
          DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
          DirectoryServer.registerMonitorProvider(this);
        }
        else
        {
          // the connection is not valid, close it.
          try
          {
            if (debugEnabled())
            {
              TRACER.debugInfo("In " +
                  replicationServerDomain.getReplicationServer().
                  getMonitorInstanceName() + " RS failed to start locally " +
                  " the connection from serverID="+serverId);
            }
            session.close();
          } catch (IOException e1)
          {
            // ignore
          }
        }
      }
      else
      {
        // For a hanshakeOnly connection, let's only create a reader
        // in order to detect the connection closure.
        reader = new ServerReader(session, serverId,
            this, replicationServerDomain);
        reader.start();
      }
    }
    catch (Exception e)
@@ -842,22 +857,22 @@
  /**
   * Get the age of the older change that has not yet been replicated
   * to the server handled by this ServerHandler.
   *
   * @return The age if the older change has not yet been replicated
   *         to the server handled by this ServerHandler.
   */
  public Long getApproxFirstMissingDate()
  {
    // Get the older CN received
    // From it, get the next sequence number
    // Get the CN for the next sequence number
    // If not present in the local RS db,
    // then approximate with the older update time
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN == null)
      return null;
    Long result = (long)0;
    return olderUpdateCN.getTime();
    // Get the older CN received
    ChangeNumber olderUpdateCN = getOlderUpdateCN();
    if (olderUpdateCN != null)
    {
      // If not present in the local RS db,
      // then approximate with the older update time
      result=olderUpdateCN.getTime();
    }
    return result;
  }
  /**
@@ -874,29 +889,82 @@
  /**
   * Get the older Change Number for that server.
   * Returns null when the queue is empty.
   * @return The older change number.
   */
  public ChangeNumber getOlderUpdateCN()
  {
    ChangeNumber result = null;
    synchronized (msgQueue)
    {
      if (isFollowing())
      {
        if (msgQueue.isEmpty())
          return null;
        UpdateMessage msg = msgQueue.first();
        return msg.getChangeNumber();
        {
          result=null;
        }
        else
        {
          UpdateMessage msg = msgQueue.first();
          result = msg.getChangeNumber();
        }
      }
      else
      {
        if (lateQueue.isEmpty())
          return null;
        {
          // isFollowing is false AND lateQueue is empty
          // We may be at the very moment when the writer has emptyed the
          // lateQueue when it sent the last update. The writer will fill again
          // the lateQueue when it will send the next update but we are not yet
          // there. So let's take the last change not sent directly from
          // the db.
        UpdateMessage msg = lateQueue.first();
        return msg.getChangeNumber();
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          try
          {
            // Build a list of candidates iterator (i.e. db i.e. server)
            for (short serverId : replicationServerDomain.getServers())
            {
              // get the last already sent CN from that server
              ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
              // get an iterator in this server db from that last change
              ReplicationIterator iterator =
                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
              // if that iterator has changes, then it is a candidate
              // it is added in the sorted list at a position given by its
              // current change (see ReplicationIteratorComparator).
              if ((iterator != null) && (iterator.getChange() != null))
              {
                iteratorSortedSet.add(iterator);
              }
            }
            UpdateMessage msg = iteratorSortedSet.first().getChange();
            result = msg.getChangeNumber();
          }
          catch(Exception e)
          {
            result=null;
          }
          finally
          {
            for (ReplicationIterator iterator : iteratorSortedSet)
            {
              iterator.releaseCursor();
            }
          }
        }
        else
        {
          UpdateMessage msg = lateQueue.first();
          result = msg.getChangeNumber();
        }
      }
    }
    return result;
  }
  /**
@@ -958,7 +1026,7 @@
       */
      while (msgQueue.size() > maxQueueSize)
      {
        following = false;
        setFollowing(false);
        msgQueue.removeFirst();
      }
    }
@@ -1083,6 +1151,13 @@
              }
            }
          }
          // The loop below relies on the fact that it is sorted based
          // on the currentChange of each iterator to consider the next
          // change accross all servers.
          // Hence it is necessary to remove and eventual add again an iterator
          // when looping in order to keep consistent the order of the
          // iterators (see ReplicationIteratorComparator.
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          {
            ReplicationIterator iterator = iteratorSortedSet.first();
@@ -1107,7 +1182,7 @@
            {
              if (msgQueue.size() < maxQueueSize)
              {
                following = true;
                setFollowing(true);
              }
            }
          }
@@ -1119,7 +1194,7 @@
              if (msgQueue.contains(msg))
              {
                /* we finally catched up with the regular queue */
                following = true;
                setFollowing(true);
                lateQueue.clear();
                UpdateMessage msg1;
                do
@@ -1459,14 +1534,6 @@
      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
          getReplicationServer().getMonitorInstanceName()));
      // Add the oldest missing update
      Long olderUpdateTime = this.getApproxFirstMissingDate();
      if (olderUpdateTime != null)
      {
        Date date = new Date(olderUpdateTime);
        attributes.add(new Attribute("approx-older-change-not-synchronized",
          date.toString()));
      }
    }
    else
    {
@@ -1477,27 +1544,42 @@
    attributes.add(new Attribute("base-dn",
                                 baseDn.toString()));
    // Update stats
    // Retrieves the topology counters
    if (serverIsLDAPserver)
    {
      MonitorData md;
      try
      {
        replicationServerDomain.retrievesRemoteMonitorData();
        md = replicationServerDomain.getMonitorData();
        // Oldest missing update
        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
        if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
        {
          Date date = new Date(approxFirstMissingDate);
          attributes.add(new Attribute("approx-older-change-not-synchronized",
              date.toString()));
          attributes.add(
              new Attribute("approx-older-change-not-synchronized-millis",
                  String.valueOf(approxFirstMissingDate)));
        }
        // Missing changes
        long missingChanges = md.getMissingChanges(serverId);
        attributes.add(new Attribute("missing-changes",
            String.valueOf(missingChanges)));
        // Replication delay
        long delay = md.getApproxDelay(serverId);
        attributes.add(new Attribute("approximate-delay",
            String.valueOf(delay)));
      }
      catch(Exception e)
      {
        // FIXME: We failed retrieving the remote monitor data
        // TODO: improve the log
        // We failed retrieving the remote monitor data.
        attributes.add(new Attribute("error",
            stackTraceToSingleLineString(e)));
      }
      // Compute the latency for the current SH
      int missingChanges =
        replicationServerDomain.getMissingChanges(serverState);
      // add the latency attribute to our monitor data
      attributes.add(new Attribute("missing-changes",
          String.valueOf(missingChanges)));
    }
    // Deprecated
@@ -1532,8 +1614,6 @@
    attributes.add(new Attribute("waiting-changes",
        String.valueOf(getRcvMsgQueueSize())));
    // Age of oldest missing change
    attributes.add(new Attribute("approximate-delay",
                                 String.valueOf(getApproxDelay())));
    // Date of the oldest missing change
    long olderUpdateTime = getOlderUpdateTime();
@@ -1731,14 +1811,14 @@
     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
     generationId = infoMsg.getGenerationId();
     synchronized(remoteLDAPservers)
     synchronized(connectedServers)
     {
       // Removes the existing structures
       for (LightweightServerHandler lsh : remoteLDAPservers)
       for (LightweightServerHandler lsh : connectedServers.values())
       {
         lsh.stopHandler();
       }
       remoteLDAPservers.clear();
       connectedServers.clear();
       // Creates the new structure according to the message received.
       for (String newConnectedServer : newRemoteLDAPservers)
@@ -1746,7 +1826,7 @@
         LightweightServerHandler lsh
         = new LightweightServerHandler(newConnectedServer, this);
         lsh.startHandler();
         remoteLDAPservers.add(lsh);
         connectedServers.put(lsh.getServerId(), lsh);
       }
     }
   }
@@ -1762,14 +1842,17 @@
    */
   public boolean isRemoteLDAPServer(short wantedServer)
   {
     for (LightweightServerHandler server : remoteLDAPservers)
     synchronized(connectedServers)
     {
       if (wantedServer == server.getServerId())
       for (LightweightServerHandler server : connectedServers.values())
       {
         return true;
         if (wantedServer == server.getServerId())
         {
           return true;
         }
       }
       return false;
     }
     return false;
   }
   /**
@@ -1781,7 +1864,7 @@
    */
   public boolean hasRemoteLDAPServers()
   {
     return !remoteLDAPservers.isEmpty();
     return !connectedServers.isEmpty();
   }
  /**
@@ -1907,4 +1990,13 @@
  {
    return this.replicationServerDomain;
  }
  /**
   * Return a Set containing the servers known by this replicationServer.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getConnectedServerIds()
  {
    return connectedServers.keySet();
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -120,6 +120,7 @@
      {
        ReplicationMessage msg = session.receive();
        /*
        if (debugEnabled())
        {
          TRACER.debugInfo(
@@ -128,6 +129,7 @@
              (handler.isReplicationServer()?" From RS ":" From LS")+
              " with serverId=" + serverId + " receives " + msg);
        }
        */
        if (msg instanceof AckMessage)
        {
          AckMessage ack = (AckMessage) msg;
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -120,6 +120,7 @@
          continue;
        }
        /*
        if (debugEnabled())
        {
          TRACER.debugInfo(
@@ -131,6 +132,7 @@
            " server=" + handler.getServerId() +
            " generationId=" + handler.getGenerationId());
        }
        */
        session.publish(update);
      }
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -316,6 +316,10 @@
    CN1 = new ChangeNumber((long)0, 3, (short)0);
    // 3-0 = 3
    CN2 = new ChangeNumber((long)0, 0, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 3);
    // 3-1 = 2
    CN2 = new ChangeNumber((long)0, 1, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
@@ -327,5 +331,15 @@
    // 3-4 == MAXINT (modulo)
    CN2 = new ChangeNumber((long)0, 4, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
    CN1 = new ChangeNumber((long)0, 0, (short)0);
    // 0-0 = 0
    CN2 = new ChangeNumber((long)0, 0, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
    // 0-1 = MAXINT(modulo)
    CN2 = new ChangeNumber((long)0, 1, (short)0);
    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -496,7 +496,7 @@
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
        window, window, window, window, window, window, state, (short)1, 
        (long)1, true);
        (long)1, true, false);
    ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -511,6 +511,7 @@
        newMsg.getServerState().getMaxChangeNumber((short)1));
    assertEquals(msg.getVersion(), newMsg.getVersion());
    assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
    assertEquals(msg.isHandshakeOnly(), newMsg.isHandshakeOnly());
  }
  @DataProvider(name="changelogStart")
@@ -614,20 +615,28 @@
                                       (short) 123, sid2);
    s2.update(cn2);
    // LS3 state
    ServerState s3 = new ServerState();
    short sid3 = 333;
    ChangeNumber cn3 = new ChangeNumber(now,
                                       (short) 123, sid3);
    s3.update(cn3);
    MonitorMessage msg =
      new MonitorMessage(sender, dest);
    msg.setReplServerState(rsState);
    msg.setLDAPServerState(sid1, s1, now+1);
    msg.setLDAPServerState(sid2, s2, now+2);
    msg.setReplServerDbState(rsState);
    msg.setServerState(sid1, s1, now+1, true);
    msg.setServerState(sid2, s2, now+2, true);
    msg.setServerState(sid3, s3, now+3, false);
    
    byte[] b = msg.getBytes();
    MonitorMessage newMsg = new MonitorMessage(b);
    assertEquals(rsState, msg.getReplServerState());
    assertEquals(newMsg.getReplServerState().toString(),
        msg.getReplServerState().toString());
    assertEquals(rsState, msg.getReplServerDbState());
    assertEquals(newMsg.getReplServerDbState().toString(),
        msg.getReplServerDbState().toString());
    
    Iterator<Short> it = newMsg.iterator();
    Iterator<Short> it = newMsg.ldapIterator();
    while (it.hasNext())
    {
      short sid = it.next();
@@ -635,16 +644,32 @@
      if (sid == sid1)
      {
        assertEquals(s.toString(), s1.toString(), "");
        assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
        assertEquals((Long)(now+1), newMsg.getLDAPApproxFirstMissingDate(sid), "");
      }
      else if (sid == sid2)
      {
        assertEquals(s.toString(), s2.toString());        
        assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
        assertEquals((Long)(now+2), newMsg.getLDAPApproxFirstMissingDate(sid), "");
      }
      else
      {
        fail("Bad sid");
        fail("Bad sid" + sid);
      }
    }
    Iterator<Short> it2 = newMsg.rsIterator();
    while (it2.hasNext())
    {
      short sid = it2.next();
      ServerState s = newMsg.getRSServerState(sid);
      if (sid == sid3)
      {
        assertEquals(s.toString(), s3.toString(), "");
        assertEquals((Long)(now+3), newMsg.getRSApproxFirstMissingDate(sid), "");
      }
      else
      {
        fail("Bad sid " + sid);
      }
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -874,7 +874,7 @@
      ServerStartMessage msg =
        new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
            0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
            ProtocolVersion.currentVersion(), 0, sslEncryption);
            ProtocolVersion.currentVersion(), 0, sslEncryption, false);
      session.publish(msg);
      // Read the Replication Server state from the ReplServerStartMessage that
@@ -907,7 +907,7 @@
          0, 0, 0, 0, WINDOW, (long) 5000, replServerState,
          ProtocolVersion.currentVersion(),
          ReplicationTestCase.getGenerationId(baseDn),
          sslEncryption);
          sslEncryption, false);
      session.publish(msg);
      // Read the ReplServerStartMessage that come back.