mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
20.55.2007 79ad048ba3057275a80c0dfa2532b95b6e2c0648
The code checked in with revision 3561 cause some problems with
hanging tests and failing tests.

These changes roll them back until we fully understand the problem.

They also enable back the tests that were disabled.
1 files deleted
5 files modified
1449 ■■■■ changed files
opends/src/messages/messages/replication.properties 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/AccessControlConfigManager.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 737 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 684 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -76,8 +76,8 @@
NOTICE_SERVER_DISCONNECT_16=%s has disconnected from this replication server
NOTICE_NO_CHANGELOG_SERVER_LISTENING_17=There is no replication server \
 listening on %s
NOTICE_FOUND_CHANGELOGS_WITH_MY_CHANGES_18=Found %d replication server(s) with \
up to date chnages for suffix %s
NOTICE_CHANGELOG_MISSING_CHANGES_18=The replication server %s is missing some \
 changes that this server has already processed on suffix %s
NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \
 server should be configured
SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \
@@ -85,8 +85,8 @@
MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \
 database for base DN %s
NOTICE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES_22=Could not find a \
 replication server that has seen all the local changes on suffix %s. Found %d \
replications server(s) not up to date. Going to replay changes
 replication server that has seen all the local changes on suffix %s. Going to replay \
 changes
NOTICE_COULD_NOT_FIND_CHANGELOG_23=Could not connect to any replication \
 server on suffix %s, retrying...
NOTICE_EXCEPTION_CLOSING_DATABASE_24=Error closing changelog database %s :
@@ -249,13 +249,6 @@
SEVERE_ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED_99=The replication \
  server backend cannot export its entries in LDIF format because the \
  export-ldif command must be run as a task
DEBUG_GOING_TO_SEARCH_FOR_CHANGES_100=The replication server is late \
regarding our changes: going to send missing ones
DEBUG_SENDING_CHANGE_101=Sending change number: %s
DEBUG_CHANGES_SENT_102=All missing changes sent to replication server
SEVERE_ERR_PUBLISHING_FAKE_OPS_103=Caught exception publishing fake operations \
for domain %s to replication server %s : %s
SEVERE_ERR_COMPUTING_FAKE_OPS_104=Caught exception computing fake operations \
for domain %s for replication server %s : %s
NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
failed: %s
  failed: %s
opends/src/server/org/opends/server/core/AccessControlConfigManager.java
@@ -236,7 +236,8 @@
        if(!enabledOld) {
           AccessControlHandler oldHandler =
                   accessControlHandler.getAndSet(getHandler(newHandlerClass,
                                                  newConfiguration, true, true));
                                                  newConfiguration, true,
                                                  true));
           oldHandler.finalizeAccessControlHandler();
        } else {
          //Check if the class name is being changed.
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,7 +25,6 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -42,8 +41,6 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -64,16 +61,17 @@
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
/**
 * The broker for Multi-master Replication.
 */
public class ReplicationBroker implements InternalSearchListener
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
@@ -97,23 +95,23 @@
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  // Trick for avoiding a inner class for many parameters return for
  // performHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  /**
   * The number of times the connection was lost.
   */
  private int numLostConnections = 0;
  /**
   * When the broker cannot connect to any replication server
   * it log an error and keeps continuing every second.
@@ -123,6 +121,7 @@
   * finally succeed to connect.
   */
  private boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  /**
@@ -150,9 +149,9 @@
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval,
    long generationId, ReplSessionSecurity replSessionSecurity)
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval,
      long generationId, ReplSessionSecurity replSessionSecurity)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -165,7 +164,7 @@
      new TreeSet<FakeOperation>(new FakeOperationComparator());
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window / 2;
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.generationId = generationId;
@@ -195,6 +194,7 @@
    this.connect();
  }
  /**
   * Connect to a ReplicationServer.
   *
@@ -202,7 +202,7 @@
   */
  private void connect()
  {
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    ReplServerStartMessage replServerStartMsg = null;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -211,121 +211,187 @@
      heartbeatMonitor = null;
    }
    // checkState is true for the first loop on all replication servers
    // looking for one already up-to-date.
    // If we found some responding replication servers but none up-to-date
    // then we set check-state to false and do a second loop where the first
    // found will be the one elected and then we will update this replication
    // server.
    boolean checkState = true;
    boolean receivedResponse = true;
    // TODO: We are doing here 2 loops opening , closing , reopening session to
    // the same servers .. risk to have 'same server id' erros.
    // Would be better to do only one loop, keeping the best candidate while
    // traversing the list of replication servers to connect to.
    if (servers.size()==1)
    {
      checkState = false;
    }
    synchronized (connectPhaseLock)
    {
      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
      for (String server : servers)
      while ((!connected) && (!shutdown) && (receivedResponse))
      {
        // Connect to server and get reply message
        ReplServerStartMessage replServerStartMsg =
          performHandshake(server, false);
        tmpReadableServerName = null; // Not needed now
        // Store reply message in list
        if (replServerStartMsg != null)
        receivedResponse = false;
        for (String server : servers)
        {
          ServerState rsState = replServerStartMsg.getServerState();
          rsStates.put(server, rsState);
        }
      } // for servers
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
      ReplServerStartMessage replServerStartMsg = null;
      if (rsStates.size() > 0)
      {
        // At least one server answered, find the best one.
        String bestServer = computeBestReplicationServer(state, rsStates,
          serverID, baseDn);
        // Best found, now connect to this one
        replServerStartMsg = performHandshake(bestServer, true);
        if (replServerStartMsg != null)
        {
          try
          {
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = replSessionSecurity.createClientSession(server, socket);
            boolean isSslEncryption =
                 replSessionSecurity.isSslEncryption(server);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion, generationId, isSslEncryption);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            replServerStartMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                replServerStartMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
            {
              session.stopEncryption();
            }
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
            {
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            }
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber != null) &&
              (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              // Replication server is missing some of our changes: let's send
              // them to him.
              replayOperations.clear();
              Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
              logError(message);
              /*
               * Get all the changes that have not been seen by this
               * replication server and populate the replayOperations
               * list.
               */
              InternalSearchOperation op = seachForChangedEntries(
                baseDn, replServerMaxChangeNumber, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              replicationServer = ServerAddr.toString();
              maxSendWindow = replServerStartMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              if (checkState == true)
              {
                /*
                 * An error happened trying to search for the updates
                 * This server will start acepting again new updates but
                 * some inconsistencies will stay between servers.
                 * Log an error for the repair tool
                 * that will need to resynchronize the servers.
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                message = ERR_CANNOT_RECOVER_CHANGES.get(
                  baseDn.toNormalizedString());
                logError(message);
              } else
              {
                for (FakeOperation replayOp : replayOperations)
                {
                  message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
                    toString());
                  logError(message);
                  session.publish(replayOp.generateMessage());
                }
                message = DEBUG_CHANGES_SENT.get();
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
                    baseDn.toNormalizedString());
                logError(message);
              }
            }
              else
              {
                replayOperations.clear();
            replicationServer = tmpReadableServerName;
            maxSendWindow = replServerStartMsg.getWindowSize();
            connected = true;
            startHeartBeat();
          } catch (IOException e)
                 // TODO: i18n
                logError(Message.raw("going to search for changes"));
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and populate the replayOperations
                 * list.
                 */
                InternalSearchOperation op = seachForChangedEntries(
                    baseDn, replServerMaxChangeNumber, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  /*
                   * An error happened trying to search for the updates
                   * This server will start acepting again new updates but
                   * some inconsistencies will stay between servers.
                   * Log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  Message message = ERR_CANNOT_RECOVER_CHANGES.get(
                      baseDn.toNormalizedString());
                  logError(message);
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  startHeartBeat();
                }
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(Message.raw("sendingChange")); // TODO: i18n
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(Message.raw("changes sent")); // TODO: i18n
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
          {
            Message message = ERR_PUBLISHING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            logError(message);
          } catch (Exception e)
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
              logError(message);
            }
          }
          catch (Exception e)
          {
            Message message = ERR_COMPUTING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            logError(message);
          } finally
          }
          finally
          {
            if (connected == false)
            {
@@ -336,60 +402,81 @@
                  session.close();
                } catch (IOException e)
                {
                // The session was already closed, just ignore.
                  // The session was already closed, just ignore.
                }
                session = null;
              }
            }
          }
        } // Could perform handshake with best
      } // Reached some servers
        } // for servers
        // We have traversed all the replication servers
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
          checkState = false;
        }
      }
      // We have traversed all the replication servers as many times as needed
      // to find one if one is up and running.
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // Wakeup all the thread that were waiting on the window
        // wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
          (replServerStartMsg.getGenerationId() == -1))
           (replServerStartMsg.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId));
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId));
          logError(message);
        } else
        }
        else
        {
          Message message =
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId),
            Long.toString(replServerStartMsg.getGenerationId()));
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId),
                Long.toString(replServerStartMsg.getGenerationId()));
          logError(message);
        }
      } else
      }
      else
      {
        /*
         * This server could not find any replicationServer. It's going to start
         * in degraded mode. Log a message.
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         */
        if (!connectionError)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
              NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
          logError(message);
        }
      }
@@ -397,315 +484,6 @@
  }
  /**
   * Connect to the provided server performing the handshake (start messages
   * exchange) and return the reply message from the replication server.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   * @return The ReplServerStartMessage the server replied. Null if could not
   *         get an answer.
   */
  public ReplServerStartMessage performHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMessage replServerStartMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    boolean error = false;
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      session = replSessionSecurity.createClientSession(server, socket);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      /*
       * Send our ServerStartMessage.
       */
      ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
        halfRcvWindow * 2, heartbeatInterval, state,
        protocolVersion, generationId, isSslEncryption);
      session.publish(msg);
      /*
       * Read the ReplServerStartMessage that should come back.
       */
      session.setSoTimeout(1000);
      replServerStartMsg = (ReplServerStartMessage) session.receive();
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        replServerStartMsg.getVersion());
      session.setSoTimeout(timeout);
      if (!isSslEncryption)
      {
        session.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        // the error message is only logged once to avoid overflowing
        // the error log
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        logError(message);
      }
      error = true;
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION.get(
        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      logError(message);
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException e)
        {
        // The session was already closed, just ignore.
        }
        session = null;
      }
      if (error)
      {
        replServerStartMsg = null;
      } // Be sure to return null.
    }
    return replServerStartMsg;
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   *
   * Note: this method put as public static for unit testing purpose.
   *
   * @param myState The local server state.
   * @param rsStates The list of available replication servers and their
   *                 associated server state.
   * @param serverId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. Then, among them, choose the server that is the most up to
     * date regarding the whole topology.
     *
     * If no server is up to date regarding our own server id, find the one who
     * is the most up to date regarding our server id.
     */
    // Should never happen (sanity check)
    if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
      (baseDn == null))
    {
      return null;
    }
    String bestServer = null;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
    // Servers late with regard to our changes
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differenciate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, serverId);
    }
    for (String repServer : rsStates.keySet())
    {
      ServerState rsState = rsStates.get(repServer);
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, serverId);
      }
      // Store state in right list
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      {
        upToDateServers.put(repServer, rsState);
      } else
      {
        lateOnes.put(repServer, rsState);
      }
    }
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose the one that has the
       * maximum number of changes to send us. This is the most up to date one
       * regarding the whole topology. This server is the one which has the less
       * difference with the topology server state. For comparison, we need to
       * compute the difference for each server id with the topology server
       * state.
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(),
        baseDn.toNormalizedString());
      logError(message);
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
       */
      ServerState topoState = new ServerState();
      for (ServerState curState : upToDateServers.values())
      {
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Update topology state
          topoState.update(curSidCn);
        }
      } // For up to date servers
      // Min of the max shifts
      long minShift = -1L;
      for (String upServer : upToDateServers.keySet())
      {
        /*
         * Compute the maximum difference between the time of a server id's
         * change number and the time of the matching server id's change
         * number in the topology server state.
         *
         * Note: we could have used the sequence number here instead of the
         * timestamp, but this would have caused a problem when the sequence
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = -1L;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Cannot be null as checked at construction time
          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          if (tmpShift > shift)
          {
            shift = tmpShift;
          }
        }
        if ((minShift < 0) // First time in loop
          || (shift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = upServer;
          minShift = shift;
        }
      } // For up to date servers
    } else
    {
      /*
       * We could not find a replication server that has seen all the
       * changes that this server has already processed,
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn.toNormalizedString(), lateOnes.size());
      logError(message);
      // Min of the shifts
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, serverId);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        if ((minShift < 0) // First time in loop
          || (tmpShift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = lateServer;
          minShift = tmpShift;
        }
      } // For late servers
    }
    return bestServer;
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute.
   * @param baseDn the base DN
@@ -715,26 +493,26 @@
   * @throws Exception when raised.
   */
  public static InternalSearchOperation seachForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
      DN baseDn,
      ChangeNumber fromChangeNumber,
      InternalSearchListener resultListener)
  throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    LDAPFilter filter = LDAPFilter.decode(
      "(" + Historical.HISTORICALATTRIBUTENAME +
      ">=dummy:" + fromChangeNumber + ")");
        "("+ Historical.HISTORICALATTRIBUTENAME +
        ">=dummy:" + fromChangeNumber + ")");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(Historical.HISTORICALATTRIBUTENAME);
    attrs.add(Historical.ENTRYUIDNAME);
    return conn.processSearch(
      new ASN1OctetString(baseDn.toString()),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, 0, false, filter,
      attrs,
      resultListener);
        new ASN1OctetString(baseDn.toString()),
        SearchScope.WHOLE_SUBTREE,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, 0, false, filter,
        attrs,
        resultListener);
  }
  /**
@@ -746,13 +524,14 @@
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
        new HeartbeatMonitor("Replication Heartbeat Monitor on " +
        baseDn + " with " + getReplicationServer(),
        session, heartbeatInterval);
           new HeartbeatMonitor("Replication Heartbeat Monitor on " +
               baseDn + " with " + getReplicationServer(),
               session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
  /**
   * restart the ReplicationBroker.
   */
@@ -777,7 +556,7 @@
      }
    } catch (IOException e1)
    {
    // ignore
      // ignore
    }
    if (failingSession == session)
@@ -793,7 +572,7 @@
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
          baseDn.toNormalizedString(), e.getLocalizedMessage()));
         baseDn.toNormalizedString(), e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
@@ -804,12 +583,13 @@
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
        // ignore
          // ignore
        }
      }
    }
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
@@ -818,7 +598,7 @@
  {
    boolean done = false;
    while (!done && !shutdown)
    while (!done)
    {
      if (connectionError)
      {
@@ -831,7 +611,7 @@
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() Publishing a " +
            " message is not possible due to existing connection error.");
              " message is not possible due to existing connection error.");
        }
        return;
@@ -862,8 +642,9 @@
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
            (long) 500, TimeUnit.MILLISECONDS);
        } else
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
        {
          credit = true;
        }
@@ -904,22 +685,24 @@
            if (debugEnabled())
            {
              debugInfo("ReplicationBroker.publish() " +
                "IO exception raised : " + e.getLocalizedMessage());
                  "IO exception raised : " + e.getLocalizedMessage());
            }
          }
        }
      } catch (InterruptedException e)
      }
      catch (InterruptedException e)
      {
        // just loop.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() " +
            "Interrupted exception raised." + e.getLocalizedMessage());
              "Interrupted exception raised." + e.getLocalizedMessage());
        }
      }
    }
  }
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
@@ -947,7 +730,8 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        } else
        }
        else
        {
          if (msg instanceof UpdateMessage)
          {
@@ -968,11 +752,11 @@
        if (shutdown == false)
        {
          Message message =
            NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
          logError(message);
          debugInfo("ReplicationBroker.receive() " + baseDn +
            " Exception raised." + e + e.getLocalizedMessage());
              " Exception raised." + e + e.getLocalizedMessage());
          this.reStart(failingSession);
        }
      }
@@ -980,6 +764,7 @@
    return null;
  }
  /**
   * stop the server.
   */
@@ -988,10 +773,8 @@
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
    if (heartbeatMonitor != null)
    {
    if (heartbeatMonitor!= null)
      heartbeatMonitor.shutdown();
    }
    try
    {
      if (debugEnabled())
@@ -1001,12 +784,9 @@
      }
      if (session != null)
      {
        session.close();
      }
    } catch (IOException e)
    {
    }
    {}
  }
  /**
@@ -1054,13 +834,12 @@
  {
    return replicationServer;
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchEntry(
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
      InternalSearchOperation searchOperation,
      SearchResultEntry searchEntry)
  {
    /*
     * Only deal with modify operation so far
@@ -1083,10 +862,10 @@
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
      InternalSearchOperation searchOperation,
      SearchResultReference searchReference)
  {
  // TODO to be implemented
    // TODO to be implemented
  }
  /**
@@ -1127,12 +906,9 @@
  public int getCurrentSendWindow()
  {
    if (connected)
    {
      return sendWindow.availablePermits();
    } else
    {
    else
      return 0;
    }
  }
  /**
@@ -1144,6 +920,7 @@
    return numLostConnections;
  }
  /**
   * Change some config parameters.
   *
@@ -1156,8 +933,8 @@
   * @param heartbeatInterval   The heartbeat interval.
   */
  public void changeConfig(Collection<String> replicationServers,
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval)
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
  {
    this.servers = replicationServers;
    this.maxRcvWindow = window;
@@ -1166,9 +943,9 @@
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
  // TODO : Changing those parameters requires to either restart a new
  // session with the replicationServer or renegociate the parameters that
  // were sent in the ServerStart message
    // TODO : Changing those parameters requires to either restart a new
    // session with the replicationServer or renegociate the parameters that
    // were sent in the ServerStart message
  }
  /**
@@ -1191,11 +968,7 @@
    return !connectionError;
  }
  private boolean debugEnabled()
  {
    return true;
  }
  private boolean debugEnabled() { return true; }
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -1285,7 +1285,7 @@
      debugInfo("Successfully ending " + testCase);
    }
  }
  @Test(enabled=false)
  @Test(enabled=true)
  public void generationIdTest() throws Exception
  {
    testSingleRS();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -761,7 +761,7 @@
  /**
   * Tests the export side of the Initialize task
   */
  @Test(enabled=false)
  @Test(enabled=true)
  public void initializeExport() throws Exception
  {
    String testCase = "Replication/InitializeExport";
@@ -1185,7 +1185,7 @@
    }
  }
  @Test(enabled=false)
  @Test(enabled=true)
  public void initializeExportMultiSS() throws Exception
  {
    String testCase = "Replication/InitializeExportMultiSS";
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
File was deleted