mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
04.12.2007 23b1e20ff9fe938572a0b62ec5a12f12154445df
The problem was that the publisher thread got stuck waiting for the window
to re-open on a connection that had been closed without notifying the publisher.

Several changes were made to avoid this:

- Reading the monitoring information no longer acquires the lock on the PendingChanges object, so that it can be used to debug such problems.

- When a connection to a server goes down, the operation now never
tries to re-open the connection, but waits for the receiver thread to do it.
The operation thread waits in the post-op until the reconnection is finished or until the receiver thread has found that no replication server is available.

- Try to make the window mechanism more robust by introducing a loop around
the sendWindow.acquire() call, so that the publisher thread is never
blocked indefinitely in this call in case of bugs or other problems
that could lead to this situation.
Also add a WindowProbe message that is sent to the replication server when the publisher notices that the window has been closed for a while, to check whether the window is really closed.

- Notify the publisher thread when the connection has been shut down.
1 files added
12 files modified
745 ■■■■■ changed files
opends/src/server/org/opends/server/messages/ReplicationMessages.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 510 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/WindowMessage.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/WindowProbe.java 84 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 31 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 8 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 60 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -444,6 +444,12 @@
  /**
   * The connection to the current Replication Server has failed.
   * The registered message takes a single %s argument: the Replication
   * Server whose connection was dropped.
   */
  public static final int MSGID_DISCONNECTED_FROM_CHANGELOG =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -607,6 +613,9 @@
        "The Replication is configured for suffix  %s "
        + "but was not able to connect to any Replication Server");
    registerMessage(MSGID_NOW_FOUND_CHANGELOG,
        "A Replication Server was found for suffix %s");
        "Replication Server %s now used for Replication Domain %s");
    registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
        "The connection to Replication Server %s has been dropped by the "
        + "Replication Server");
  }
}
opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -54,7 +54,6 @@
  public HistoricalCsnOrderingMatchingRule()
  {
    super();
    // TODO Auto-generated constructor stub
  }
  /**
@@ -81,7 +80,7 @@
  @Override
  public void initializeMatchingRule(OrderingMatchingRuleCfg configuration)
  {
    // TODO Auto-generated method stub
    // No implementation needed here.
  }
  /**
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -106,7 +106,7 @@
   *
   * @return The number of update currently in the list.
   */
  public synchronized int size()
  public int size()
  {
    return pendingChanges.size();
  }
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
  private final Object lock = new Object();
  private String replicationServer = "Not connected";
  private TreeSet<FakeOperation> replayOperations;
  private ProtocolSession session = null;
@@ -120,7 +121,7 @@
  private int numLostConnections = 0;
  /**
   * When the broker cannot connect to any replication server
   * it logs an error and keeps retrying every second.
   * This boolean is set when the first failure happens and is used
   * to avoid repeating the error message for further failure to connect
@@ -129,6 +130,8 @@
   */
  private boolean connectionError = false;
  private Object connectPhaseLock = new Object();
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
   *
@@ -217,234 +220,243 @@
    boolean checkState = true;
    boolean receivedResponse = true;
    while ((!connected) && (!shutdown) && (receivedResponse))
    synchronized (connectPhaseLock)
    {
      receivedResponse = false;
      for (String server : servers)
      while ((!connected) && (!shutdown) && (receivedResponse))
      {
        int separator = server.lastIndexOf(':');
        String port = server.substring(separator + 1);
        String hostname = server.substring(0, separator);
        try
        receivedResponse = false;
        for (String server : servers)
        {
          /*
           * Open a socket connection to the next candidate.
           */
          InetSocketAddress ServerAddr = new InetSocketAddress(
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.setTcpNoDelay(true);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
          /*
           * Send our ServerStartMessage.
           */
          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              halfRcvWindow*2, heartbeatInterval, state,
              protocolVersion);
          session.publish(msg);
          /*
           * Read the ReplServerStartMessage that should come back.
           */
          session.setSoTimeout(1000);
          startMsg = (ReplServerStartMessage) session.receive();
          receivedResponse = true;
          /*
           * We have sent our own protocol version to the replication server.
           * The replication server will use the same one (or an older one
           * if it is an old replication server).
           */
          protocolVersion = ProtocolVersion.minWithCurrent(
              startMsg.getVersion());
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a replicationServer that has not
           * seen all our previous changes because this could cause some
           * other ldap servers to miss those changes.
           * Check that the ReplicationServer has seen all our previous changes.
           * If not, try another replicationServer.
           * If no other replicationServer has seen all our changes, recover
           * those changes and send them again to any replicationServer.
           */
          ChangeNumber replServerMaxChangeNumber =
            startMsg.getServerState().getMaxChangeNumber(serverID);
          if (replServerMaxChangeNumber == null)
            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
          if ((ourMaxChangeNumber == null) ||
              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
          try
          {
            replicationServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            startHeartBeat();
            break;
          }
          else
          {
            if (checkState == true)
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = new SocketSession(socket);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            startMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                startMsg.getVersion());
            session.setSoTimeout(timeout);
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              startMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              /* This replicationServer is missing some
               * of our changes, we are going to try another server
               * but before log a notice message
               */
              int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
              String message = getMessage(msgID, server);
              logError(ErrorLogCategory.SYNCHRONIZATION,
                       ErrorLogSeverity.NOTICE,
                       message, msgID);
              replicationServer = ServerAddr.toString();
              maxSendWindow = startMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              replayOperations.clear();
              /*
               * Get all the changes that have not been seen by this
               * replicationServer and update it
               */
              InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
              LDAPFilter filter = LDAPFilter.decode(
                  "("+ Historical.HISTORICALATTRIBUTENAME +
                  ">=dummy:" + replServerMaxChangeNumber + ")");
              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
              attrs.add(Historical.HISTORICALATTRIBUTENAME);
              InternalSearchOperation op = conn.processSearch(
                  new ASN1OctetString(baseDn.toString()),
                  SearchScope.WHOLE_SUBTREE,
                  DereferencePolicy.NEVER_DEREF_ALIASES,
                  0, 0, false, filter,
                  attrs, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              if (checkState == true)
              {
                /*
                 * An error happened trying to search for the updates
                 * This server therefore can't start acepting new updates.
                 * TODO : should stop the LDAP server (how to ?)
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
                String message = getMessage(msgID);
                int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
                String message = getMessage(msgID, server);
                logError(ErrorLogCategory.SYNCHRONIZATION,
                         ErrorLogSeverity.FATAL_ERROR,
                         message, msgID);
                    ErrorLogSeverity.NOTICE,
                    message, msgID);
              }
              else
              {
                replicationServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
                for (FakeOperation replayOp : replayOperations)
                replayOperations.clear();
                logError(ErrorLogCategory.SYNCHRONIZATION,
                    ErrorLogSeverity.NOTICE,
                    "going to search for changes", 1);
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and update it
                 */
                InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
                LDAPFilter filter = LDAPFilter.decode(
                    "("+ Historical.HISTORICALATTRIBUTENAME +
                    ">=dummy:" + replServerMaxChangeNumber + ")");
                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
                attrs.add(Historical.HISTORICALATTRIBUTENAME);
                InternalSearchOperation op = conn.processSearch(
                    new ASN1OctetString(baseDn.toString()),
                    SearchScope.WHOLE_SUBTREE,
                    DereferencePolicy.NEVER_DEREF_ALIASES,
                    0, 0, false, filter,
                    attrs, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  publish(replayOp.generateMessage());
                  /*
                   * An error happened trying to search for the updates
                   * This server will start acepting again new updates but
                   * some inconsistencies will stay between servers.
                   * TODO : REPAIR : log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
                  String message = getMessage(msgID);
                  logError(ErrorLogCategory.SYNCHRONIZATION,
                      ErrorLogSeverity.FATAL_ERROR,
                      message, msgID);
                }
                startHeartBeat();
                break;
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = startMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(ErrorLogCategory.SYNCHRONIZATION,
                        ErrorLogSeverity.NOTICE,
                        "sendingChange", 1);
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(ErrorLogCategory.SYNCHRONIZATION,
                      ErrorLogSeverity.NOTICE,
                      "changes sent", 1);
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
          {
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
              String message = getMessage(msgID, server);
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  message, msgID);
            }
          }
          catch (Exception e)
          {
            int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
            String message = getMessage(msgID);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message + stackTraceToSingleLineString(e), msgID);
          }
          finally
          {
            if (connected == false)
            {
              if (session != null)
              {
                session.close();
                session = null;
              }
            }
          }
        }
        catch (ConnectException e)
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * There was no server waiting on this host:port
           * Log a notice and try the next replicationServer in the list
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          if (!connectionError )
          {
            // the error message is only logged once to avoid overflowing
            // the error log
            int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
            String message = getMessage(msgID, server);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
                message, msgID);
          }
        }
        catch (Exception e)
        {
          int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
          String message = getMessage(msgID)  + stackTraceToSingleLineString(e);
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
        }
        finally
        {
          if (connected == false)
          {
            if (session != null)
            {
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  "Broker : connect closing session" , 1);
              session.close();
              session = null;
            }
          }
              ErrorLogSeverity.NOTICE,
              message, msgID);
          checkState = false;
        }
      }
      if ((!connected) && (checkState == true) && receivedResponse)
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
          sendWindow.release(Integer.MAX_VALUE);
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
        String message =
          getMessage(msgID, replicationServer, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
      else
      {
        /*
         * We could not find a replicationServer that has seen all the
         * changes that this server has already processed, start again
         * the loop looking for any replicationServer.
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         */
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
        String message = getMessage(msgID);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            message, msgID);
        try
        if (!connectionError)
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
          String message = getMessage(msgID, baseDn.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE, message, msgID);
        }
        checkState = false;
      }
    }
    if (connected)
    {
      // This server has connected correctly.
      // let's check if it was previosuly on error, in this case log
      // a message to let the administratot know that the failure was resolved.
      if (connectionError)
      {
        connectionError = false;
        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
        String message = getMessage(msgID, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
    }
    else
    {
      /*
       * This server could not find any replicationServer
       * It's going to start in degraded mode.
       * Log a message
       */
      if (!connectionError)
      {
        checkState = false;
        connectionError = true;
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
        String message = getMessage(msgID, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
    }
  }
@@ -530,30 +542,90 @@
  public void publish(ReplicationMessage msg)
  {
    boolean done = false;
    ProtocolSession failingSession = session;
    while (!done)
    {
      if (connectionError)
        return;
      synchronized (lock)
      {
        try
        // It was not possible to connect to any replication server.
        // Since the operation was already processed, we have no other
        // choice than to return without sending the ReplicationMessage
        // and relying on the resend procedure of the connect phase to
        // fix the problem when we finally connect.
        return;
      }
      try
      {
        boolean credit;
        ProtocolSession current_session;
        Semaphore currentWindowSemaphore;
        // save the session at the time when we acquire the
        // sendwindow credit so that we can make sure later
        // that the session did not change in between.
        // This is necessary to make sure that we don't publish a message
        // on a session with a credit that was acquired from a previous
        // session.
        synchronized (connectPhaseLock)
        {
          if (this.connected == false)
            this.reStart(failingSession);
          if (msg instanceof UpdateMessage)
            sendWindow.acquire();
          session.publish(msg);
          done = true;
        } catch (IOException e)
        {
          this.reStart(failingSession);
          current_session = session;
          currentWindowSemaphore = sendWindow;
        }
        catch (InterruptedException e)
        if (msg instanceof UpdateMessage)
        {
          this.reStart(failingSession);
          // Acquiring the window credit must be done outside of the
          // connectPhaseLock because it can be blocking and we don't
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
        {
          credit = true;
        }
        if (credit)
        {
          synchronized (connectPhaseLock)
          {
            // check the session. If it has changed, some
            // deconnection/reconnection happened and we need to restart from
            // scratch.
            if (session == current_session)
            {
              session.publish(msg);
              done = true;
            }
          }
        }
        if (!credit)
        {
          // the window is still closed.
          // Send a WindowProbe message to wakeup the receiver in case the
          // window update message was lost somehow...
          // then loop to check again if connection was closed.
          session.publish(new WindowProbe());
        }
      } catch (IOException e)
      {
        // The receive threads should handle reconnection or
        // mark this broker in error. Just retry.
        synchronized (connectPhaseLock)
        {
          try
          {
            connectPhaseLock.wait(100);
          } catch (InterruptedException e1)
          {
            // ignore
          }
        }
      }
      catch (InterruptedException e)
      {
        // just loop.
      }
    }
  }
@@ -561,6 +633,10 @@
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
@@ -603,10 +679,12 @@
      {
        if (shutdown == false)
        {
          synchronized (lock)
          {
            this.reStart(failingSession);
          }
          int    msgID   = MSGID_DISCONNECTED_FROM_CHANGELOG;
          String message = getMessage(msgID, replicationServer);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE,
              message + " " + e.getMessage(), msgID);
          this.reStart(failingSession);
        }
      }
    }
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -561,7 +561,7 @@
  {
    if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
    {
      // this policy imply that we always aceept updates.
      // this policy imply that we always accept updates.
      return true;
    }
    if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
@@ -587,7 +587,7 @@
      }
    }
    // we should never get there as the only possible policies are
    // ACCEPT_UPDATES and DENY_UPDATES
    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
    return true;
  }
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -59,7 +59,7 @@
  @Override()
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  {
    // TODO Auto-generated method stub
    // no implementation needed.
  }
  /**
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -52,6 +52,7 @@
  static final byte MSG_TYPE_ENTRY = 12;
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  // Type code of the WindowProbe message; the decoding switch below maps it
  // to a WindowProbe instance.
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  // Adding a new type of message here probably requires changing the
  // generateMsg method below accordingly.
@@ -136,6 +137,9 @@
      case MSG_TYPE_ERROR:
        msg = new ErrorMessage(buffer);
      break;
      case MSG_TYPE_WINDOW_PROBE:
        msg = new WindowProbe(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
@@ -32,9 +32,13 @@
/**
 * This message is used by LDAP server when they first connect.
 * to a replication server to let them know who they are and what is their state
 * (their RUV)
 * This message is used by LDAP servers or by Replication Servers to
 * update the send window of the remote entities.
 *
 * A receiving entity should create such a message with a given credit
 * when it wants to open the send window of the remote entity.
 * An LDAP or Replication Server should increase its send window when
 * receiving such a message.
 */
public class WindowMessage extends ReplicationMessage implements
    Serializable
@@ -47,7 +51,7 @@
   * Create a new WindowMessage.
   *
   * @param numAck The number of acknowledged messages.
   *               The window will be increased by this number.
   *               The window will be increased by this credit number.
   */
  public WindowMessage(int numAck)
  {
opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
New file
@@ -0,0 +1,84 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP or Replication Server that have been
 * out of credit for a while and want to check that the remote servers.
 *
 * A sending entity that is blocked because its send window is closed
 * for a while should create such a message to check that the window
 * closure is valid.
 *
 * An entity that received such a message should respond with a
 * WindowUpdate message indicating the curent credit available.
 */
public class WindowProbe extends ReplicationMessage implements
    Serializable
{
  private static final long serialVersionUID = 8442267608764026867L;
  /**
   * Create a new WindowProbe message.
   */
  public WindowProbe()
  {
  }
  /**
   * Creates a new WindowProbe from its encoded form.
   *
   * @param in The byte array containing the encoded form of the
   *           WindowMessage.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMessage.
   */
  public WindowProbe(byte[] in) throws DataFormatException
  {
    // WindowProbe Message only contains its type.
    if (in[0] != MSG_TYPE_WINDOW_PROBE)
      throw new DataFormatException("input is not a valid Window Message");
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    // WindowProbe Message only contains its type.
    byte[] resultByteArray = new byte[1];
    resultByteArray[0] = MSG_TYPE_WINDOW_PROBE;
    return resultByteArray;
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -60,6 +60,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -1372,4 +1373,34 @@
    session.publish(msg);
  }
  /**
   * Process the reception of a WindowProbe message.
   *
   * @param  windowProbeMsg The message to process.
   *
   * @throws IOException    When the session becomes unavailable.
   */
  public void process(WindowProbe windowProbeMsg) throws IOException
  {
    if (rcvWindow > 0)
    {
      // The LDAP server believes that its window is closed
      // while it is not, this means that some problem happened in the
      // window exchange procedure !
      // lets update the LDAP server with out current window size and hope
      // that everything will work better in the futur.
      // TODO also log an error message.
      WindowMessage msg = new WindowMessage(rcvWindow);
      session.publish(msg);
      outAckCount++;
    }
    else
    {
      // Both the LDAP server and the replication server believes that the
      // window is closed. Lets check the flowcontrol in case we
      // can now resume operations and send a windowMessage if necessary.
      checkWindow();
    }
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
@@ -159,6 +160,11 @@
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
        else if (msg instanceof WindowProbe)
        {
          WindowProbe windowProbeMsg = (WindowProbe) msg;
          handler.process(windowProbeMsg);
        }
        else if (msg == null)
        {
          /*
@@ -184,7 +190,7 @@
      String message = getMessage(msgID, handler.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.NOTICE,
               message + e.getMessage(), msgID);
               message + ": " + e.getMessage(), msgID);
    } catch (ClassNotFoundException e)
    {
      /*
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -534,7 +534,7 @@
  /**
   * Test that WindowMessageTest encoding and decoding works
   * by checking that : msg == new WindowMessageTest(msg.getBytes()).
   * by checking that : msg == new WindowMessage(msg.getBytes()).
   */
  @Test()
  public void WindowMessageTest() throws Exception
@@ -543,6 +543,18 @@
    WindowMessage newMsg = new WindowMessage(msg.getBytes());
    assertEquals(msg.getNumAck(), newMsg.getNumAck());
  }
  /**
   * Test that WindowProbe encoding and decoding works
   * by checking that : new WindowProbe(msg.getBytes()) does not throws
   * an exception.
   */
  @Test()
  public void WindowProbeTest() throws Exception
  {
    WindowProbe msg = new WindowProbe();
    new WindowProbe(msg.getBytes());
  }
  /**
   * Test that EntryMessage encoding and decoding works
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -27,11 +27,15 @@
package org.opends.server.replication.server;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.opends.server.replication.protocol.OperationContext.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
@@ -49,7 +53,13 @@
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
@@ -748,6 +758,56 @@
  }
  /**
   * Test that the Replication sends back correctly WindowsUpdate
   * when we send a WindowProbe.
   */
  @Test()
  public void windowProbeTest() throws Exception
  {
    final int WINDOW = 10;
    /*
     * Open a socket connection to the replication server
     */
    InetSocketAddress ServerAddr = new InetSocketAddress(
        InetAddress.getByName("localhost"), replicationServerPort);
    Socket socket = new Socket();
    socket.setReceiveBufferSize(1000000);
    socket.setTcpNoDelay(true);
    socket.connect(ServerAddr, 500);
    SocketSession session = new SocketSession(socket);
    /*
     * Send our ServerStartMessage.
     */
    ServerStartMessage msg =
      new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
          0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
          ProtocolVersion.currentVersion());
    session.publish(msg);
    /*
     * Read the ReplServerStartMessage that should come back.
     */
    session.setSoTimeout(10000);
    ReplServerStartMessage replStartMsg =
      (ReplServerStartMessage) session.receive();
    int serverwindow = replStartMsg.getWindowSize();
    // push a WindowProbe message
    session.publish(new WindowProbe());
    WindowMessage windowMsg = (WindowMessage) session.receive();
    assertEquals(serverwindow, windowMsg.getNumAck());
    // check that this did not change the window by sending a probe again.
    session.publish(new WindowProbe());
    windowMsg = (WindowMessage) session.receive();
    assertEquals(serverwindow, windowMsg.getNumAck());
  }
  /**
   * After the tests stop the replicationServer.
   */
  @AfterClass()