mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
23.27.2011 6ee1440f6f56ac066f97383315b2798287f0821a
Fix issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server
18 files modified
1489 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java 307 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/SocketSession.java 334 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 451 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDB.java 35 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 17 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 36 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 44 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 24 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 113 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 27 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 12 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -91,8 +92,8 @@
    /**
     * Constructor.
     * @param fractionalConfig
     * @param domain
     * @param fractionalConfig The fractional configuration.
     * @param domain The replication domain.
     */
    public ImportFractionalContext(FractionalConfig fractionalConfig,
      LDAPReplicationDomain domain)
opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -35,8 +35,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import java.io.IOException;
import org.opends.server.api.DirectoryThread;
/**
@@ -71,27 +69,18 @@
  private volatile boolean shutdown = false;
  /**
   * Send StopMsg before session closure or not.
   */
  private final boolean sendStopBeforeClose;
  /**
   * Create a heartbeat monitor thread.
   * @param threadName The name of the heartbeat thread.
   * @param session The session on which heartbeats are to be monitored.
   * @param heartbeatInterval The expected interval between heartbeats received
   * (in milliseconds).
   * @param sendStopBeforeClose Should we send a StopMsg before closing the
   *        session ?
   */
  public HeartbeatMonitor(String threadName, ProtocolSession session,
                          long heartbeatInterval, boolean sendStopBeforeClose)
                          long heartbeatInterval)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
    this.sendStopBeforeClose = sendStopBeforeClose;
  }
  /**
@@ -126,18 +115,6 @@
          {
            // Heartbeat is well overdue so the server is assumed to be dead.
            logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName()));
            if (sendStopBeforeClose)
            {
              // V4 protocol introduces a StopMsg to properly end communications
              try
              {
                session.publish(new StopMsg());
              }
              catch (IOException ioe)
              {
                // Anyway, going to close session, so nothing to do
              }
            }
            session.close();
            break;
          }
@@ -160,10 +137,6 @@
        }
      }
    }
    catch (IOException e)
    {
      // Hope that's OK.
    }
    finally
    {
      if (debugEnabled())
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -44,10 +45,8 @@
  /**
   * This method is called when the session with the remote must be closed.
   * This object won't be used anymore after this method is called.
   *
   * @throws IOException If an error happen during the close process.
   */
  public abstract void close() throws IOException;
  public abstract void close();
  /**
   * This method is called when a ReplicationMsg must be sent to
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -28,77 +28,104 @@
package org.opends.server.replication.protocol;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import org.opends.messages.Message;
import org.opends.server.types.DirectoryConfig;
import org.opends.server.types.CryptoManager;
import org.opends.server.config.ConfigException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.SortedSet;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.util.SortedSet;
import java.net.Socket;
import java.net.InetAddress;
import java.io.IOException;
import org.opends.messages.Message;
import org.opends.server.config.ConfigException;
import org.opends.server.types.CryptoManager;
import org.opends.server.types.DirectoryConfig;
/**
 * This class represents the security configuration for replication protocol
 * sessions. It contains all the configuration required to use SSL, and it
 * determines whether encryption should be enabled for a session to a given
 * replication server.
 *
 */
public class ReplSessionSecurity
public final class ReplSessionSecurity
{
  /**
   * Whether replication sessions use SSL encryption.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * The name of the local certificate to use, or null if none is specified.
   */
  private String sslCertNickname;
  private final String sslCertNickname;
  /**
   * The set of enabled SSL protocols, or null for the default set.
   */
  private String sslProtocols[];
  private final String sslProtocols[];
  /**
   * The set of enabled SSL cipher suites, or null for the default set.
   */
  private String sslCipherSuites[];
  private final String sslCipherSuites[];
  /**
   * The default soTimeout value to be used at handshake phases.
   * (DS<->RS and RS<->RS)
   * The default soTimeout value to be used at handshake phases. (DS<->RS and
   * RS<->RS)
   */
  public static final int HANDSHAKE_TIMEOUT = 4000;
  /**
   * Create a ReplSessionSecurity instance from a provided multimaster domain
   * configuration.
   *
   * @throws ConfigException
   *           If the supplied configuration was not valid.
   */
  public ReplSessionSecurity() throws ConfigException
  {
    // Currently use global settings from the crypto manager.
    this(DirectoryConfig.getCryptoManager().getSslCertNickname(),
        DirectoryConfig.getCryptoManager().getSslProtocols(),
        DirectoryConfig.getCryptoManager().getSslCipherSuites(),
        DirectoryConfig.getCryptoManager().isSslEncryption());
  }
  /**
   * Create a ReplSessionSecurity instance from the supplied configuration
   * values.
   *
   * @param sslCertNickname The name of the local certificate to use, or null
   *                        if none is specified.
   * @param sslProtocols    The protocols that should be enabled, or null if
   *                        the default protocols should be used.
   * @param sslCipherSuites The cipher suites that should be enabled, or null
   *                        if the default cipher suites should be used.
   * @param sslEncryption   Whether replication sessions use SSL encryption.
   *
   * @throws ConfigException    If the supplied configuration was not valid.
   * @param sslCertNickname
   *          The name of the local certificate to use, or null if none is
   *          specified.
   * @param sslProtocols
   *          The protocols that should be enabled, or null if the default
   *          protocols should be used.
   * @param sslCipherSuites
   *          The cipher suites that should be enabled, or null if the default
   *          cipher suites should be used.
   * @param sslEncryption
   *          Whether replication sessions use SSL encryption.
   * @throws ConfigException
   *           If the supplied configuration was not valid.
   */
  public ReplSessionSecurity(String sslCertNickname,
    SortedSet<String> sslProtocols,
    SortedSet<String> sslCipherSuites,
    boolean sslEncryption)
    throws ConfigException
  public ReplSessionSecurity(final String sslCertNickname,
      final SortedSet<String> sslProtocols,
      final SortedSet<String> sslCipherSuites,
      final boolean sslEncryption) throws ConfigException
  {
    if (sslProtocols == null || sslProtocols.size() == 0)
    {
@@ -124,76 +151,46 @@
    this.sslCertNickname = sslCertNickname;
  }
  /**
   * Create a ReplSessionSecurity instance from a provided multimaster domain
   * configuration.
   *
   * @throws ConfigException If the supplied configuration was not valid.
   */
  public ReplSessionSecurity()
    throws ConfigException
  {
    // Currently use global settings from the crypto manager.
    this(DirectoryConfig.getCryptoManager().getSslCertNickname(),
      DirectoryConfig.getCryptoManager().getSslProtocols(),
      DirectoryConfig.getCryptoManager().getSslCipherSuites(),
      DirectoryConfig.getCryptoManager().isSslEncryption());
  }
  /**
   * Determine whether a given replication server is listening on a secure
   * port.
   * @param serverURL The replication server URL.
   * @return true if the given replication server is listening on a secure
   *         port, or false if it is listening on a non-secure port.
   */
  private boolean isSecurePort(String serverURL)
  {
    // Always true unless changed for test purposes.
    return true;
  }
  /**
   * Determine whether sessions to a given replication server should be
   * encrypted.
   * @param serverURL The replication server URL.
   * @return true if sessions to the given replication server should be
   *         encrypted, or false if they should not be encrypted.
   */
  public boolean isSslEncryption(String serverURL)
  {
    // Currently use global settings from the crypto manager.
    return sslEncryption;
  }
  /**
   * Create a new protocol session in the client role on the provided socket.
   * @param serverURL The remote replication server to which the socket is
   *                  connected.
   * @param socket The connected socket.
   * @param soTimeout The socket timeout option to use for the protocol session.
   *
   * @param serverURL
   *          The remote replication server to which the socket is connected.
   * @param socket
   *          The connected socket.
   * @param soTimeout
   *          The socket timeout option to use for the protocol session.
   * @return The new protocol session.
   * @throws ConfigException If the protocol session could not be established
   *                         due to a configuration problem.
   * @throws IOException     If the protocol session could not be established
   *                         for some other reason.
   * @throws ConfigException
   *           If the protocol session could not be established due to a
   *           configuration problem.
   * @throws IOException
   *           If the protocol session could not be established for some other
   *           reason.
   */
  public ProtocolSession createClientSession(String serverURL, Socket socket,
    int soTimeout)
    throws ConfigException, IOException
  public ProtocolSession createClientSession(final String serverURL,
      final Socket socket, final int soTimeout)
      throws ConfigException, IOException
  {
    boolean useSSL = isSecurePort(serverURL);
    if (useSSL)
    boolean hasCompleted = false;
    SSLSocket secureSocket = null;
    try
    {
      // Create a new SSL context every time to make sure we pick up the
      // latest contents of the trust store.
      CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
      SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
      final CryptoManager cryptoManager = DirectoryConfig
          .getCryptoManager();
      final SSLContext sslContext = cryptoManager
          .getSslContext(sslCertNickname);
      final SSLSocketFactory sslSocketFactory = sslContext
          .getSocketFactory();
      SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket,
        socket.getInetAddress().getHostName(),
        socket.getPort(), false);
      secureSocket = (SSLSocket) sslSocketFactory.createSocket(
          socket, socket.getInetAddress().getHostName(),
          socket.getPort(), false);
      secureSocket.setUseClientMode(true);
      secureSocket.setSoTimeout(soTimeout);
@@ -209,39 +206,73 @@
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      hasCompleted = true;
      return new TLSSocketSession(socket, secureSocket);
    }
    else
    finally
    {
      return new SocketSession(socket);
      if (!hasCompleted)
      {
        try
        {
          socket.close();
        }
        catch (final Exception ignored)
        {
          // Ignore.
        }
        if (secureSocket != null)
        {
          try
          {
            secureSocket.close();
          }
          catch (final Exception ignored)
          {
            // Ignore.
          }
        }
      }
    }
  }
  /**
   * Create a new protocol session in the server role on the provided socket.
   * @param socket The connected socket.
   *
   * @param socket
   *          The connected socket.
   * @param soTimeout
   *          The socket timeout option to use for the protocol session.
   * @return The new protocol session.
   * @param soTimeout The socket timeout option to use for the protocol session.
   * @throws ConfigException If the protocol session could not be established
   *                         due to a configuration problem.
   * @throws IOException     If the protocol session could not be established
   *                         for some other reason.
   * @throws ConfigException
   *           If the protocol session could not be established due to a
   *           configuration problem.
   * @throws IOException
   *           If the protocol session could not be established for some other
   *           reason.
   */
  public ProtocolSession createServerSession(Socket socket, int soTimeout)
    throws ConfigException, IOException
  public ProtocolSession createServerSession(final Socket socket,
      final int soTimeout) throws ConfigException, IOException
  {
    boolean hasCompleted = false;
    SSLSocket secureSocket = null;
    try
    {
      // Create a new SSL context every time to make sure we pick up the
      // latest contents of the trust store.
      CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
      SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
      final CryptoManager cryptoManager = DirectoryConfig
          .getCryptoManager();
      final SSLContext sslContext = cryptoManager
          .getSslContext(sslCertNickname);
      final SSLSocketFactory sslSocketFactory = sslContext
          .getSocketFactory();
      SSLSocket secureSocket = (SSLSocket)
      sslSocketFactory.createSocket(socket,
          socket.getInetAddress().getHostName(),
      secureSocket = (SSLSocket) sslSocketFactory.createSocket(
          socket, socket.getInetAddress().getHostName(),
          socket.getPort(), false);
      secureSocket.setUseClientMode(false);
      secureSocket.setNeedClientAuth(true);
@@ -259,23 +290,63 @@
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      // SSLSession sslSession = secureSocket.getSession();
      // System.out.println("Peer      = " + sslSession.getPeerHost() + ":" +
      //   sslSession.getPeerPort());
      // System.out.println("Principal = " + sslSession.getPeerPrincipal());
      hasCompleted = true;
      return new TLSSocketSession(socket, secureSocket);
    } catch (SSLException e)
    }
    catch (final SSLException e)
    {
      // This is probably a connection attempt from an unexpected client
      // log that to warn the administrator.
      InetAddress remHost = socket.getInetAddress();
      Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost.
          getHostName(), remHost.getHostAddress(), e.getLocalizedMessage());
      final InetAddress remHost = socket.getInetAddress();
      final Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(
          remHost.getHostName(), remHost.getHostAddress(),
          e.getLocalizedMessage());
      logError(message);
      return null;
    }
    finally
    {
      if (!hasCompleted)
      {
        try
        {
          socket.close();
        }
        catch (final Exception ignored)
        {
          // Ignore.
        }
        if (secureSocket != null)
        {
          try
          {
            secureSocket.close();
          }
          catch (final Exception ignored)
          {
            // Ignore.
          }
        }
      }
    }
  }
  /**
   * Determine whether sessions to a given replication server should be
   * encrypted.
   *
   * @param serverURL
   *          The replication server URL.
   * @return true if sessions to the given replication server should be
   *         encrypted, or false if they should not be encrypted.
   */
  public boolean isSslEncryption(final String serverURL)
  {
    // Currently use global settings from the crypto manager.
    return sslEncryption;
  }
}
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@
 */
package org.opends.server.replication.protocol;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
/**
 * This class Implement a protocol session using a basic socket and relying on
 * the innate encoding/decoding capabilities of the ReplicationMsg
 * by using the getBytes() and generateMsg() methods of those classes.
 * the innate encoding/decoding capabilities of the ReplicationMsg by using the
 * getBytes() and generateMsg() methods of those classes.
 */
public class SocketSession implements ProtocolSession
public final class SocketSession implements ProtocolSession
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private Socket socket;
  private InputStream input;
  private OutputStream output;
  byte[] rcvLengthBuf = new byte[8];
  private final Socket socket;
  private final InputStream input;
  private final OutputStream output;
  private final byte[] rcvLengthBuf = new byte[8];
  /**
   * The time the last message published to this session.
   */
  private volatile long lastPublishTime = 0;
  /**
   * The time the last message was received on this session.
   */
  private long lastReceiveTime = 0;
  private volatile long lastReceiveTime = 0;
  // Close guarded by closeLock: use a different lock to publish since
  // publishing can block, and we don't want to block while closing failed
  // connections.
  private final Object closeLock = new Object();
  private boolean closeInitiated = false;
  // Publish guarded by publishLock: use a full lock here so that we can
  // optionally publish StopMsg during close.
  private final Lock publishLock = new ReentrantLock();
  // Does not need protecting: updated only during single threaded handshake.
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  /**
   * Creates a new SocketSession based on the provided socket.
   *
   * @param socket The Socket on which the SocketSession will be based.
   * @throws IOException When an IException happens on the socket.
   * @param socket
   *          The Socket on which the SocketSession will be based.
   * @throws IOException
   *           When an IException happens on the socket.
   */
  public SocketSession(Socket socket) throws IOException
  public SocketSession(final Socket socket) throws IOException
  {
    this.socket = socket;
    /*
     * Use a window instead of the TCP flow control.
     * Therefore set a very large value for send and receive buffer sizes.
     */
    input = socket.getInputStream();
    output = socket.getOutputStream();
  }
  /**
   * {@inheritDoc}
   */
  public void close() throws IOException
  {
    closeInitiated = true;
    if (debugEnabled())
    {
      TRACER.debugInfo("Closing SocketSession."
          + stackTraceToSingleLineString(new Exception()));
      TRACER.debugInfo("Creating SocketSession to %s from %s", socket
          .getRemoteSocketAddress().toString(),
          stackTraceToSingleLineString(new Exception()));
    }
    socket.close();
    this.socket = socket;
    this.input = socket.getInputStream();
    this.output = socket.getOutputStream();
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void publish(ReplicationMsg msg)
         throws IOException
  @Override
  public void close()
  {
    synchronized (closeLock)
    {
      if (closeInitiated)
      {
        return;
      }
      closeInitiated = true;
    }
    // Perform close outside of critical section.
    if (debugEnabled())
    {
      TRACER.debugInfo("Closing SocketSession to %s from %s", socket
          .getRemoteSocketAddress().toString(),
          stackTraceToSingleLineString(new Exception()));
    }
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // V4 protocol introduces a StopMsg to properly end communications.
      if (publishLock.tryLock())
      {
        try
        {
          publish(new StopMsg());
        }
        catch (final IOException ignored)
        {
          // Ignore errors on close.
        }
        finally
        {
          publishLock.unlock();
        }
      }
    }
    if (socket != null && !socket.isClosed())
    {
      try
      {
        socket.close();
      }
      catch (final IOException ignored)
      {
        // Ignore errors on close.
      }
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean closeInitiated()
  {
    synchronized (closeLock)
    {
      return closeInitiated;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public long getLastPublishTime()
  {
    return lastPublishTime;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public long getLastReceiveTime()
  {
    if (lastReceiveTime == 0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String getReadableRemoteAddress()
  {
    return socket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String getRemoteAddress()
  {
    return socket.getInetAddress().getHostAddress();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isEncrypted()
  {
    return false;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    publish(msg, ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
         throws IOException
  @Override
  public void publish(final ReplicationMsg msg,
      final short reqProtocolVersion) throws IOException
  {
    byte[] buffer = msg.getBytes(reqProtocolVersion);
    String str = String.format("%08x", buffer.length);
    final byte[] buffer = msg.getBytes(reqProtocolVersion);
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
    if (debugEnabled())
    publishLock.lock();
    try
    {
      TRACER.debugInfo("SocketSession publish <" + str + ">");
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
    }
    byte[] sendLengthBuf = str.getBytes();
    output.write(sendLengthBuf);
    output.write(buffer);
    output.flush();
    finally
    {
      publishLock.unlock();
    }
    lastPublishTime = System.currentTimeMillis();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public ReplicationMsg receive() throws IOException,
      ClassNotFoundException, DataFormatException,
      NotSupportedOldVersionPDUException
  {
    /* Read the first 8 bytes containing the packet length */
    // Read the first 8 bytes containing the packet length
    int length = 0;
    /* Let's start the stop-watch before waiting on read */
    /* for the heartbeat check to be operationnal        */
    // Let's start the stop-watch before waiting on read for the heartbeat check
    // to be operational
    lastReceiveTime = System.currentTimeMillis();
    while (length<8)
    while (length < 8)
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      final int read = input.read(rcvLengthBuf, length, 8 - length);
      if (read == -1)
      {
        lastReceiveTime=0;
        lastReceiveTime = 0;
        throw new IOException("no more data");
      }
      else
@@ -164,101 +312,59 @@
      }
    }
    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
    final int totalLength = Integer.parseInt(
        new String(rcvLengthBuf), 16);
    try
    {
      length = 0;
      byte[] buffer = new byte[totalLength];
      final byte[] buffer = new byte[totalLength];
      while (length < totalLength)
      {
        length += input.read(buffer, length, totalLength - length);
      }
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      // We do not want the heartbeat to close the session when we are
      // processing a message even a time consuming one.
      lastReceiveTime = 0;
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
    }
    catch (OutOfMemoryError e)
    catch (final OutOfMemoryError e)
    {
      throw new IOException("Packet too large, can't allocate "
                            + totalLength + " bytes.");
          + totalLength + " bytes.");
    }
  }
  /**
   * {@inheritDoc}
   */
  public void stopEncryption()
  {
    // There is no security layer.
  }
  /**
   * {@inheritDoc}
   */
  public boolean isEncrypted()
  @Override
  public void setProtocolVersion(final short version)
  {
    return false;
    protocolVersion = version;
  }
  /**
   * {@inheritDoc}
   */
  public long getLastPublishTime()
  {
    return lastPublishTime;
  }
  /**
   * {@inheritDoc}
   */
  public long getLastReceiveTime()
  {
    if (lastReceiveTime==0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
  /**
   * {@inheritDoc}
   */
  public String getRemoteAddress()
  {
    return socket.getInetAddress().getHostAddress();
  }
  /**
   * {@inheritDoc}
   */
  public String getReadableRemoteAddress()
  {
    return socket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  public void setSoTimeout(int timeout) throws SocketException
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    socket.setSoTimeout(timeout);
  }
  /**
   * {@inheritDoc}
   */
  public boolean closeInitiated()
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  @Override
  public void stopEncryption()
  {
    protocolVersion = version;
    // There is no security layer.
  }
}
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -27,6 +27,8 @@
 */
package org.opends.server.replication.protocol;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,241 +38,430 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import javax.net.ssl.SSLSocket;
import org.opends.server.loggers.debug.DebugTracer;
import javax.net.ssl.SSLSocket;
/**
 * This class implements a protocol session using TLS.
 */
public class TLSSocketSession implements ProtocolSession
public final class TLSSocketSession implements ProtocolSession
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private Socket plainSocket;
  private SSLSocket secureSocket;
  private InputStream input;
  private OutputStream output;
  private InputStream plainInput;
  private OutputStream plainOutput;
  byte[] rcvLengthBuf = new byte[8];
  private final Socket plainSocket;
  private final SSLSocket secureSocket;
  private final InputStream plainInput;
  private final OutputStream plainOutput;
  private final byte[] rcvLengthBuf = new byte[8];
  /**
   * The time the last message published to this session.
   */
  private volatile long lastPublishTime = 0;
  /**
   * The time the last message was received on this session.
   */
  private long lastReceiveTime = 0;
  private volatile long lastReceiveTime = 0;
  // Close and error guarded by stateLock: use a different lock to publish since
  // publishing can block, and we don't want to block while closing failed
  // connections.
  private final Object stateLock = new Object();
  private boolean closeInitiated = false;
  private Throwable sessionError = null;
  // Publish guarded by publishLock: use a full lock here so that we can
  // optionally publish StopMsg during close.
  private final Lock publishLock = new ReentrantLock();
  // Does not need protecting: updated only during single threaded handshake.
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  private InputStream input;
  private OutputStream output;
  /**
   * Creates a new TLSSocketSession.
   *
   * @param socket       The regular Socket on which the SocketSession will be
   *                     based.
   * @param secureSocket The secure Socket on which the SocketSession will be
   *                     based.
   * @throws IOException When an IException happens on the socket.
   * @param socket
   *          The regular Socket on which the SocketSession will be based.
   * @param secureSocket
   *          The secure Socket on which the SocketSession will be based.
   * @throws IOException
   *           When an IException happens on the socket.
   */
  public TLSSocketSession(Socket socket, SSLSocket secureSocket)
       throws IOException
  public TLSSocketSession(final Socket socket,
      final SSLSocket secureSocket) throws IOException
  {
    plainSocket = socket;
    this.secureSocket = secureSocket;
    plainInput = plainSocket.getInputStream();
    plainOutput = plainSocket.getOutputStream();
    input = secureSocket.getInputStream();
    output = secureSocket.getOutputStream();
  }
  /**
   * {@inheritDoc}
   */
  public void close() throws IOException
  {
    closeInitiated = true;
    if (debugEnabled())
    {
      TRACER.debugInfo("Closing SocketSession." +
          stackTraceToSingleLineString(new Exception("Stack:")));
      TRACER.debugInfo(
          "Creating TLSSocketSession from %s to %s in %s",
          socket.getLocalSocketAddress(),
          socket.getRemoteSocketAddress(),
          stackTraceToSingleLineString(new Exception()));
    }
    if (plainSocket != null && !plainSocket.isClosed())
    {
      plainInput.close();
      plainOutput.close();
      plainSocket.close();
    }
    if (secureSocket != null && !secureSocket.isClosed())
    {
      input.close();
      output.close();
      secureSocket.close();
    }
    this.plainSocket = socket;
    this.secureSocket = secureSocket;
    this.plainInput = plainSocket.getInputStream();
    this.plainOutput = plainSocket.getOutputStream();
    this.input = secureSocket.getInputStream();
    this.output = secureSocket.getOutputStream();
  }
  /**
   * {@inheritDoc}
   */
  public synchronized void publish(ReplicationMsg msg)
         throws IOException
  @Override
  public void close()
  {
    publish(msg, ProtocolVersion.getCurrentVersion());
  }
    Throwable localSessionError;
  /**
   * {@inheritDoc}
   */
  public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
         throws IOException
  {
    byte[] buffer = msg.getBytes(reqProtocolVersion);
    String str = String.format("%08x", buffer.length);
    byte[] sendLengthBuf = str.getBytes();
    output.write(sendLengthBuf);
    output.write(buffer);
    output.flush();
    lastPublishTime = System.currentTimeMillis();
  }
  /**
   * {@inheritDoc}
   */
  public ReplicationMsg receive() throws IOException,
      ClassNotFoundException, DataFormatException,
      NotSupportedOldVersionPDUException
  {
    /* Read the first 8 bytes containing the packet length */
    int length = 0;
    /* Let's start the stop-watch before waiting on read */
    /* for the heartbeat check to be operationnal        */
    lastReceiveTime = System.currentTimeMillis();
    while (length<8)
    synchronized (stateLock)
    {
      int read = input.read(rcvLengthBuf, length, 8-length);
      if (read == -1)
      if (closeInitiated)
      {
        lastReceiveTime=0;
        throw new IOException("no more data");
        return;
      }
      localSessionError = sessionError;
      closeInitiated = true;
    }
    // Perform close outside of critical section.
    if (debugEnabled())
    {
      if (localSessionError == null)
      {
        TRACER.debugInfo(
            "Closing TLSSocketSession from %s to %s in %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
            stackTraceToSingleLineString(new Exception()));
      }
      else
      {
        length += read;
        TRACER.debugInfo(
            "Aborting TLSSocketSession from %s to %s in %s due to the "
                + "following error: %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
            stackTraceToSingleLineString(new Exception()),
            stackTraceToSingleLineString(localSessionError));
      }
    }
    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
    // V4 protocol introduces a StopMsg to properly end communications.
    if (localSessionError == null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        if (publishLock.tryLock())
        {
          try
          {
            publish(new StopMsg());
          }
          catch (final IOException ignored)
          {
            // Ignore errors on close.
          }
          finally
          {
            publishLock.unlock();
          }
        }
      }
    }
    try
    {
      length = 0;
      byte[] buffer = new byte[totalLength];
      while (length < totalLength)
      {
        length += input.read(buffer, length, totalLength - length);
      }
      /* We do not want the heartbeat to close the session when */
      /* we are processing a message even a time consuming one. */
      lastReceiveTime=0;
      return ReplicationMsg.generateMsg(buffer, protocolVersion);
      plainSocket.close();
    }
    catch (OutOfMemoryError e)
    catch (final IOException ignored)
    {
      throw new IOException("Packet too large, can't allocate "
                            + totalLength + " bytes.");
      // Ignore errors on close.
    }
    try
    {
      secureSocket.close();
    }
    catch (final IOException ignored)
    {
      // Ignore errors on close.
    }
  }
  /**
   * {@inheritDoc}
   */
  public void stopEncryption()
  {
    input = plainInput;
    output = plainOutput;
  }
  /**
   * {@inheritDoc}
   */
  public boolean isEncrypted()
  @Override
  public boolean closeInitiated()
  {
    return !(input == plainInput);
    synchronized (stateLock)
    {
      return closeInitiated;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public long getLastPublishTime()
  {
    return lastPublishTime;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public long getLastReceiveTime()
  {
    if (lastReceiveTime==0)
    if (lastReceiveTime == 0)
    {
      return System.currentTimeMillis();
    }
    return lastReceiveTime;
  }
  /**
   * {@inheritDoc}
   */
  public String getRemoteAddress()
  {
    return plainSocket.getInetAddress().getHostAddress();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public String getReadableRemoteAddress()
  {
    return plainSocket.getRemoteSocketAddress().toString();
  }
  /**
   * {@inheritDoc}
   */
  public void setSoTimeout(int timeout) throws SocketException
  @Override
  public String getRemoteAddress()
  {
    return plainSocket.getInetAddress().getHostAddress();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isEncrypted()
  {
    return input != plainInput;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    publish(msg, ProtocolVersion.getCurrentVersion());
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void publish(final ReplicationMsg msg,
      final short reqProtocolVersion) throws IOException
  {
    final byte[] buffer = msg.getBytes(reqProtocolVersion);
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
    publishLock.lock();
    try
    {
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
    }
    catch (final IOException e)
    {
      setSessionError(e);
      throw e;
    }
    finally
    {
      publishLock.unlock();
    }
    lastPublishTime = System.currentTimeMillis();
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public ReplicationMsg receive() throws IOException,
      DataFormatException, NotSupportedOldVersionPDUException
  {
    try
    {
      // Read the first 8 bytes containing the packet length.
      int length = 0;
      // Let's start the stop-watch before waiting on read for the heartbeat
      // check
      // to be operational.
      lastReceiveTime = System.currentTimeMillis();
      while (length < 8)
      {
        final int read = input.read(rcvLengthBuf, length, 8 - length);
        if (read == -1)
        {
          lastReceiveTime = 0;
          throw new IOException("no more data");
        }
        else
        {
          length += read;
        }
      }
      final int totalLength = Integer.parseInt(new String(
          rcvLengthBuf), 16);
      try
      {
        length = 0;
        final byte[] buffer = new byte[totalLength];
        while (length < totalLength)
        {
          final int read = input.read(buffer, length, totalLength
              - length);
          if (read == -1)
          {
            lastReceiveTime = 0;
            throw new IOException("no more data");
          }
          else
          {
            length += read;
          }
        }
        // We do not want the heartbeat to close the session when we are
        // processing a message even a time consuming one.
        lastReceiveTime = 0;
        return ReplicationMsg.generateMsg(buffer, protocolVersion);
      }
      catch (final OutOfMemoryError e)
      {
        throw new IOException("Packet too large, can't allocate "
            + totalLength + " bytes.");
      }
    }
    catch (final IOException e)
    {
      setSessionError(e);
      throw e;
    }
    catch (final DataFormatException e)
    {
      setSessionError(e);
      throw e;
    }
    catch (final NotSupportedOldVersionPDUException e)
    {
      setSessionError(e);
      throw e;
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void setProtocolVersion(final short version)
  {
    protocolVersion = version;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    plainSocket.setSoTimeout(timeout);
  }
  /**
   * {@inheritDoc}
   */
  public boolean closeInitiated()
  {
    return closeInitiated;
  }
  /**
   * {@inheritDoc}
   */
  public void setProtocolVersion(short version)
  @Override
  public void stopEncryption()
  {
    protocolVersion = version;
    // The secure socket has been configured not to auto close the underlying
    // plain socket.
    try
    {
      secureSocket.close();
    }
    catch (IOException ignored)
    {
      // Ignore.
    }
    input = plainInput;
    output = plainOutput;
  }
  private void setSessionError(final Exception e)
  {
    synchronized (stateLock)
    {
      if (sessionError == null)
      {
        sessionError = e;
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -129,12 +129,16 @@
        }
        catch (LockConflictException e)
        {
          if (txn != null)
            txn.abort();
          txn = null;
          // Try again.
        }
        finally
        {
          if (txn != null)
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
          }
          dbCloseLock.readLock().unlock();
        }
      }
@@ -145,10 +149,6 @@
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        if (txn != null)
        {
          txn.abort();
        }
        replicationServer.shutdown();
      }
    }
@@ -158,16 +158,6 @@
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
        }
      }
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
@@ -177,17 +167,6 @@
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
        }
      }
      replicationServer.shutdown();
    }
  }
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -386,9 +386,10 @@
            else
            {
              ServerState startState = domain.getStartState();
              // We don't use the endState but it's updating CN as reading
              ServerState endState =
                  domain.getEligibleState(crossDomainEligibleCN, false);
              // We don't use the returned endState but it's updating CN as
              // reading
              domain.getEligibleState(crossDomainEligibleCN, false);
              ChangeNumber fcn = startState.getMaxChangeNumber(
                  cn.getServerId());
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2010 ForgeRock AS
 *      Portions Copyright 2010-2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -169,6 +169,7 @@
    }
    /**
     * Provide a string representation of this object for debug purpose..
     * @param buffer Append to this buffer.
     */
    public void toString(StringBuilder buffer)
    {
@@ -1550,9 +1551,10 @@
      // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
      searchPhase = PERSISTENT_PHASE;
      if (writer ==null)
      final ProtocolSession localSession = session;
      if (writer ==null && localSession != null)
      {
        writer = new ECLServerWriter(session,this,replicationServerDomain);
        writer = new ECLServerWriter(localSession,this,replicationServerDomain);
        writer.start();  // start suspended
      }
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -186,13 +186,7 @@
    {
      if (session!=null)
      {
        try
        {
          session.close();
        } catch (IOException e)
        {
          // Can't do much more : ignore
        }
        session.close();
      }
      if (replicationServerDomain!=null)
        replicationServerDomain.stopServer(handler, false);
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
@@ -36,12 +37,10 @@
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -1479,19 +1478,7 @@
      return writer;
    }
    /**
     * Close the writer and get a string reader for the LDIF content.
     *
     * @return Returns the string contents of the writer.
     * @throws Exception
     *           If an error occurred closing the writer.
     */
    public BufferedReader getLDIFBufferedReader() throws Exception {
      writer.close();
      String ldif = stream.toString("UTF-8");
      StringReader reader = new StringReader(ldif);
      return new BufferedReader(reader);
    }
    /**
     * Close the writer and get an LDIF reader for the LDIF content.
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -252,12 +252,17 @@
        }
        catch (LockConflictException e)
        {
          if (txn != null)
            txn.abort();
          txn = null;
          // Try again.
        }
        finally
        {
          if (txn != null)
          {
            // No effect if txn has committed.
            txn.abort();
            txn = null;
          }
          dbCloseLock.readLock().unlock();
        }
      }
@@ -268,10 +273,6 @@
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        logError(mb.toMessage());
        if (txn != null)
        {
           txn.abort();
        }
        replicationServer.shutdown();
      }
    }
@@ -281,16 +282,6 @@
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
        }
      }
      replicationServer.shutdown();
    }
    catch (UnsupportedEncodingException e)
@@ -300,17 +291,6 @@
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
        }
      }
      replicationServer.shutdown();
    }
  }
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -63,7 +63,6 @@
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -107,17 +106,14 @@
          providedMsg.toString());
      logError(providedMsg);
    }
    try
    if (providedSession != null)
    {
      if (providedSession != null)
        // This method is only called when aborting a failing handshake and
        // not StopMsg should be sent in such situation. StopMsg are only
        // expected when full handshake has been performed, or at end of
        // handshake phase 1, when DS was just gathering available RS info
        providedSession.close();
    } catch (IOException e)
    {
      // ignore
      // This method is only called when aborting a failing handshake and
      // not StopMsg should be sent in such situation. StopMsg are only
      // expected when full handshake has been performed, or at end of
      // handshake phase 1, when DS was just gathering available RS info
      providedSession.close();
    }
  }
@@ -286,9 +282,10 @@
    // We did not recognize the message, close session as what
    // can happen after is undetermined and we do not want the server to
    // be disturbed
    if (session!=null)
    ProtocolSession localSession = session;
    if (localSession != null)
    {
      closeSession(session, reason, this);
      closeSession(localSession, reason, this);
    }
    if ((replicationServerDomain != null) &&
@@ -1101,26 +1098,7 @@
    if (session != null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end
        // communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      // Close session to end ServerReader or ServerWriter
      try
      {
        session.close();
      } catch (IOException e)
      {
        // ignore.
      }
      session.close();
    }
    /*
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -341,32 +341,15 @@
    finally
    {
      /*
       * The thread only exits the loop above if some error condition
       * happen.
       * The thread only exits the loop above if some error condition happen.
       * Attempt to close the socket and stop the server handler.
       */
      try
      if (debugEnabled())
      {
        if (handler.getProtocolVersion() >=
          ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // V4 protocol introduces a StopMsg to properly end
          // communications
          try
          {
            session.publish(new StopMsg());
          } catch (IOException ioe)
          {
            // Anyway, going to close session, so nothing to do
          }
        }
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() + " closing the session");
        session.close();
      } catch (IOException e)
      {
      // ignore
        TRACER.debugInfo("In " + this.getName()
            + " closing the session");
      }
      session.close();
      handler.doStop();
      if (debugEnabled())
      {
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -33,8 +33,6 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.messages.ReplicationMessages.*;
import java.io.IOException;
import java.net.SocketException;
import java.util.NoSuchElementException;
@@ -42,8 +40,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -229,25 +225,7 @@
      logError(errMessage);
    }
    finally {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end
        // communications
        try
        {
          session.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        session.close();
      } catch (IOException e)
      {
       // Can't do much more : ignore
      }
      session.close();
      replicationServerDomain.stopServer(handler, false);
      if (debugEnabled())
      {
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
@@ -673,6 +674,7 @@
      this.weight = rsInfo.getWeight();
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
      this.serverState = new ServerState();
    }
    /**
@@ -1006,15 +1008,10 @@
            {
              if (connected == false)
              {
                if (session != null)
                ProtocolSession localSession = session;
                if (localSession != null)
                {
                  try
                  {
                    session.close();
                  } catch (IOException e)
                  {
                    // The session was already closed, just ignore.
                  }
                  localSession.close();
                  session = null;
                }
              }
@@ -1287,30 +1284,11 @@
    {
      if (localSession != null)
      {
        if (debugEnabled())
        if (debugEnabled()) {
          debugInfo("In RB, closing session after phase 1");
        }
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // V4 protocol introduces a StopMsg to properly end communications
          if (!error)
          {
            try
            {
              localSession.publish(new StopMsg());
            } catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
        }
        try
        {
          localSession.close();
        } catch (IOException e)
        {
          // The session was already closed, just ignore.
        }
        localSession.close();
        localSession = null;
      }
      if (error)
@@ -1459,27 +1437,10 @@
    {
      if (localSession != null)
      {
        if (debugEnabled())
        if (debugEnabled()) {
          debugInfo("In RB, closing session after phase 1");
        // V4 protocol introduces a StopMsg to properly end communications
        if (!error)
        {
          try
          {
            localSession.publish(new StopMsg());
          } catch (IOException ioe)
          {
            // Anyway, going to close session, so nothing to do
          }
        }
        try
        {
          localSession.close();
        } catch (IOException e)
        {
          // The session was already closed, just ignore.
        }
        localSession.close();
        localSession = null;
      }
      if (error)
@@ -1545,13 +1506,7 @@
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException ex)
        {
          // The session was already closed, just ignore.
        }
        session.close();
        session = null;
      }
      // Be sure to return null.
@@ -1625,13 +1580,7 @@
      if (session != null)
      {
        try
        {
          session.close();
        } catch (IOException ex)
        {
          // The session was already closed, just ignore.
        }
        session.close();
        session = null;
      }
      // Be sure to return null.
@@ -2255,8 +2204,7 @@
      heartbeatMonitor = new HeartbeatMonitor(
          threadName,
          session,
          heartbeatInterval,
          (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4));
          heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -2293,24 +2241,7 @@
    if (failingSession != null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
        try
        {
          failingSession.publish(new StopMsg());
        } catch (IOException ioe)
        {
          // Anyway, going to close session, so nothing to do
        }
      }
      try
      {
        failingSession.close();
      } catch (IOException e1)
      {
        // ignore
      }
      failingSession.close();
      numLostConnections++;
    }
@@ -2689,6 +2620,11 @@
        throw e;
      } catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        if (shutdown == false)
        {
          if ((session == null) || (!session.closeInitiated()))
@@ -2790,18 +2726,9 @@
    rsGroupId = (byte) -1;
    rsServerId = -1;
    rsServerUrl = null;
    try
    if (session != null)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // V4 protocol introduces a StopMsg to properly end communications
          session.publish(new StopMsg());
      }
      session.close();
    } catch (Exception e)
    {
      // Anyway, going to close session, so nothing to do
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -440,15 +441,9 @@
      /*
       * Shutdown any current client handling code
       */
      try
      if (session != null)
      {
        if (session != null)
        {
          session.close();
        }
      } catch (IOException e)
      {
        // ignore.
        session.close();
      }
      try
@@ -567,13 +562,7 @@
      // Handle DS connexion
      if (!performHandshake())
      {
        try
        {
          session.close();
        } catch (IOException e)
        {
          // nothing to do
        }
        session.close();
        return;
      }
      // If we come here, assured parameters sent by DS are as expected and
@@ -731,7 +720,7 @@
     */
    private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
    {
      assertEquals(updateMsg.isAssured(), isAssured,
      assertEquals(updateMsg.isAssured(), isAssured,
          "msg=" + ((updateMsg instanceof AddMsg)?
              ((AddMsg)updateMsg).getDn():updateMsg.getChangeNumber()));
      if (isAssured)
@@ -1052,7 +1041,7 @@
        String entry = "dn: ou=assured-sr-timeout-entry" + rsGroupId + "," + NOT_ASSURED_DN + "\n" +
        "objectClass: top\n" +
        "objectClass: organizationalUnit\n";
        addEntry(TestCaseUtils.entryFromLdifString(entry));
        addEntry(TestCaseUtils.entryFromLdifString(entry));
      }
      long endTime = System.currentTimeMillis();
@@ -1919,8 +1908,8 @@
      sleep(50);
      ii++;
      if (ii>10)
        assertEquals(operation.getResultCode(), expectedResult,
            operation.getErrorMessage().toString());
        assertEquals(operation.getResultCode(), expectedResult,
            operation.getErrorMessage().toString());
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -23,10 +23,10 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@@ -1123,15 +1123,9 @@
      /*
       * Shutdown any current client handling code
       */
      try
      if (session != null)
      {
        if (session != null)
        {
          session.close();
        }
      } catch (IOException e)
      {
        // ignore.
        session.close();
      }
      try