mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
14.55.2007 c90277aee027fd834936b44a621a142cf71de444
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,6 +25,7 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -41,6 +42,8 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -61,17 +64,16 @@
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
/**
 * The broker for Multi-master Replication.
 */
public class ReplicationBroker implements InternalSearchListener
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
@@ -95,23 +97,23 @@
  private long generationId = -1;
  private ReplSessionSecurity replSessionSecurity;
  // Trick for avoiding a inner class for many parameters return for
  // performHandshake method.
  private String tmpReadableServerName = null;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  /**
   * A thread to monitor heartbeats on the session.
   */
  private HeartbeatMonitor heartbeatMonitor = null;
  /**
   * The number of times the connection was lost.
   */
  private int numLostConnections = 0;
  /**
   * When the broker cannot connect to any replication server
   * it log an error and keeps continuing every second.
@@ -121,7 +123,6 @@
   * finally succeed to connect.
   */
  private boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  /**
@@ -149,9 +150,9 @@
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval,
      long generationId, ReplSessionSecurity replSessionSecurity)
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval,
    long generationId, ReplSessionSecurity replSessionSecurity)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -164,7 +165,7 @@
      new TreeSet<FakeOperation>(new FakeOperationComparator());
    this.rcvWindow = window;
    this.maxRcvWindow = window;
    this.halfRcvWindow = window/2;
    this.halfRcvWindow = window / 2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.generationId = generationId;
@@ -194,7 +195,6 @@
    this.connect();
  }
  /**
   * Connect to a ReplicationServer.
   *
@@ -202,7 +202,7 @@
   */
  private void connect()
  {
    ReplServerStartMessage replServerStartMsg = null;
    HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -211,187 +211,121 @@
      heartbeatMonitor = null;
    }
    // checkState is true for the first loop on all replication servers
    // looking for one already up-to-date.
    // If we found some responding replication servers but none up-to-date
    // then we set check-state to false and do a second loop where the first
    // found will be the one elected and then we will update this replication
    // server.
    boolean checkState = true;
    boolean receivedResponse = true;
    // TODO: We are doing here 2 loops opening , closing , reopening session to
    // the same servers .. risk to have 'same server id' erros.
    // Would be better to do only one loop, keeping the best candidate while
    // traversing the list of replication servers to connect to.
    if (servers.size()==1)
    {
      checkState = false;
    }
    synchronized (connectPhaseLock)
    {
      while ((!connected) && (!shutdown) && (receivedResponse))
      /*
       * Connect to each replication server and get their ServerState then find
       * out which one is the best to connect to.
       */
      for (String server : servers)
      {
        receivedResponse = false;
        for (String server : servers)
        {
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
        // Connect to server and get reply message
        ReplServerStartMessage replServerStartMsg =
          performHandshake(server, false);
        tmpReadableServerName = null; // Not needed now
        // Store reply message in list
        if (replServerStartMsg != null)
        {
          ServerState rsState = replServerStartMsg.getServerState();
          rsStates.put(server, rsState);
        }
      } // for servers
      ReplServerStartMessage replServerStartMsg = null;
      if (rsStates.size() > 0)
      {
        // At least one server answered, find the best one.
        String bestServer = computeBestReplicationServer(state, rsStates,
          serverID, baseDn);
        // Best found, now connect to this one
        replServerStartMsg = performHandshake(bestServer, true);
        if (replServerStartMsg != null)
        {
          try
          {
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = replSessionSecurity.createClientSession(server, socket);
            boolean isSslEncryption =
                 replSessionSecurity.isSslEncryption(server);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion, generationId, isSslEncryption);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            replServerStartMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                replServerStartMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
            {
              session.stopEncryption();
            }
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
            {
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            }
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            if ((ourMaxChangeNumber != null) &&
              (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              replicationServer = ServerAddr.toString();
              maxSendWindow = replServerStartMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              if (checkState == true)
              // Replication server is missing some of our changes: let's send
              // them to him.
              replayOperations.clear();
              Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
              logError(message);
              /*
               * Get all the changes that have not been seen by this
               * replication server and populate the replayOperations
               * list.
               */
              InternalSearchOperation op = seachForChangedEntries(
                baseDn, replServerMaxChangeNumber, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              {
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                /*
                 * An error happened trying to search for the updates
                 * This server will start acepting again new updates but
                 * some inconsistencies will stay between servers.
                 * Log an error for the repair tool
                 * that will need to resynchronize the servers.
                 */
                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
                    baseDn.toNormalizedString());
                message = ERR_CANNOT_RECOVER_CHANGES.get(
                  baseDn.toNormalizedString());
                logError(message);
              } else
              {
                for (FakeOperation replayOp : replayOperations)
                {
                  message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
                    toString());
                  logError(message);
                  session.publish(replayOp.generateMessage());
                }
                message = DEBUG_CHANGES_SENT.get();
                logError(message);
              }
              else
              {
                replayOperations.clear();
                 // TODO: i18n
                logError(Message.raw("going to search for changes"));
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and populate the replayOperations
                 * list.
                 */
                InternalSearchOperation op = seachForChangedEntries(
                    baseDn, replServerMaxChangeNumber, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  /*
                   * An error happened trying to search for the updates
                   * This server will start acepting again new updates but
                   * some inconsistencies will stay between servers.
                   * Log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  Message message = ERR_CANNOT_RECOVER_CHANGES.get(
                      baseDn.toNormalizedString());
                  logError(message);
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  startHeartBeat();
                }
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = replServerStartMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(Message.raw("sendingChange")); // TODO: i18n
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(Message.raw("changes sent")); // TODO: i18n
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
            replicationServer = tmpReadableServerName;
            maxSendWindow = replServerStartMsg.getWindowSize();
            connected = true;
            startHeartBeat();
          } catch (IOException e)
          {
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
              logError(message);
            }
          }
          catch (Exception e)
          {
            Message message = ERR_EXCEPTION_STARTING_SESSION.get(
                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
                stackTraceToSingleLineString(e));
            Message message = ERR_PUBLISHING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            logError(message);
          }
          finally
          } catch (Exception e)
          {
            Message message = ERR_COMPUTING_FAKE_OPS.get(
              baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
              stackTraceToSingleLineString(e));
            logError(message);
          } finally
          {
            if (connected == false)
            {
@@ -402,81 +336,60 @@
                  session.close();
                } catch (IOException e)
                {
                  // The session was already closed, just ignore.
                // The session was already closed, just ignore.
                }
                session = null;
              }
            }
          }
        } // for servers
        // We have traversed all the replication servers
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
          checkState = false;
        }
      }
      // We have traversed all the replication servers as many times as needed
      // to find one if one is up and running.
        } // Could perform handshake with best
      } // Reached some servers
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // wakeup all the thread that were waiting on the window
        // Wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        if ((replServerStartMsg.getGenerationId() == this.generationId) ||
           (replServerStartMsg.getGenerationId() == -1))
          (replServerStartMsg.getGenerationId() == -1))
        {
          Message message =
            NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId));
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId));
          logError(message);
        }
        else
        } else
        {
          Message message =
            NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
                baseDn.toString(),
                replicationServer,
                Long.toString(this.generationId),
                Long.toString(replServerStartMsg.getGenerationId()));
            baseDn.toString(),
            replicationServer,
            Long.toString(this.generationId),
            Long.toString(replServerStartMsg.getGenerationId()));
          logError(message);
        }
      }
      else
      } else
      {
        /*
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         * This server could not find any replicationServer. It's going to start
         * in degraded mode. Log a message.
         */
        if (!connectionError)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          Message message =
              NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
            NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
          logError(message);
        }
      }
@@ -484,6 +397,315 @@
  }
  /**
   * Connect to the provided server performing the handshake (start messages
   * exchange) and return the reply message from the replication server.
   *
   * @param server Server to connect to.
   * @param keepConnection Do we keep session opened or not after handshake.
   * @return The ReplServerStartMessage the server replied. Null if could not
   *         get an answer.
   */
  public ReplServerStartMessage performHandshake(String server,
    boolean keepConnection)
  {
    ReplServerStartMessage replServerStartMsg = null;
    // Parse server string.
    int separator = server.lastIndexOf(':');
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    boolean error = false;
    try
    {
      /*
       * Open a socket connection to the next candidate.
       */
      int intPort = Integer.parseInt(port);
      InetSocketAddress serverAddr = new InetSocketAddress(
        InetAddress.getByName(hostname), intPort);
      tmpReadableServerName = serverAddr.toString();
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(serverAddr, 500);
      session = replSessionSecurity.createClientSession(server, socket);
      boolean isSslEncryption =
        replSessionSecurity.isSslEncryption(server);
      /*
       * Send our ServerStartMessage.
       */
      ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
        maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
        halfRcvWindow * 2, heartbeatInterval, state,
        protocolVersion, generationId, isSslEncryption);
      session.publish(msg);
      /*
       * Read the ReplServerStartMessage that should come back.
       */
      session.setSoTimeout(1000);
      replServerStartMsg = (ReplServerStartMessage) session.receive();
      /*
       * We have sent our own protocol version to the replication server.
       * The replication server will use the same one (or an older one
       * if it is an old replication server).
       */
      protocolVersion = ProtocolVersion.minWithCurrent(
        replServerStartMsg.getVersion());
      session.setSoTimeout(timeout);
      if (!isSslEncryption)
      {
        session.stopEncryption();
      }
    } catch (ConnectException e)
    {
      /*
       * There was no server waiting on this host:port
       * Log a notice and try the next replicationServer in the list
       */
      if (!connectionError)
      {
        // the error message is only logged once to avoid overflowing
        // the error log
        Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
        logError(message);
      }
      error = true;
    } catch (Exception e)
    {
      Message message = ERR_EXCEPTION_STARTING_SESSION.get(
        baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
        stackTraceToSingleLineString(e));
      logError(message);
      error = true;
    }
    // Close session if requested
    if (!keepConnection || error)
    {
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException e)
        {
        // The session was already closed, just ignore.
        }
        session = null;
      }
      if (error)
      {
        replServerStartMsg = null;
      } // Be sure to return null.
    }
    return replServerStartMsg;
  }
  /**
   * Returns the replication server that best fits our need so that we can
   * connect to it.
   *
   * Note: this method put as public static for unit testing purpose.
   *
   * @param myState The local server state.
   * @param rsStates The list of available replication servers and their
   *                 associated server state.
   * @param serverId The server id for the suffix we are working for.
   * @param baseDn The suffix for which we are working for.
   * @return The computed best replication server.
   */
  public static String computeBestReplicationServer(ServerState myState,
    HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
  {
    /*
     * Find replication servers who are up to date (or more up to date than us,
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. Then, among them, choose the server that is the most up to
     * date regarding the whole topology.
     *
     * If no server is up to date regarding our own server id, find the one who
     * is the most up to date regarding our server id.
     */
    // Should never happen (sanity check)
    if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
      (baseDn == null))
    {
      return null;
    }
    String bestServer = null;
    // Servers up to dates with regard to our changes
    HashMap<String, ServerState> upToDateServers =
      new HashMap<String, ServerState>();
    // Servers late with regard to our changes
    HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
    /*
     * Start loop to differenciate up to date servers from late ones.
     */
    ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
    if (myChangeNumber == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, serverId);
    }
    for (String repServer : rsStates.keySet())
    {
      ServerState rsState = rsStates.get(repServer);
      ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
      if (rsChangeNumber == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, serverId);
      }
      // Store state in right list
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      {
        upToDateServers.put(repServer, rsState);
      } else
      {
        lateOnes.put(repServer, rsState);
      }
    }
    if (upToDateServers.size() > 0)
    {
      /*
       * Some up to date servers, among them, choose the one that has the
       * maximum number of changes to send us. This is the most up to date one
       * regarding the whole topology. This server is the one which has the less
       * difference with the topology server state. For comparison, we need to
       * compute the difference for each server id with the topology server
       * state.
       */
      Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
        upToDateServers.size(),
        baseDn.toNormalizedString());
      logError(message);
      /*
       * First of all, compute the virtual server state for the whole topology,
       * which is composed of the most up to date change numbers for
       * each server id in the topology.
       */
      ServerState topoState = new ServerState();
      for (ServerState curState : upToDateServers.values())
      {
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Update topology state
          topoState.update(curSidCn);
        }
      } // For up to date servers
      // Min of the max shifts
      long minShift = -1L;
      for (String upServer : upToDateServers.keySet())
      {
        /*
         * Compute the maximum difference between the time of a server id's
         * change number and the time of the matching server id's change
         * number in the topology server state.
         *
         * Note: we could have used the sequence number here instead of the
         * timestamp, but this would have caused a problem when the sequence
         * number loops and comes back to 0 (computation would have becomen
         * meaningless).
         */
        long shift = -1L;
        ServerState curState = upToDateServers.get(upServer);
        Iterator<Short> it = curState.iterator();
        while (it.hasNext())
        {
          Short sId = it.next();
          ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
          if (curSidCn == null)
          {
            curSidCn = new ChangeNumber(0, 0, sId);
          }
          // Cannot be null as checked at construction time
          ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
          // Cannot be negative as topoState computed as being the max CN
          // for each server id in the topology
          long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
          if (tmpShift > shift)
          {
            shift = tmpShift;
          }
        }
        if ((minShift < 0) // First time in loop
          || (shift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = upServer;
          minShift = shift;
        }
      } // For up to date servers
    } else
    {
      /*
       * We could not find a replication server that has seen all the
       * changes that this server has already processed,
       */
      // lateOnes cannot be empty
      Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
        baseDn.toNormalizedString(), lateOnes.size());
      logError(message);
      // Min of the shifts
      long minShift = -1L;
      for (String lateServer : lateOnes.keySet())
      {
        /*
         * Choose the server who is the closest to us regarding our server id
         * (this is the most up to date regarding our server id).
         */
        ServerState curState = lateOnes.get(lateServer);
        ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
        if (ourSidCn == null)
        {
          ourSidCn = new ChangeNumber(0, 0, serverId);
        }
        // Cannot be negative as our Cn for our server id is strictly
        // greater than those of the servers in late server list
        long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
        if ((minShift < 0) // First time in loop
          || (tmpShift < minShift))
        {
          // This sever is even closer to topo state
          bestServer = lateServer;
          minShift = tmpShift;
        }
      } // For late servers
    }
    return bestServer;
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute.
   * @param baseDn the base DN
@@ -493,26 +715,26 @@
   * @throws Exception when raised.
   */
  public static InternalSearchOperation seachForChangedEntries(
      DN baseDn,
      ChangeNumber fromChangeNumber,
      InternalSearchListener resultListener)
  throws Exception
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    LDAPFilter filter = LDAPFilter.decode(
        "("+ Historical.HISTORICALATTRIBUTENAME +
        ">=dummy:" + fromChangeNumber + ")");
      "(" + Historical.HISTORICALATTRIBUTENAME +
      ">=dummy:" + fromChangeNumber + ")");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(Historical.HISTORICALATTRIBUTENAME);
    attrs.add(Historical.ENTRYUIDNAME);
    return conn.processSearch(
        new ASN1OctetString(baseDn.toString()),
        SearchScope.WHOLE_SUBTREE,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, 0, false, filter,
        attrs,
        resultListener);
      new ASN1OctetString(baseDn.toString()),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, 0, false, filter,
      attrs,
      resultListener);
  }
  /**
@@ -524,14 +746,13 @@
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
           new HeartbeatMonitor("Replication Heartbeat Monitor on " +
               baseDn + " with " + getReplicationServer(),
               session, heartbeatInterval);
        new HeartbeatMonitor("Replication Heartbeat Monitor on " +
        baseDn + " with " + getReplicationServer(),
        session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
  /**
   * restart the ReplicationBroker.
   */
@@ -556,7 +777,7 @@
      }
    } catch (IOException e1)
    {
      // ignore
    // ignore
    }
    if (failingSession == session)
@@ -572,7 +793,7 @@
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
         baseDn.toNormalizedString(), e.getLocalizedMessage()));
          baseDn.toNormalizedString(), e.getLocalizedMessage()));
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
@@ -583,13 +804,12 @@
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          // ignore
        // ignore
        }
      }
    }
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
@@ -598,7 +818,7 @@
  {
    boolean done = false;
    while (!done)
    while (!done && !shutdown)
    {
      if (connectionError)
      {
@@ -611,7 +831,7 @@
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() Publishing a " +
              " message is not possible due to existing connection error.");
            " message is not possible due to existing connection error.");
        }
        return;
@@ -642,9 +862,8 @@
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
            (long) 500, TimeUnit.MILLISECONDS);
        } else
        {
          credit = true;
        }
@@ -685,24 +904,22 @@
            if (debugEnabled())
            {
              debugInfo("ReplicationBroker.publish() " +
                  "IO exception raised : " + e.getLocalizedMessage());
                "IO exception raised : " + e.getLocalizedMessage());
            }
          }
        }
      }
      catch (InterruptedException e)
      } catch (InterruptedException e)
      {
        // just loop.
        if (debugEnabled())
        {
          debugInfo("ReplicationBroker.publish() " +
              "Interrupted exception raised." + e.getLocalizedMessage());
            "Interrupted exception raised." + e.getLocalizedMessage());
        }
      }
    }
  }
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
@@ -730,8 +947,7 @@
        {
          WindowMessage windowMsg = (WindowMessage) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else
        } else
        {
          if (msg instanceof UpdateMessage)
          {
@@ -752,11 +968,11 @@
        if (shutdown == false)
        {
          Message message =
              NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
            NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
          logError(message);
          debugInfo("ReplicationBroker.receive() " + baseDn +
              " Exception raised." + e + e.getLocalizedMessage());
            " Exception raised." + e + e.getLocalizedMessage());
          this.reStart(failingSession);
        }
      }
@@ -764,7 +980,6 @@
    return null;
  }
  /**
   * stop the server.
   */
@@ -773,8 +988,10 @@
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
    if (heartbeatMonitor!= null)
    if (heartbeatMonitor != null)
    {
      heartbeatMonitor.shutdown();
    }
    try
    {
      if (debugEnabled())
@@ -784,9 +1001,12 @@
      }
      if (session != null)
      {
        session.close();
      }
    } catch (IOException e)
    {}
    {
    }
  }
  /**
@@ -834,12 +1054,13 @@
  {
    return replicationServer;
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchEntry(
      InternalSearchOperation searchOperation,
      SearchResultEntry searchEntry)
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  {
    /*
     * Only deal with modify operation so far
@@ -862,10 +1083,10 @@
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
      InternalSearchOperation searchOperation,
      SearchResultReference searchReference)
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
  // TODO to be implemented
  }
  /**
@@ -906,9 +1127,12 @@
  public int getCurrentSendWindow()
  {
    if (connected)
    {
      return sendWindow.availablePermits();
    else
    } else
    {
      return 0;
    }
  }
  /**
@@ -920,7 +1144,6 @@
    return numLostConnections;
  }
  /**
   * Change some config parameters.
   *
@@ -933,8 +1156,8 @@
   * @param heartbeatInterval   The heartbeat interval.
   */
  public void changeConfig(Collection<String> replicationServers,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
    int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
    int maxSendDelay, int window, long heartbeatInterval)
  {
    this.servers = replicationServers;
    this.maxRcvWindow = window;
@@ -943,9 +1166,9 @@
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    // TODO : Changing those parameters requires to either restart a new
    // session with the replicationServer or renegociate the parameters that
    // were sent in the ServerStart message
  // TODO : Changing those parameters requires to either restart a new
  // session with the replicationServer or renegociate the parameters that
  // were sent in the ServerStart message
  }
  /**
@@ -968,7 +1191,11 @@
    return !connectionError;
  }
  private boolean debugEnabled() { return true; }
  private boolean debugEnabled()
  {
    return true;
  }
  private static final void debugInfo(String s)
  {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));