mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
22.28.2008 5722eba8e33871a7d8bc6f5277992f1e26b00286
partial fix for #1302: on startup servers should wait to catchup before accepting connections
1 files added
2 files modified
1294 ■■■■ changed files
opends/src/messages/messages/replication.properties 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 594 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 684 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@
NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
 listening on %s
NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
 changes that this server has already processed on suffix %s
NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
up to date changes for suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
 server should be configured
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
@@ -85,8 +85,8 @@
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
 database for base DN %s
NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
 replication server that has seen all the local changes on suffix %s. Going to replay \
 changes
 replication server that has seen all the local changes on suffix %s. Found %d \
replications server(s) not up to date. Going to replay changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
 server on suffix %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s :
@@ -261,4 +261,12 @@
 while replaying replication message : %s
SEVERE_ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND_110=The replication server \
 configuration could not be found
DEBUG_GOING_TO_SEARCH_FOR_CHANGES_111=The replication server is late \
regarding our changes: going to send missing ones
DEBUG_SENDING_CHANGE_112=Sending change number: %s
DEBUG_CHANGES_SENT_113=All missing changes sent to replication server
SEVERE_ERR_PUBLISHING_FAKE_OPS_114=Caught exception publishing fake operations \
for domain %s to replication server %s : %s
SEVERE_ERR_COMPUTING_FAKE_OPS_115=Caught exception computing fake operations \
for domain %s for replication server %s : %s
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -22,9 +22,10 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -41,6 +42,8 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -61,17 +64,16 @@
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
/**
 * The broker for Multi-master Replication.
 */
public class ReplicationBroker implements InternalSearchListener
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
@@ -95,23 +97,22 @@
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  // Trick for avoiding a inner class for many parameters return for
  // performHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  /**
   * The number of times the connection was lost.
   */
  private int numLostConnections = 0;
  /**
   * When the broker cannot connect to any replication server
   * it log an error and keeps continuing every second.
@@ -121,7 +122,6 @@
   * finally succeed to connect.
   */
  private boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  /**
@@ -194,7 +194,6 @@
    this.connect();
  }
  /**
   * Connect to a ReplicationServer.
   *
@@ -202,7 +201,7 @@
   */
  private void connect()
  {
    ReplServerStartMessage replServerStartMsg = null;
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -211,127 +210,74 @@
      heartbeatMonitor = null;
    }
    // checkState is true for the first loop on all replication servers
    // looking for one already up-to-date.
    // If we found some responding replication servers but none up-to-date
    // then we set check-state to false and do a second loop where the first
    // found will be the one elected and then we will update this replication
    // server.
    boolean checkState = true;
    boolean receivedResponse = true;
    // TODO: We are doing here 2 loops opening , closing , reopening session to
    // the same servers .. risk to have 'same server id' erros.
    // Would be better to do only one loop, keeping the best candidate while
    // traversing the list of replication servers to connect to.
    if (servers.size()==1)
    {
      checkState = false;
    }
    synchronized (connectPhaseLock)
    {
      while ((!connected) && (!shutdown) && (receivedResponse))
      {
        receivedResponse = false;
      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
        for (String server : servers)
        {
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
        // Connect to server and get reply message
        ReplServerStartMessage replServerStartMsg =
          performHandshake(server, false);
        tmpReadableServerName = null; // Not needed now
        // Store reply message in list
        if (replServerStartMsg != null)
        {
          ServerState rsState = replServerStartMsg.getServerState();
          rsStates.put(server, rsState);
        }
      } // for servers
      ReplServerStartMessage replServerStartMsg = null;
      if (rsStates.size() > 0)
      {
        // At least one server answered, find the best one.
        String bestServer = computeBestReplicationServer(state, rsStates,
          serverID, baseDn);
        // Best found, now connect to this one
        replServerStartMsg = performHandshake(bestServer, true);
        if (replServerStartMsg != null)
        {
          try
          {
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = replSessionSecurity.createClientSession(server, socket);
            boolean isSslEncryption =
                 replSessionSecurity.isSslEncryption(server);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion, generationId, isSslEncryption);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            replServerStartMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                replServerStartMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
            {
              session.stopEncryption();
            }
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
            {
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            }
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            if ((ourMaxChangeNumber != null) &&
              (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              replicationServer = ServerAddr.toString();
              maxSendWindow = replServerStartMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              if (checkState == true)
              {
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
                    baseDn.toNormalizedString());
                logError(message);
              }
              else
              {
              // Replication server is missing some of our changes: let's send
              // them to him.
                replayOperations.clear();
                 // TODO: i18n
                logError(Message.raw("going to search for changes"));
              Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
              logError(message);
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and populate the replayOperations
               * replication server and populate the replayOperations
                 * list.
                 */
                InternalSearchOperation op = seachForChangedEntries(
@@ -345,53 +291,40 @@
                   * Log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  Message message = ERR_CANNOT_RECOVER_CHANGES.get(
                message = ERR_CANNOT_RECOVER_CHANGES.get(
                      baseDn.toNormalizedString());
                  logError(message);
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  startHeartBeat();
                }
                else
              } else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(Message.raw("sendingChange")); // TODO: i18n
                  message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
                    toString());
                  logError(message);
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(Message.raw("changes sent")); // TODO: i18n
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
          {
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
                message = DEBUG_CHANGES_SENT.get();
              logError(message);
            }
          }
          catch (Exception e)
            replicationServer = tmpReadableServerName;
            maxSendWindow = replServerStartMsg.getWindowSize();
            connected = true;
            startHeartBeat();
          } catch (IOException e)
          {
            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
            Message message = ERR_PUBLISHING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            logError(message);
          }
          finally
          } catch (Exception e)
          {
            Message message = ERR_COMPUTING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            logError(message);
          } finally
          {
            if (connected == false)
            {
@@ -408,37 +341,20 @@
              }
            }
          }
        } // for servers
        // We have traversed all the replication servers
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
          checkState = false;
        }
      }
      // We have traversed all the replication servers as many times as needed
      // to find one if one is up and running.
        } // Could perform handshake with best
      } // Reached some servers
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // wakeup all the thread that were waiting on the window
        // Wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
@@ -451,8 +367,7 @@
                replicationServer,
                Long.toString(this.generationId));
          logError(message);
        }
        else
        } else
        {
          Message message =
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
@@ -462,17 +377,14 @@
                Long.toString(replServerStartMsg.getGenerationId()));
          logError(message);
        }
      }
      else
      } else
      {
        /*
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         * This server could not find any replicationServer. It's going to start
         * in degraded mode. Log a message.
         */
        if (!connectionError)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
@@ -484,6 +396,315 @@
  }
  /**
   * Connect to the provided server performing the handshake (start messages
   * exchange) and return the reply message from the replication server.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   * @return The ReplServerStartMessage the server replied. Null if could not
   *         get an answer.
   */
  public ReplServerStartMessage performHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMessage replServerStartMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    boolean error = false;
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      session = replSessionSecurity.createClientSession(server, socket);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      /*
       * Send our ServerStartMessage.
       */
      ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
        halfRcvWindow * 2, heartbeatInterval, state,
        protocolVersion, generationId, isSslEncryption);
      session.publish(msg);
      /*
       * Read the ReplServerStartMessage that should come back.
       */
      session.setSoTimeout(1000);
      replServerStartMsg = (ReplServerStartMessage) session.receive();
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        replServerStartMsg.getVersion());
      session.setSoTimeout(timeout);
      if (!isSslEncryption)
      {
        session.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        // the error message is only logged once to avoid overflowing
        // the error log
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        logError(message);
      }
      error = true;
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION.get(
        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      logError(message);
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException e)
        {
        // The session was already closed, just ignore.
        }
        session = null;
      }
      if (error)
      {
        replServerStartMsg = null;
      } // Be sure to return null.
    }
    return replServerStartMsg;
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   *
   * Note: this method put as public static for unit testing purpose.
   *
   * @param myState The local server state.
   * @param rsStates The list of available replication servers and their
   *                 associated server state.
   * @param serverId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. Then, among them, choose the server that is the most up to
     * date regarding the whole topology.
     *
     * If no server is up to date regarding our own server id, find the one who
     * is the most up to date regarding our server id.
     */
    // Should never happen (sanity check)
    if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
      (baseDn == null))
    {
      return null;
    }
    String bestServer = null;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
    // Servers late with regard to our changes
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differenciate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, serverId);
    }
    for (String repServer : rsStates.keySet())
    {
      ServerState rsState = rsStates.get(repServer);
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, serverId);
      }
      // Store state in right list
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      {
        upToDateServers.put(repServer, rsState);
      } else
      {
        lateOnes.put(repServer, rsState);
      }
    }
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose the one that has the
       * maximum number of changes to send us. This is the most up to date one
       * regarding the whole topology. This server is the one which has the less
       * difference with the topology server state. For comparison, we need to
       * compute the difference for each server id with the topology server
       * state.
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(),
        baseDn.toNormalizedString());
      logError(message);
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
       */
      ServerState topoState = new ServerState();
      for (ServerState curState : upToDateServers.values())
      {
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Update topology state
          topoState.update(curSidCn);
        }
      } // For up to date servers
      // Min of the max shifts
      long minShift = -1L;
      for (String upServer : upToDateServers.keySet())
      {
        /*
         * Compute the maximum difference between the time of a server id's
         * change number and the time of the matching server id's change
         * number in the topology server state.
         *
         * Note: we could have used the sequence number here instead of the
         * timestamp, but this would have caused a problem when the sequence
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = -1L;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Cannot be null as checked at construction time
          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          if (tmpShift > shift)
          {
            shift = tmpShift;
          }
        }
        if ((minShift < 0) // First time in loop
          || (shift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = upServer;
          minShift = shift;
        }
      } // For up to date servers
    } else
    {
      /*
       * We could not find a replication server that has seen all the
       * changes that this server has already processed,
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn.toNormalizedString(), lateOnes.size());
      logError(message);
      // Min of the shifts
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, serverId);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        if ((minShift < 0) // First time in loop
          || (tmpShift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = lateServer;
          minShift = tmpShift;
        }
      } // For late servers
    }
    return bestServer;
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute.
   * @param baseDn the base DN
@@ -531,7 +752,6 @@
    }
  }
  /**
   * restart the ReplicationBroker.
   */
@@ -589,7 +809,6 @@
    }
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
@@ -598,7 +817,7 @@
  {
    boolean done = false;
    while (!done)
    while (!done && !shutdown)
    {
      if (connectionError)
      {
@@ -643,8 +862,7 @@
          credit =
            currentWindowSemaphore.tryAcquire(
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
        } else
        {
          credit = true;
        }
@@ -689,8 +907,7 @@
            }
          }
        }
      }
      catch (InterruptedException e)
      } catch (InterruptedException e)
      {
        // just loop.
        if (debugEnabled())
@@ -702,7 +919,6 @@
    }
  }
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
@@ -730,8 +946,7 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else
        } else
        {
          if (msg instanceof UpdateMessage)
          {
@@ -764,7 +979,6 @@
    return null;
  }
  /**
   * stop the server.
   */
@@ -774,7 +988,9 @@
    shutdown = true;
    connected = false;
    if (heartbeatMonitor!= null)
    {
      heartbeatMonitor.shutdown();
    }
    try
    {
      if (debugEnabled())
@@ -784,9 +1000,12 @@
      }
      if (session != null)
      {
        session.close();
      }
    } catch (IOException e)
    {}
    {
    }
  }
  /**
@@ -834,6 +1053,7 @@
  {
    return replicationServer;
  }
  /**
   * {@inheritDoc}
   */
@@ -906,10 +1126,13 @@
  public int getCurrentSendWindow()
  {
    if (connected)
    {
      return sendWindow.availablePermits();
    else
    } else
    {
      return 0;
  }
  }
  /**
   * Get the number of times the connection was lost.
@@ -920,7 +1143,6 @@
    return numLostConnections;
  }
  /**
   * Change some config parameters.
   *
@@ -968,7 +1190,11 @@
    return !connectionError;
  }
  private boolean debugEnabled() { return true; }
  private boolean debugEnabled()
  {
    return true;
  }
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
New file
@@ -0,0 +1,684 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.HashMap;
import static org.opends.server.replication.plugin.ReplicationBroker.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Test the algorithm for find the best replication server among the configured
 * ones.
 */
public class ComputeBestServerTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  private void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  {
  // Don't need server context in these tests
  }
  /**
   * Clean up the environment.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @AfterClass
  @Override
  public void classCleanUp() throws Exception
  {
  // Don't need server context in these tests
  }
  /**
   * Test with one replication server, nobody has a change number (simulates)
   * very first connection.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void testNullCNBoth() throws Exception
  {
    String testCase = "testNullCNBoth";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, only replication server has a non null
   * changenumber for ds server id
   * @throws Exception If a problem occured
   */
  @Test
  public void testNullCNDS() throws Exception
  {
    String testCase = "testNullCNDS";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, only ds server has a non null
   * changenumber for ds server id but rs has a null one.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void testNullCNRS() throws Exception
  {
    String testCase = "testNullCNRS";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, up to date.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test1ServerUp() throws Exception
  {
    String testCase = "test1ServerUp";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 2 replication servers, up to date.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test2ServersUp() throws Exception
  {
    String testCase = "test2ServersUp";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 3 replication servers, up to date.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test3ServersUp() throws Exception
  {
    String testCase = "test3ServersUp";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(3L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(2L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER2, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with one replication server, late.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test1ServerLate() throws Exception
  {
    String testCase = "test1ServerLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(1L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(1L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 2 replication servers, late.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test2ServersLate() throws Exception
  {
    String testCase = "test2ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(2L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(0L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 3 replication servers, late.
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test3ServersLate() throws Exception
  {
    String testCase = "test3ServersLate";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(4L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(1L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(0L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(2L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER2, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
  /**
   * Test with 6 replication servers, some up, some late, one null
   *
   * @throws Exception If a problem occured
   */
  @Test
  public void test6ServersMixed() throws Exception
  {
    String testCase = "test6ServersMixed";
    debugInfo("Starting " + testCase);
    // definitions for server ids
    short myId1 = 1;
    short myId2 = 2;
    short myId3 = 3;
    // definitions for server names
    final String WINNER = "winner";
    final String LOOSER1 = "looser1";
    final String LOOSER2 = "looser2";
    final String LOOSER3 = "looser3";
    final String LOOSER4 = "looser4";
    final String LOOSER5 = "looser5";
    // Create my state
    ServerState mySt = new ServerState();
    ChangeNumber cn = new ChangeNumber(5L, 0, myId1);
    mySt.update(cn);
    cn = new ChangeNumber(2L, 0, myId2); // Should not be used inside algo
    mySt.update(cn);
    cn = new ChangeNumber(3L, 0, myId3); // Should not be used inside algo
    mySt.update(cn);
    // Create replication servers state list
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // State for server 1
    ServerState aState = new ServerState();
    cn = new ChangeNumber(4L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER1, aState);
    // State for server 2
    aState = new ServerState();
    cn = new ChangeNumber(7L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER2, aState);
    // State for server 3
    aState = new ServerState();
    cn = new ChangeNumber(3L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(10L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER3, aState);
    // State for server 4
    aState = new ServerState();
    cn = new ChangeNumber(6L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(8L, 0, myId3);
    aState.update(cn);
    rsStates.put(WINNER, aState);
    // State for server 5 (null one for our serverid)
    aState = new ServerState();
    cn = new ChangeNumber(5L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(5L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER4, aState);
    // State for server 6
    aState = new ServerState();
    cn = new ChangeNumber(5L, 0, myId1);
    aState.update(cn);
    cn = new ChangeNumber(7L, 0, myId2);
    aState.update(cn);
    cn = new ChangeNumber(6L, 0, myId3);
    aState.update(cn);
    rsStates.put(LOOSER5, aState);
    String bestServer =
      computeBestReplicationServer(mySt, rsStates, myId1, new DN());
    assertEquals(bestServer, WINNER, "Wrong best replication server.");
  }
}