mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
04.12.2007 23b1e20ff9fe938572a0b62ec5a12f12154445df
opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -444,6 +444,12 @@
  /**
   * The connection to the curent Replication Server has failed.
   */
  public static final int MSGID_DISCONNECTED_FROM_CHANGELOG =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -607,6 +613,9 @@
        "The Replication is configured for suffix  %s "
        + "but was not able to connect to any Replication Server");
    registerMessage(MSGID_NOW_FOUND_CHANGELOG,
        "A Replication Server was found for suffix %s");
        "Replication Server %s now used for Replication Domain %s");
    registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
        "The connection to Replication Server %s has been dropped by the "
        + "Replication Server");
  }
}
opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -54,7 +54,6 @@
  public HistoricalCsnOrderingMatchingRule()
  {
    super();
    // TODO Auto-generated constructor stub
  }
  /**
@@ -81,7 +80,7 @@
  @Override
  public void initializeMatchingRule(OrderingMatchingRuleCfg configuration)
  {
    // TODO Auto-generated method stub
    // No implementation needed here.
  }
  /**
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -106,7 +106,7 @@
   *
   * @return The number of update currently in the list.
   */
  public synchronized int size()
  public int size()
  {
    return pendingChanges.size();
  }
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
  private final Object lock = new Object();
  private String replicationServer = "Not connected";
  private TreeSet<FakeOperation> replayOperations;
  private ProtocolSession session = null;
@@ -120,7 +121,7 @@
  private int numLostConnections = 0;
  /**
   * When the broker cannort connect to any replication server
   * When the broker cannot connect to any replication server
   * it log an error and keeps continuing every second.
   * This boolean is set when the first failure happens and is used
   * to avoid repeating the error message for further failure to connect
@@ -129,6 +130,8 @@
   */
  private boolean connectionError = false;
  private Object connectPhaseLock = new Object();
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
   *
@@ -217,234 +220,243 @@
    boolean checkState = true;
    boolean receivedResponse = true;
    while ((!connected) && (!shutdown) && (receivedResponse))
    synchronized (connectPhaseLock)
    {
      receivedResponse = false;
      for (String server : servers)
      while ((!connected) && (!shutdown) && (receivedResponse))
      {
        int separator = server.lastIndexOf(':');
        String port = server.substring(separator + 1);
        String hostname = server.substring(0, separator);
        try
        receivedResponse = false;
        for (String server : servers)
        {
          /*
           * Open a socket connection to the next candidate.
           */
          InetSocketAddress ServerAddr = new InetSocketAddress(
              InetAddress.getByName(hostname), Integer.parseInt(port));
          Socket socket = new Socket();
          socket.setReceiveBufferSize(1000000);
          socket.setTcpNoDelay(true);
          socket.connect(ServerAddr, 500);
          session = new SocketSession(socket);
          int separator = server.lastIndexOf(':');
          String port = server.substring(separator + 1);
          String hostname = server.substring(0, separator);
          /*
           * Send our ServerStartMessage.
           */
          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
              halfRcvWindow*2, heartbeatInterval, state,
              protocolVersion);
          session.publish(msg);
          /*
           * Read the ReplServerStartMessage that should come back.
           */
          session.setSoTimeout(1000);
          startMsg = (ReplServerStartMessage) session.receive();
          receivedResponse = true;
          /*
           * We have sent our own protocol version to the replication server.
           * The replication server will use the same one (or an older one
           * if it is an old replication server).
           */
          protocolVersion = ProtocolVersion.minWithCurrent(
              startMsg.getVersion());
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a replicationServer that has not
           * seen all our previous changes because this could cause some
           * other ldap servers to miss those changes.
           * Check that the ReplicationServer has seen all our previous changes.
           * If not, try another replicationServer.
           * If no other replicationServer has seen all our changes, recover
           * those changes and send them again to any replicationServer.
           */
          ChangeNumber replServerMaxChangeNumber =
            startMsg.getServerState().getMaxChangeNumber(serverID);
          if (replServerMaxChangeNumber == null)
            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
          if ((ourMaxChangeNumber == null) ||
              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
          try
          {
            replicationServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
            startHeartBeat();
            break;
          }
          else
          {
            if (checkState == true)
            /*
             * Open a socket connection to the next candidate.
             */
            InetSocketAddress ServerAddr = new InetSocketAddress(
                InetAddress.getByName(hostname), Integer.parseInt(port));
            Socket socket = new Socket();
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = new SocketSession(socket);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion);
            session.publish(msg);
            /*
             * Read the ReplServerStartMessage that should come back.
             */
            session.setSoTimeout(1000);
            startMsg = (ReplServerStartMessage) session.receive();
            receivedResponse = true;
            /*
             * We have sent our own protocol version to the replication server.
             * The replication server will use the same one (or an older one
             * if it is an old replication server).
             */
            protocolVersion = ProtocolVersion.minWithCurrent(
                startMsg.getVersion());
            session.setSoTimeout(timeout);
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
             * other ldap servers to miss those changes.
             * Check that the ReplicationServer has seen all our previous
             * changes.
             * If not, try another replicationServer.
             * If no other replicationServer has seen all our changes, recover
             * those changes and send them again to any replicationServer.
             */
            ChangeNumber replServerMaxChangeNumber =
              startMsg.getServerState().getMaxChangeNumber(serverID);
            if (replServerMaxChangeNumber == null)
              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
            ChangeNumber ourMaxChangeNumber =
              state.getMaxChangeNumber(serverID);
            if ((ourMaxChangeNumber == null) ||
                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
            {
              /* This replicationServer is missing some
               * of our changes, we are going to try another server
               * but before log a notice message
               */
              int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
              String message = getMessage(msgID, server);
              logError(ErrorLogCategory.SYNCHRONIZATION,
                       ErrorLogSeverity.NOTICE,
                       message, msgID);
              replicationServer = ServerAddr.toString();
              maxSendWindow = startMsg.getWindowSize();
              connected = true;
              startHeartBeat();
              break;
            }
            else
            {
              replayOperations.clear();
              /*
               * Get all the changes that have not been seen by this
               * replicationServer and update it
               */
              InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
              LDAPFilter filter = LDAPFilter.decode(
                  "("+ Historical.HISTORICALATTRIBUTENAME +
                  ">=dummy:" + replServerMaxChangeNumber + ")");
              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
              attrs.add(Historical.HISTORICALATTRIBUTENAME);
              InternalSearchOperation op = conn.processSearch(
                  new ASN1OctetString(baseDn.toString()),
                  SearchScope.WHOLE_SUBTREE,
                  DereferencePolicy.NEVER_DEREF_ALIASES,
                  0, 0, false, filter,
                  attrs, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              if (checkState == true)
              {
                /*
                 * An error happened trying to search for the updates
                 * This server therefore can't start acepting new updates.
                 * TODO : should stop the LDAP server (how to ?)
                /* This replicationServer is missing some
                 * of our changes, we are going to try another server
                 * but before log a notice message
                 */
                int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
                String message = getMessage(msgID);
                int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
                String message = getMessage(msgID, server);
                logError(ErrorLogCategory.SYNCHRONIZATION,
                         ErrorLogSeverity.FATAL_ERROR,
                         message, msgID);
                    ErrorLogSeverity.NOTICE,
                    message, msgID);
              }
              else
              {
                replicationServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
                for (FakeOperation replayOp : replayOperations)
                replayOperations.clear();
                logError(ErrorLogCategory.SYNCHRONIZATION,
                    ErrorLogSeverity.NOTICE,
                    "going to search for changes", 1);
                /*
                 * Get all the changes that have not been seen by this
                 * replicationServer and update it
                 */
                InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
                LDAPFilter filter = LDAPFilter.decode(
                    "("+ Historical.HISTORICALATTRIBUTENAME +
                    ">=dummy:" + replServerMaxChangeNumber + ")");
                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
                attrs.add(Historical.HISTORICALATTRIBUTENAME);
                InternalSearchOperation op = conn.processSearch(
                    new ASN1OctetString(baseDn.toString()),
                    SearchScope.WHOLE_SUBTREE,
                    DereferencePolicy.NEVER_DEREF_ALIASES,
                    0, 0, false, filter,
                    attrs, this);
                if (op.getResultCode() != ResultCode.SUCCESS)
                {
                  publish(replayOp.generateMessage());
                  /*
                   * An error happened trying to search for the updates
                   * This server will start acepting again new updates but
                   * some inconsistencies will stay between servers.
                   * TODO : REPAIR : log an error for the repair tool
                   * that will need to resynchronize the servers.
                   */
                  int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
                  String message = getMessage(msgID);
                  logError(ErrorLogCategory.SYNCHRONIZATION,
                      ErrorLogSeverity.FATAL_ERROR,
                      message, msgID);
                }
                startHeartBeat();
                break;
                else
                {
                  replicationServer = ServerAddr.toString();
                  maxSendWindow = startMsg.getWindowSize();
                  connected = true;
                  for (FakeOperation replayOp : replayOperations)
                  {
                    logError(ErrorLogCategory.SYNCHRONIZATION,
                        ErrorLogSeverity.NOTICE,
                        "sendingChange", 1);
                    session.publish(replayOp.generateMessage());
                  }
                  startHeartBeat();
                  logError(ErrorLogCategory.SYNCHRONIZATION,
                      ErrorLogSeverity.NOTICE,
                      "changes sent", 1);
                  break;
                }
              }
            }
          }
          catch (ConnectException e)
          {
            /*
             * There was no server waiting on this host:port
             * Log a notice and try the next replicationServer in the list
             */
            if (!connectionError )
            {
              // the error message is only logged once to avoid overflowing
              // the error log
              int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
              String message = getMessage(msgID, server);
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  message, msgID);
            }
          }
          catch (Exception e)
          {
            int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
            String message = getMessage(msgID);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message + stackTraceToSingleLineString(e), msgID);
          }
          finally
          {
            if (connected == false)
            {
              if (session != null)
              {
                session.close();
                session = null;
              }
            }
          }
        }
        catch (ConnectException e)
        if ((!connected) && (checkState == true) && receivedResponse)
        {
          /*
           * There was no server waiting on this host:port
           * Log a notice and try the next replicationServer in the list
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any replicationServer.
           */
          if (!connectionError )
          {
            // the error message is only logged once to avoid overflowing
            // the error log
            int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
            String message = getMessage(msgID, server);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
                message, msgID);
          }
        }
        catch (Exception e)
        {
          int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
          String message = getMessage(msgID)  + stackTraceToSingleLineString(e);
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
          String message = getMessage(msgID);
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
        }
        finally
        {
          if (connected == false)
          {
            if (session != null)
            {
              logError(ErrorLogCategory.SYNCHRONIZATION,
                  ErrorLogSeverity.NOTICE,
                  "Broker : connect closing session" , 1);
              session.close();
              session = null;
            }
          }
              ErrorLogSeverity.NOTICE,
              message, msgID);
          checkState = false;
        }
      }
      if ((!connected) && (checkState == true) && receivedResponse)
      if (connected)
      {
        // This server has connected correctly.
        // Log a message to let the administrator know that the failure was
        // resolved.
        // wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
          sendWindow.release(Integer.MAX_VALUE);
        this.sendWindow = new Semaphore(maxSendWindow);
        connectPhaseLock.notify();
        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
        String message =
          getMessage(msgID, replicationServer, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
      else
      {
        /*
         * We could not find a replicationServer that has seen all the
         * changes that this server has already processed, start again
         * the loop looking for any replicationServer.
         * This server could not find any replicationServer
         * It's going to start in degraded mode.
         * Log a message
         */
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
        String message = getMessage(msgID);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            message, msgID);
        try
        if (!connectionError)
        {
          Thread.sleep(500);
        } catch (InterruptedException e)
        {
          checkState = false;
          connectionError = true;
          connectPhaseLock.notify();
          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
          String message = getMessage(msgID, baseDn.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE, message, msgID);
        }
        checkState = false;
      }
    }
    if (connected)
    {
      // This server has connected correctly.
      // let's check if it was previosuly on error, in this case log
      // a message to let the administratot know that the failure was resolved.
      if (connectionError)
      {
        connectionError = false;
        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
        String message = getMessage(msgID, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
    }
    else
    {
      /*
       * This server could not find any replicationServer
       * It's going to start in degraded mode.
       * Log a message
       */
      if (!connectionError)
      {
        checkState = false;
        connectionError = true;
        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
        String message = getMessage(msgID, baseDn.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE, message, msgID);
      }
    }
  }
@@ -530,30 +542,90 @@
  public void publish(ReplicationMessage msg)
  {
    boolean done = false;
    ProtocolSession failingSession = session;
    while (!done)
    {
      if (connectionError)
        return;
      synchronized (lock)
      {
        try
        // It was not possible to connect to any replication server.
        // Since the operation was already processed, we have no other
        // choice than to return without sending the ReplicationMessage
        // and relying on the resend procedure of the connect phase to
        // fix the problem when we finally connect.
        return;
      }
      try
      {
        boolean credit;
        ProtocolSession current_session;
        Semaphore currentWindowSemaphore;
        // save the session at the time when we acquire the
        // sendwindow credit so that we can make sure later
        // that the session did not change in between.
        // This is necessary to make sure that we don't publish a message
        // on a session with a credit that was acquired from a previous
        // session.
        synchronized (connectPhaseLock)
        {
          if (this.connected == false)
            this.reStart(failingSession);
          if (msg instanceof UpdateMessage)
            sendWindow.acquire();
          session.publish(msg);
          done = true;
        } catch (IOException e)
        {
          this.reStart(failingSession);
          current_session = session;
          currentWindowSemaphore = sendWindow;
        }
        catch (InterruptedException e)
        if (msg instanceof UpdateMessage)
        {
          this.reStart(failingSession);
          // Acquiring the window credit must be done outside of the
          // connectPhaseLock because it can be blocking and we don't
          // want to hold off reconnection in case the connection dropped.
          credit =
            currentWindowSemaphore.tryAcquire(
                (long) 500, TimeUnit.MILLISECONDS);
        }
        else
        {
          credit = true;
        }
        if (credit)
        {
          synchronized (connectPhaseLock)
          {
            // check the session. If it has changed, some
            // deconnection/reconnection happened and we need to restart from
            // scratch.
            if (session == current_session)
            {
              session.publish(msg);
              done = true;
            }
          }
        }
        if (!credit)
        {
          // the window is still closed.
          // Send a WindowProbe message to wakeup the receiver in case the
          // window update message was lost somehow...
          // then loop to check again if connection was closed.
          session.publish(new WindowProbe());
        }
      } catch (IOException e)
      {
        // The receive threads should handle reconnection or
        // mark this broker in error. Just retry.
        synchronized (connectPhaseLock)
        {
          try
          {
            connectPhaseLock.wait(100);
          } catch (InterruptedException e1)
          {
            // ignore
          }
        }
      }
      catch (InterruptedException e)
      {
        // just loop.
      }
    }
  }
@@ -561,6 +633,10 @@
  /**
   * Receive a message.
   * This method is not multithread safe and should either always be
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @return the received message
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
@@ -603,10 +679,12 @@
      {
        if (shutdown == false)
        {
          synchronized (lock)
          {
            this.reStart(failingSession);
          }
          int    msgID   = MSGID_DISCONNECTED_FROM_CHANGELOG;
          String message = getMessage(msgID, replicationServer);
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.NOTICE,
              message + " " + e.getMessage(), msgID);
          this.reStart(failingSession);
        }
      }
    }
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -561,7 +561,7 @@
  {
    if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
    {
      // this policy imply that we always aceept updates.
      // this policy imply that we always accept updates.
      return true;
    }
    if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
@@ -587,7 +587,7 @@
      }
    }
    // we should never get there as the only possible policies are
    // ACCEPT_UPDATES and DENY_UPDATES
    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
    return true;
  }
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -59,7 +59,7 @@
  @Override()
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  {
    // TODO Auto-generated method stub
    // no implementation needed.
  }
  /**
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -52,6 +52,7 @@
  static final byte MSG_TYPE_ENTRY = 12;
  static final byte MSG_TYPE_DONE = 13;
  static final byte MSG_TYPE_ERROR = 14;
  static final byte MSG_TYPE_WINDOW_PROBE = 15;
  // Adding a new type of message here probably requires to
  // change accordingly generateMsg method below
@@ -136,6 +137,9 @@
      case MSG_TYPE_ERROR:
        msg = new ErrorMessage(buffer);
      break;
      case MSG_TYPE_WINDOW_PROBE:
        msg = new WindowProbe(buffer);
      break;
      default:
        throw new DataFormatException("received message with unknown type");
    }
opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
@@ -32,9 +32,13 @@
/**
 * This message is used by LDAP server when they first connect.
 * to a replication server to let them know who they are and what is their state
 * (their RUV)
 * This message is used by LDAP server or by Replication Servers to
 * update the send window of the remote entities.
 *
 * A receiving entity should create such a message with a given credit
 * when it wants to open the send window of the remote entity.
 * A LDAP or Replication Server should increase its send window when receiving
 * such a message.
 */
public class WindowMessage extends ReplicationMessage implements
    Serializable
@@ -47,7 +51,7 @@
   * Create a new WindowMessage.
   *
   * @param numAck The number of acknowledged messages.
   *               The window will be increase by this number.
   *               The window will be increase by this credit number.
   */
  public WindowMessage(int numAck)
  {
opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
New file
@@ -0,0 +1,84 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
import java.io.Serializable;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP or Replication Server that have been
 * out of credit for a while and want to check that the remote servers.
 *
 * A sending entity that is blocked because its send window is closed
 * for a while should create such a message to check that the window
 * closure is valid.
 *
 * An entity that received such a message should respond with a
 * WindowUpdate message indicating the curent credit available.
 */
public class WindowProbe extends ReplicationMessage implements
    Serializable
{
  private static final long serialVersionUID = 8442267608764026867L;
  /**
   * Create a new WindowProbe message.
   */
  public WindowProbe()
  {
  }
  /**
   * Creates a new WindowProbe from its encoded form.
   *
   * @param in The byte array containing the encoded form of the
   *           WindowMessage.
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMessage.
   */
  public WindowProbe(byte[] in) throws DataFormatException
  {
    // WindowProbe Message only contains its type.
    if (in[0] != MSG_TYPE_WINDOW_PROBE)
      throw new DataFormatException("input is not a valid Window Message");
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes()
  {
    // WindowProbe Message only contains its type.
    byte[] resultByteArray = new byte[1];
    resultByteArray[0] = MSG_TYPE_WINDOW_PROBE;
    return resultByteArray;
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -60,6 +60,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
@@ -1372,4 +1373,34 @@
    session.publish(msg);
  }
  /**
   * Process the reception of a WindowProbe message.
   *
   * @param  windowProbeMsg The message to process.
   *
   * @throws IOException    When the session becomes unavailable.
   */
  public void process(WindowProbe windowProbeMsg) throws IOException
  {
    if (rcvWindow > 0)
    {
      // The LDAP server believes that its window is closed
      // while it is not, this means that some problem happened in the
      // window exchange procedure !
      // lets update the LDAP server with out current window size and hope
      // that everything will work better in the futur.
      // TODO also log an error message.
      WindowMessage msg = new WindowMessage(rcvWindow);
      session.publish(msg);
      outAckCount++;
    }
    else
    {
      // Both the LDAP server and the replication server believes that the
      // window is closed. Lets check the flowcontrol in case we
      // can now resume operations and send a windowMessage if necessary.
      checkWindow();
    }
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.loggers.debug.DebugTracer;
@@ -159,6 +160,11 @@
          ErrorMessage errorMsg = (ErrorMessage) msg;
          handler.process(errorMsg);
        }
        else if (msg instanceof WindowProbe)
        {
          WindowProbe windowProbeMsg = (WindowProbe) msg;
          handler.process(windowProbeMsg);
        }
        else if (msg == null)
        {
          /*
@@ -184,7 +190,7 @@
      String message = getMessage(msgID, handler.toString());
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.NOTICE,
               message + e.getMessage(), msgID);
               message + ": " + e.getMessage(), msgID);
    } catch (ClassNotFoundException e)
    {
      /*
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -534,7 +534,7 @@
  /**
   * Test that WindowMessageTest encoding and decoding works
   * by checking that : msg == new WindowMessageTest(msg.getBytes()).
   * by checking that : msg == new WindowMessage(msg.getBytes()).
   */
  @Test()
  public void WindowMessageTest() throws Exception
@@ -543,6 +543,18 @@
    WindowMessage newMsg = new WindowMessage(msg.getBytes());
    assertEquals(msg.getNumAck(), newMsg.getNumAck());
  }
  /**
   * Test that WindowProbe encoding and decoding works
   * by checking that : new WindowProbe(msg.getBytes()) does not throws
   * an exception.
   */
  @Test()
  public void WindowProbeTest() throws Exception
  {
    WindowProbe msg = new WindowProbe();
    new WindowProbe(msg.getBytes());
  }
  /**
   * Test that EntryMessage encoding and decoding works
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -27,11 +27,15 @@
package org.opends.server.replication.server;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.opends.server.replication.protocol.OperationContext.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
@@ -49,7 +53,13 @@
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
@@ -748,6 +758,56 @@
  }
  /**
   * Test that the Replication sends back correctly WindowsUpdate
   * when we send a WindowProbe.
   */
  @Test()
  public void windowProbeTest() throws Exception
  {
    final int WINDOW = 10;
    /*
     * Open a socket connection to the replication server
     */
    InetSocketAddress ServerAddr = new InetSocketAddress(
        InetAddress.getByName("localhost"), replicationServerPort);
    Socket socket = new Socket();
    socket.setReceiveBufferSize(1000000);
    socket.setTcpNoDelay(true);
    socket.connect(ServerAddr, 500);
    SocketSession session = new SocketSession(socket);
    /*
     * Send our ServerStartMessage.
     */
    ServerStartMessage msg =
      new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
          0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
          ProtocolVersion.currentVersion());
    session.publish(msg);
    /*
     * Read the ReplServerStartMessage that should come back.
     */
    session.setSoTimeout(10000);
    ReplServerStartMessage replStartMsg =
      (ReplServerStartMessage) session.receive();
    int serverwindow = replStartMsg.getWindowSize();
    // push a WindowProbe message
    session.publish(new WindowProbe());
    WindowMessage windowMsg = (WindowMessage) session.receive();
    assertEquals(serverwindow, windowMsg.getNumAck());
    // check that this did not change the window by sending a probe again.
    session.publish(new WindowProbe());
    windowMsg = (WindowMessage) session.receive();
    assertEquals(serverwindow, windowMsg.getNumAck());
  }
  /**
   * After the tests stop the replicationServer.
   */
  @AfterClass()