mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
22.28.2008 5722eba8e33871a7d8bc6f5277992f1e26b00286
partial fix for #1302: on startup servers should wait to catchup before accepting connections
1 files added
2 files modified
1442 ■■■■ changed files
opends/src/messages/messages/replication.properties 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 742 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 684 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@
NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
 listening on %s
NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
 changes that this server has already processed on suffix %s
NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
up to date changes for suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
 server should be configured
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
@@ -85,8 +85,8 @@
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
 database for base DN %s
NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
 replication server that has seen all the local changes on suffix %s. Going to replay \
 changes
 replication server that has seen all the local changes on suffix %s. Found %d \
replications server(s) not up to date. Going to replay changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
 server on suffix %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s :
@@ -261,4 +261,12 @@
 while replaying replication message : %s
SEVERE_ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND_110=The replication server \
 configuration could not be found
DEBUG_GOING_TO_SEARCH_FOR_CHANGES_111=The replication server is late \
regarding our changes: going to send missing ones
DEBUG_SENDING_CHANGE_112=Sending change number: %s
DEBUG_CHANGES_SENT_113=All missing changes sent to replication server
SEVERE_ERR_PUBLISHING_FAKE_OPS_114=Caught exception publishing fake operations \
for domain %s to replication server %s : %s
SEVERE_ERR_COMPUTING_FAKE_OPS_115=Caught exception computing fake operations \
for domain %s for replication server %s : %s
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -22,9 +22,10 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -41,6 +42,8 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -61,17 +64,16 @@
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
/**
 * The broker for Multi-master Replication.
 */
public class ReplicationBroker implements InternalSearchListener
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
@@ -95,23 +97,22 @@
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  // Trick for avoiding a inner class for many parameters return for
  // performHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  /**
   * The number of times the connection was lost.
   */
  private int numLostConnections = 0;
  /**
   * When the broker cannot connect to any replication server
   * it log an error and keeps continuing every second.
@@ -121,7 +122,6 @@
   * finally succeed to connect.
   */
  private boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  /**
@@ -149,9 +149,9 @@
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval,
      long generationId, ReplSessionSecurity replSessionSecurity)
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval,
    long generationId, ReplSessionSecurity replSessionSecurity)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -164,7 +164,7 @@
      new TreeSet<FakeOperation>(new FakeOperationComparator());
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
    this.halfRcvWindow = window / 2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.generationId = generationId;
@@ -194,7 +194,6 @@
    this.connect();
  }
  /**
   * Connect to a ReplicationServer.
   *
@@ -202,7 +201,7 @@
   */
  private void connect()
  {
    ReplServerStartMessage replServerStartMsg = null;
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -211,187 +210,121 @@
      heartbeatMonitor = null;
    }
    // checkState is true for the first loop on all replication servers
    // looking for one already up-to-date.
    // If we found some responding replication servers but none up-to-date
    // then we set check-state to false and do a second loop where the first
    // found will be the one elected and then we will update this replication
    // server.
    boolean checkState = true;
    boolean receivedResponse = true;
    // TODO: We are doing here 2 loops opening , closing , reopening session to
    // the same servers .. risk to have 'same server id' erros.
    // Would be better to do only one loop, keeping the best candidate while
    // traversing the list of replication servers to connect to.
    if (servers.size()==1)
    {
      checkState = false;
    }
    synchronized (connectPhaseLock)
    {
      while ((!connected) && (!shutdown) && (receivedResponse))
      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
      for (String server : servers)
      {
        receivedResponse = false;
        for (String server : servers)
        {
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
        // Connect to server and get reply message
        ReplServerStartMessage replServerStartMsg =
          performHandshake(server, false);
        tmpReadableServerName = null; // Not needed now
        // Store reply message in list
        if (replServerStartMsg != null)
        {
          ServerState rsState = replServerStartMsg.getServerState();
          rsStates.put(server, rsState);
        }
      } // for servers
      ReplServerStartMessage replServerStartMsg = null;
      if (rsStates.size() > 0)
      {
        // At least one server answered, find the best one.
        String bestServer = computeBestReplicationServer(state, rsStates,
          serverID, baseDn);
        // Best found, now connect to this one
        replServerStartMsg = performHandshake(bestServer, true);
        if (replServerStartMsg != null)
        {
          try
          {
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = replSessionSecurity.createClientSession(server, socket);
            boolean isSslEncryption =
                 replSessionSecurity.isSslEncryption(server);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion, generationId, isSslEncryption);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            replServerStartMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                replServerStartMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
            {
              session.stopEncryption();
            }
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
            {
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            }
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            if ((ourMaxChangeNumber != null) &&
              (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              replicationServer = ServerAddr.toString();
              maxSendWindow = replServerStartMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              if (checkState == true)
              // Replication server is missing some of our changes: let's send
              // them to him.
              replayOperations.clear();
              Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
              logError(message);
              /*
               * Get all the changes that have not been seen by this
               * replication server and populate the replayOperations
               * list.
               */
              InternalSearchOperation op = seachForChangedEntries(
                baseDn, replServerMaxChangeNumber, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              {
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                /*
                 * An error happened trying to search for the updates
                 * This server will start acepting again new updates but
                 * some inconsistencies will stay between servers.
                 * Log an error for the repair tool
                 * that will need to resynchronize the servers.
                 */
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
                    baseDn.toNormalizedString());
                message = ERR_CANNOT_RECOVER_CHANGES.get(
                  baseDn.toNormalizedString());
                logError(message);
              } else
              {
                for (FakeOperation replayOp : replayOperations)
                {
                  message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
                    toString());
                  logError(message);
                  session.publish(replayOp.generateMessage());
                }
                message = DEBUG_CHANGES_SENT.get();
                logError(message);
              }
              else
              {
                replayOperations.clear();
                 // TODO: i18n
                logError(Message.raw("going to search for changes"));
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and populate the replayOperations
                 * list.
                 */
                InternalSearchOperation op = seachForChangedEntries(
                    baseDn, replServerMaxChangeNumber, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  /*
                   * An error happened trying to search for the updates
                   * This server will start acepting again new updates but
                   * some inconsistencies will stay between servers.
                   * Log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  Message message = ERR_CANNOT_RECOVER_CHANGES.get(
                      baseDn.toNormalizedString());
                  logError(message);
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  startHeartBeat();
                }
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(Message.raw("sendingChange")); // TODO: i18n
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(Message.raw("changes sent")); // TODO: i18n
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
            replicationServer = tmpReadableServerName;
            maxSendWindow = replServerStartMsg.getWindowSize();
            connected = true;
            startHeartBeat();
          } catch (IOException e)
          {
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
              logError(message);
            }
          }
          catch (Exception e)
          {
            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            Message message = ERR_PUBLISHING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            logError(message);
          }
          finally
          } catch (Exception e)
          {
            Message message = ERR_COMPUTING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            logError(message);
          } finally
          {
            if (connected == false)
            {
@@ -402,81 +335,60 @@
                  session.close();
                } catch (IOException e)
                {
                  // The session was already closed, just ignore.
                // The session was already closed, just ignore.
                }
                session = null;
              }
            }
          }
        } // for servers
        // We have traversed all the replication servers
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
          checkState = false;
        }
      }
      // We have traversed all the replication servers as many times as needed
      // to find one if one is up and running.
        } // Could perform handshake with best
      } // Reached some servers
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // wakeup all the thread that were waiting on the window
        // Wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
           (replServerStartMsg.getGenerationId() == -1))
          (replServerStartMsg.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId));
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId));
          logError(message);
        }
        else
        } else
        {
          Message message =
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId),
                Long.toString(replServerStartMsg.getGenerationId()));
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId),
            Long.toString(replServerStartMsg.getGenerationId()));
          logError(message);
        }
      }
      else
      } else
      {
        /*
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         * This server could not find any replicationServer. It's going to start
         * in degraded mode. Log a message.
         */
        if (!connectionError)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
              NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
          logError(message);
        }
      }
@@ -484,6 +396,315 @@
  }
  /**
   * Connect to the provided server performing the handshake (start messages
   * exchange) and return the reply message from the replication server.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   * @return The ReplServerStartMessage the server replied. Null if could not
   *         get an answer.
   */
  public ReplServerStartMessage performHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMessage replServerStartMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    boolean error = false;
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      session = replSessionSecurity.createClientSession(server, socket);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      /*
       * Send our ServerStartMessage.
       */
      ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
        halfRcvWindow * 2, heartbeatInterval, state,
        protocolVersion, generationId, isSslEncryption);
      session.publish(msg);
      /*
       * Read the ReplServerStartMessage that should come back.
       */
      session.setSoTimeout(1000);
      replServerStartMsg = (ReplServerStartMessage) session.receive();
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        replServerStartMsg.getVersion());
      session.setSoTimeout(timeout);
      if (!isSslEncryption)
      {
        session.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        // the error message is only logged once to avoid overflowing
        // the error log
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        logError(message);
      }
      error = true;
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION.get(
        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      logError(message);
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException e)
        {
        // The session was already closed, just ignore.
        }
        session = null;
      }
      if (error)
      {
        replServerStartMsg = null;
      } // Be sure to return null.
    }
    return replServerStartMsg;
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   *
   * Note: this method put as public static for unit testing purpose.
   *
   * @param myState The local server state.
   * @param rsStates The list of available replication servers and their
   *                 associated server state.
   * @param serverId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. Then, among them, choose the server that is the most up to
     * date regarding the whole topology.
     *
     * If no server is up to date regarding our own server id, find the one who
     * is the most up to date regarding our server id.
     */
    // Should never happen (sanity check)
    if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
      (baseDn == null))
    {
      return null;
    }
    String bestServer = null;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
    // Servers late with regard to our changes
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differenciate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, serverId);
    }
    for (String repServer : rsStates.keySet())
    {
      ServerState rsState = rsStates.get(repServer);
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, serverId);
      }
      // Store state in right list
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      {
        upToDateServers.put(repServer, rsState);
      } else
      {
        lateOnes.put(repServer, rsState);
      }
    }
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose the one that has the
       * maximum number of changes to send us. This is the most up to date one
       * regarding the whole topology. This server is the one which has the less
       * difference with the topology server state. For comparison, we need to
       * compute the difference for each server id with the topology server
       * state.
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(),
        baseDn.toNormalizedString());
      logError(message);
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
       */
      ServerState topoState = new ServerState();
      for (ServerState curState : upToDateServers.values())
      {
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Update topology state
          topoState.update(curSidCn);
        }
      } // For up to date servers
      // Min of the max shifts
      long minShift = -1L;
      for (String upServer : upToDateServers.keySet())
      {
        /*
         * Compute the maximum difference between the time of a server id's
         * change number and the time of the matching server id's change
         * number in the topology server state.
         *
         * Note: we could have used the sequence number here instead of the
         * timestamp, but this would have caused a problem when the sequence
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = -1L;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Cannot be null as checked at construction time
          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          if (tmpShift > shift)
          {
            shift = tmpShift;
          }
        }
        if ((minShift < 0) // First time in loop
          || (shift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = upServer;
          minShift = shift;
        }
      } // For up to date servers
    } else
    {
      /*
       * We could not find a replication server that has seen all the
       * changes that this server has already processed,
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn.toNormalizedString(), lateOnes.size());
      logError(message);
      // Min of the shifts
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, serverId);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        if ((minShift < 0) // First time in loop
          || (tmpShift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = lateServer;
          minShift = tmpShift;
        }
      } // For late servers
    }
    return bestServer;
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute.
   * @param baseDn the base DN
@@ -493,26 +714,26 @@
   * @throws Exception when raised.
   */
  public static InternalSearchOperation seachForChangedEntries(
      DN baseDn,
      ChangeNumber fromChangeNumber,
      InternalSearchListener resultListener)
  throws Exception
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    LDAPFilter filter = LDAPFilter.decode(
        "("+ Historical.HISTORICALATTRIBUTENAME +
        ">=dummy:" + fromChangeNumber + ")");
      "(" + Historical.HISTORICALATTRIBUTENAME +
      ">=dummy:" + fromChangeNumber + ")");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(Historical.HISTORICALATTRIBUTENAME);
    attrs.add(Historical.ENTRYUIDNAME);
    return conn.processSearch(
        new ASN1OctetString(baseDn.toString()),
        SearchScope.WHOLE_SUBTREE,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, 0, false, filter,
        attrs,
        resultListener);
      new ASN1OctetString(baseDn.toString()),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, 0, false, filter,
      attrs,
      resultListener);
  }
  /**
@@ -524,14 +745,13 @@
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
           new HeartbeatMonitor("Replication Heartbeat Monitor on " +
               baseDn + " with " + getReplicationServer(),
               session, heartbeatInterval);
        new HeartbeatMonitor("Replication Heartbeat Monitor on " +
        baseDn + " with " + getReplicationServer(),
        session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
  /**
   * restart the ReplicationBroker.
   */
@@ -556,7 +776,7 @@
      }
    } catch (IOException e1)
    {
      // ignore
    // ignore
    }
    if (failingSession == session)
@@ -572,7 +792,7 @@
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
         baseDn.toNormalizedString(), e.getLocalizedMessage()));
          baseDn.toNormalizedString(), e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
@@ -583,13 +803,12 @@
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          // ignore
        // ignore
        }
      }
    }
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
@@ -598,7 +817,7 @@
  {
    boolean done = false;
    while (!done)
    while (!done && !shutdown)
    {
      if (connectionError)
      {
@@ -611,7 +830,7 @@
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() Publishing a " +
              " message is not possible due to existing connection error.");
            " message is not possible due to existing connection error.");
        }
        return;
@@ -642,9 +861,8 @@
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
            (long) 500, TimeUnit.MILLISECONDS);
        } else
        {
          credit = true;
        }
@@ -685,24 +903,22 @@
            if (debugEnabled())
            {
              debugInfo("ReplicationBroker.publish() " +
                  "IO exception raised : " + e.getLocalizedMessage());
                "IO exception raised : " + e.getLocalizedMessage());
            }
          }
        }
      }
      catch (InterruptedException e)
      } catch (InterruptedException e)
      {
        // just loop.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() " +
              "Interrupted exception raised." + e.getLocalizedMessage());
            "Interrupted exception raised." + e.getLocalizedMessage());
        }
      }
    }
  }
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
@@ -730,8 +946,7 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else
        } else
        {
          if (msg instanceof UpdateMessage)
          {
@@ -752,11 +967,11 @@
        if (shutdown == false)
        {
          Message message =
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
            NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
          logError(message);
          debugInfo("ReplicationBroker.receive() " + baseDn +
              " Exception raised." + e + e.getLocalizedMessage());
            " Exception raised." + e + e.getLocalizedMessage());
          this.reStart(failingSession);
        }
      }
@@ -764,7 +979,6 @@
    return null;
  }
  /**
   * stop the server.
   */
@@ -773,8 +987,10 @@
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
    if (heartbeatMonitor!= null)
    if (heartbeatMonitor != null)
    {
      heartbeatMonitor.shutdown();
    }
    try
    {
      if (debugEnabled())
@@ -784,9 +1000,12 @@
      }
      if (session != null)
      {
        session.close();
      }
    } catch (IOException e)
    {}
    {
    }
  }
  /**
@@ -834,12 +1053,13 @@
  {
    return replicationServer;
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchEntry(
      InternalSearchOperation searchOperation,
      SearchResultEntry searchEntry)
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  {
    /*
     * Only deal with modify operation so far
@@ -862,10 +1082,10 @@
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
      InternalSearchOperation searchOperation,
      SearchResultReference searchReference)
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
  // TODO to be implemented
  }
  /**
@@ -906,9 +1126,12 @@
  public int getCurrentSendWindow()
  {
    if (connected)
    {
      return sendWindow.availablePermits();
    else
    } else
    {
      return 0;
    }
  }
  /**
@@ -920,7 +1143,6 @@
    return numLostConnections;
  }
  /**
   * Change some config parameters.
   *
@@ -933,8 +1155,8 @@
   * @param heartbeatInterval   The heartbeat interval.
   */
  public void changeConfig(Collection<String> replicationServers,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval)
  {
    this.servers = replicationServers;
    this.maxRcvWindow = window;
@@ -943,9 +1165,9 @@
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    // TODO : Changing those parameters requires to either restart a new
    // session with the replicationServer or renegociate the parameters that
    // were sent in the ServerStart message
  // TODO : Changing those parameters requires to either restart a new
  // session with the replicationServer or renegociate the parameters that
  // were sent in the ServerStart message
  }
  /**
@@ -968,7 +1190,11 @@
    return !connectionError;
  }
  private boolean debugEnabled() { return true; }
  private boolean debugEnabled()
  {
    return true;
  }
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
New file
@@ -0,0 +1,684 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.HashMap;
import static org.opends.server.replication.plugin.ReplicationBroker.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test the algorithm for find the best replication server among the configured
 * ones.
 */
public class ComputeBestServerTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  {
  // Don't need server context in these tests
  }
  /**
   * Clean up the environment.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @AfterClass
  @Override
  public void classCleanUp() throws Exception
  {
  // Don't need server context in these tests
  }
  /**
   * Test with one replication server, nobody has a change number (simulates)
   * very first connection.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void testNullCNBoth() throws Exception
  {
    String testCase = "testNullCNBoth";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, only replication server has a non null
   * changenumber for ds server id
   * @throws Exception If a problem occured
   */
  @Test
  public void testNullCNDS() throws Exception
  {
    String testCase = "testNullCNDS";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, only ds server has a non null
   * changenumber for ds server id but rs has a null one.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void testNullCNRS() throws Exception
  {
    String testCase = "testNullCNRS";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, up to date.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test1ServerUp() throws Exception
  {
    String testCase = "test1ServerUp";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 2 replication servers, up to date.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test2ServersUp() throws Exception
  {
    String testCase = "test2ServersUp";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 3 replication servers, up to date.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test3ServersUp() throws Exception
  {
    String testCase = "test3ServersUp";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(3L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(2L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER2, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, late.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test1ServerLate() throws Exception
  {
    String testCase = "test1ServerLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 2 replication servers, late.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test2ServersLate() throws Exception
  {
    String testCase = "test2ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 3 replication servers, late.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test3ServersLate() throws Exception
  {
    String testCase = "test3ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER2, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 6 replication servers, some up, some late, one null
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test6ServersMixed() throws Exception
  {
    String testCase = "test6ServersMixed";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    final String LOOSER3 = "looser3";
    final String LOOSER4 = "looser4";
    final String LOOSER5 = "looser5";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(5L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(4L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(7L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER2, aState);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER3, aState);
    // State for server 4
    aState = new ServerState();
    cn = new ChangeNumber(6L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(8L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    // State for server 5 (null one for our serverid)
    aState = new ServerState();
    cn = new ChangeNumber(5L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER4, aState);
    // State for server 6
    aState = new ServerState();
    cn = new ChangeNumber(5L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(7L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER5, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
}