opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -23,6 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.plugin; @@ -91,8 +92,8 @@ /** * Constructor. * @param fractionalConfig * @param domain * @param fractionalConfig The fractional configuration. * @param domain The replication domain. */ public ImportFractionalContext(FractionalConfig fractionalConfig, LDAPReplicationDomain domain) opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -35,8 +35,6 @@ import org.opends.server.loggers.debug.DebugTracer; import java.io.IOException; import org.opends.server.api.DirectoryThread; /** @@ -71,27 +69,18 @@ private volatile boolean shutdown = false; /** * Send StopMsg before session closure or not. */ private final boolean sendStopBeforeClose; /** * Create a heartbeat monitor thread. * @param threadName The name of the heartbeat thread. * @param session The session on which heartbeats are to be monitored. * @param heartbeatInterval The expected interval between heartbeats received * (in milliseconds). * @param sendStopBeforeClose Should we send a StopMsg before closing the * session ? */ public HeartbeatMonitor(String threadName, ProtocolSession session, long heartbeatInterval, boolean sendStopBeforeClose) long heartbeatInterval) { super(threadName); this.session = session; this.heartbeatInterval = heartbeatInterval; this.sendStopBeforeClose = sendStopBeforeClose; } /** @@ -126,18 +115,6 @@ { // Heartbeat is well overdue so the server is assumed to be dead. logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName())); if (sendStopBeforeClose) { // V4 protocol introduces a StopMsg to properly end communications try { session.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } session.close(); break; } @@ -160,10 +137,6 @@ } } } catch (IOException e) { // Hope that's OK. } finally { if (debugEnabled()) opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -44,10 +45,8 @@ /** * This method is called when the session with the remote must be closed. * This object won't be used anymore after this method is called. * * @throws IOException If an error happen during the close process. */ public abstract void close() throws IOException; public abstract void close(); /** * This method is called when a ReplicationMsg must be sent to opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -28,77 +28,104 @@ package org.opends.server.replication.protocol; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import org.opends.messages.Message; import org.opends.server.types.DirectoryConfig; import org.opends.server.types.CryptoManager; import org.opends.server.config.ConfigException; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.util.SortedSet; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import java.util.SortedSet; import java.net.Socket; import java.net.InetAddress; import java.io.IOException; import org.opends.messages.Message; import org.opends.server.config.ConfigException; import org.opends.server.types.CryptoManager; import org.opends.server.types.DirectoryConfig; /** * This class represents the security configuration for replication protocol * sessions. It contains all the configuration required to use SSL, and it * determines whether encryption should be enabled for a session to a given * replication server. * */ public class ReplSessionSecurity public final class ReplSessionSecurity { /** * Whether replication sessions use SSL encryption. */ private boolean sslEncryption; private final boolean sslEncryption; /** * The name of the local certificate to use, or null if none is specified. */ private String sslCertNickname; private final String sslCertNickname; /** * The set of enabled SSL protocols, or null for the default set. */ private String sslProtocols[]; private final String sslProtocols[]; /** * The set of enabled SSL cipher suites, or null for the default set. */ private String sslCipherSuites[]; private final String sslCipherSuites[]; /** * The default soTimeout value to be used at handshake phases. * (DS<->RS and RS<->RS) * The default soTimeout value to be used at handshake phases. (DS<->RS and * RS<->RS) */ public static final int HANDSHAKE_TIMEOUT = 4000; /** * Create a ReplSessionSecurity instance from a provided multimaster domain * configuration. * * @throws ConfigException * If the supplied configuration was not valid. */ public ReplSessionSecurity() throws ConfigException { // Currently use global settings from the crypto manager. this(DirectoryConfig.getCryptoManager().getSslCertNickname(), DirectoryConfig.getCryptoManager().getSslProtocols(), DirectoryConfig.getCryptoManager().getSslCipherSuites(), DirectoryConfig.getCryptoManager().isSslEncryption()); } /** * Create a ReplSessionSecurity instance from the supplied configuration * values. * * @param sslCertNickname The name of the local certificate to use, or null * if none is specified. * @param sslProtocols The protocols that should be enabled, or null if * the default protocols should be used. * @param sslCipherSuites The cipher suites that should be enabled, or null * if the default cipher suites should be used. * @param sslEncryption Whether replication sessions use SSL encryption. * * @throws ConfigException If the supplied configuration was not valid. * @param sslCertNickname * The name of the local certificate to use, or null if none is * specified. * @param sslProtocols * The protocols that should be enabled, or null if the default * protocols should be used. * @param sslCipherSuites * The cipher suites that should be enabled, or null if the default * cipher suites should be used. * @param sslEncryption * Whether replication sessions use SSL encryption. * @throws ConfigException * If the supplied configuration was not valid. */ public ReplSessionSecurity(String sslCertNickname, SortedSet<String> sslProtocols, SortedSet<String> sslCipherSuites, boolean sslEncryption) throws ConfigException public ReplSessionSecurity(final String sslCertNickname, final SortedSet<String> sslProtocols, final SortedSet<String> sslCipherSuites, final boolean sslEncryption) throws ConfigException { if (sslProtocols == null || sslProtocols.size() == 0) { @@ -124,76 +151,46 @@ this.sslCertNickname = sslCertNickname; } /** * Create a ReplSessionSecurity instance from a provided multimaster domain * configuration. * * @throws ConfigException If the supplied configuration was not valid. */ public ReplSessionSecurity() throws ConfigException { // Currently use global settings from the crypto manager. this(DirectoryConfig.getCryptoManager().getSslCertNickname(), DirectoryConfig.getCryptoManager().getSslProtocols(), DirectoryConfig.getCryptoManager().getSslCipherSuites(), DirectoryConfig.getCryptoManager().isSslEncryption()); } /** * Determine whether a given replication server is listening on a secure * port. * @param serverURL The replication server URL. * @return true if the given replication server is listening on a secure * port, or false if it is listening on a non-secure port. */ private boolean isSecurePort(String serverURL) { // Always true unless changed for test purposes. return true; } /** * Determine whether sessions to a given replication server should be * encrypted. * @param serverURL The replication server URL. * @return true if sessions to the given replication server should be * encrypted, or false if they should not be encrypted. */ public boolean isSslEncryption(String serverURL) { // Currently use global settings from the crypto manager. return sslEncryption; } /** * Create a new protocol session in the client role on the provided socket. * @param serverURL The remote replication server to which the socket is * connected. * @param socket The connected socket. * @param soTimeout The socket timeout option to use for the protocol session. * * @param serverURL * The remote replication server to which the socket is connected. * @param socket * The connected socket. * @param soTimeout * The socket timeout option to use for the protocol session. * @return The new protocol session. * @throws ConfigException If the protocol session could not be established * due to a configuration problem. * @throws IOException If the protocol session could not be established * for some other reason. * @throws ConfigException * If the protocol session could not be established due to a * configuration problem. * @throws IOException * If the protocol session could not be established for some other * reason. */ public ProtocolSession createClientSession(String serverURL, Socket socket, int soTimeout) throws ConfigException, IOException public ProtocolSession createClientSession(final String serverURL, final Socket socket, final int soTimeout) throws ConfigException, IOException { boolean useSSL = isSecurePort(serverURL); if (useSSL) boolean hasCompleted = false; SSLSocket secureSocket = null; try { // Create a new SSL context every time to make sure we pick up the // latest contents of the trust store. CryptoManager cryptoManager = DirectoryConfig.getCryptoManager(); SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname); SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); final CryptoManager cryptoManager = DirectoryConfig .getCryptoManager(); final SSLContext sslContext = cryptoManager .getSslContext(sslCertNickname); final SSLSocketFactory sslSocketFactory = sslContext .getSocketFactory(); SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket, socket.getInetAddress().getHostName(), socket.getPort(), false); secureSocket = (SSLSocket) sslSocketFactory.createSocket( socket, socket.getInetAddress().getHostName(), socket.getPort(), false); secureSocket.setUseClientMode(true); secureSocket.setSoTimeout(soTimeout); @@ -209,39 +206,73 @@ // Force TLS negotiation now. secureSocket.startHandshake(); hasCompleted = true; return new TLSSocketSession(socket, secureSocket); } else finally { return new SocketSession(socket); if (!hasCompleted) { try { socket.close(); } catch (final Exception ignored) { // Ignore. } if (secureSocket != null) { try { secureSocket.close(); } catch (final Exception ignored) { // Ignore. } } } } } /** * Create a new protocol session in the server role on the provided socket. * @param socket The connected socket. * * @param socket * The connected socket. * @param soTimeout * The socket timeout option to use for the protocol session. * @return The new protocol session. * @param soTimeout The socket timeout option to use for the protocol session. * @throws ConfigException If the protocol session could not be established * due to a configuration problem. * @throws IOException If the protocol session could not be established * for some other reason. * @throws ConfigException * If the protocol session could not be established due to a * configuration problem. * @throws IOException * If the protocol session could not be established for some other * reason. */ public ProtocolSession createServerSession(Socket socket, int soTimeout) throws ConfigException, IOException public ProtocolSession createServerSession(final Socket socket, final int soTimeout) throws ConfigException, IOException { boolean hasCompleted = false; SSLSocket secureSocket = null; try { // Create a new SSL context every time to make sure we pick up the // latest contents of the trust store. CryptoManager cryptoManager = DirectoryConfig.getCryptoManager(); SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname); SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); final CryptoManager cryptoManager = DirectoryConfig .getCryptoManager(); final SSLContext sslContext = cryptoManager .getSslContext(sslCertNickname); final SSLSocketFactory sslSocketFactory = sslContext .getSocketFactory(); SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket, socket.getInetAddress().getHostName(), secureSocket = (SSLSocket) sslSocketFactory.createSocket( socket, socket.getInetAddress().getHostName(), socket.getPort(), false); secureSocket.setUseClientMode(false); secureSocket.setNeedClientAuth(true); @@ -259,23 +290,63 @@ // Force TLS negotiation now. secureSocket.startHandshake(); // SSLSession sslSession = secureSocket.getSession(); // System.out.println("Peer = " + sslSession.getPeerHost() + ":" + // sslSession.getPeerPort()); // System.out.println("Principal = " + sslSession.getPeerPrincipal()); hasCompleted = true; return new TLSSocketSession(socket, secureSocket); } catch (SSLException e) } catch (final SSLException e) { // This is probably a connection attempt from an unexpected client // log that to warn the administrator. InetAddress remHost = socket.getInetAddress(); Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost. getHostName(), remHost.getHostAddress(), e.getLocalizedMessage()); final InetAddress remHost = socket.getInetAddress(); final Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get( remHost.getHostName(), remHost.getHostAddress(), e.getLocalizedMessage()); logError(message); return null; } finally { if (!hasCompleted) { try { socket.close(); } catch (final Exception ignored) { // Ignore. } if (secureSocket != null) { try { secureSocket.close(); } catch (final Exception ignored) { // Ignore. } } } } } /** * Determine whether sessions to a given replication server should be * encrypted. * * @param serverURL * The replication server URL. * @return true if sessions to the given replication server should be * encrypted, or false if they should not be encrypted. */ public boolean isSslEncryption(final String serverURL) { // Currently use global settings from the crypto manager. return sslEncryption; } } opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@ */ package org.opends.server.replication.protocol; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.zip.DataFormatException; import org.opends.server.loggers.debug.DebugTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; /** * This class Implement a protocol session using a basic socket and relying on * the innate encoding/decoding capabilities of the ReplicationMsg * by using the getBytes() and generateMsg() methods of those classes. * the innate encoding/decoding capabilities of the ReplicationMsg by using the * getBytes() and generateMsg() methods of those classes. */ public class SocketSession implements ProtocolSession public final class SocketSession implements ProtocolSession { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private Socket socket; private InputStream input; private OutputStream output; byte[] rcvLengthBuf = new byte[8]; private final Socket socket; private final InputStream input; private final OutputStream output; private final byte[] rcvLengthBuf = new byte[8]; /** * The time the last message published to this session. */ private volatile long lastPublishTime = 0; /** * The time the last message was received on this session. */ private long lastReceiveTime = 0; private volatile long lastReceiveTime = 0; // Close guarded by closeLock: use a different lock to publish since // publishing can block, and we don't want to block while closing failed // connections. private final Object closeLock = new Object(); private boolean closeInitiated = false; // Publish guarded by publishLock: use a full lock here so that we can // optionally publish StopMsg during close. private final Lock publishLock = new ReentrantLock(); // Does not need protecting: updated only during single threaded handshake. private short protocolVersion = ProtocolVersion.getCurrentVersion(); /** * Creates a new SocketSession based on the provided socket. * * @param socket The Socket on which the SocketSession will be based. * @throws IOException When an IException happens on the socket. * @param socket * The Socket on which the SocketSession will be based. * @throws IOException * When an IException happens on the socket. */ public SocketSession(Socket socket) throws IOException public SocketSession(final Socket socket) throws IOException { this.socket = socket; /* * Use a window instead of the TCP flow control. * Therefore set a very large value for send and receive buffer sizes. */ input = socket.getInputStream(); output = socket.getOutputStream(); } /** * {@inheritDoc} */ public void close() throws IOException { closeInitiated = true; if (debugEnabled()) { TRACER.debugInfo("Closing SocketSession." + stackTraceToSingleLineString(new Exception())); TRACER.debugInfo("Creating SocketSession to %s from %s", socket .getRemoteSocketAddress().toString(), stackTraceToSingleLineString(new Exception())); } socket.close(); this.socket = socket; this.input = socket.getInputStream(); this.output = socket.getOutputStream(); } /** * {@inheritDoc} */ public synchronized void publish(ReplicationMsg msg) throws IOException @Override public void close() { synchronized (closeLock) { if (closeInitiated) { return; } closeInitiated = true; } // Perform close outside of critical section. if (debugEnabled()) { TRACER.debugInfo("Closing SocketSession to %s from %s", socket .getRemoteSocketAddress().toString(), stackTraceToSingleLineString(new Exception())); } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end communications. if (publishLock.tryLock()) { try { publish(new StopMsg()); } catch (final IOException ignored) { // Ignore errors on close. } finally { publishLock.unlock(); } } } if (socket != null && !socket.isClosed()) { try { socket.close(); } catch (final IOException ignored) { // Ignore errors on close. } } } /** * {@inheritDoc} */ @Override public boolean closeInitiated() { synchronized (closeLock) { return closeInitiated; } } /** * {@inheritDoc} */ @Override public long getLastPublishTime() { return lastPublishTime; } /** * {@inheritDoc} */ @Override public long getLastReceiveTime() { if (lastReceiveTime == 0) { return System.currentTimeMillis(); } return lastReceiveTime; } /** * {@inheritDoc} */ @Override public String getReadableRemoteAddress() { return socket.getRemoteSocketAddress().toString(); } /** * {@inheritDoc} */ @Override public String getRemoteAddress() { return socket.getInetAddress().getHostAddress(); } /** * {@inheritDoc} */ @Override public boolean isEncrypted() { return false; } /** * {@inheritDoc} */ @Override public void publish(final ReplicationMsg msg) throws IOException { publish(msg, ProtocolVersion.getCurrentVersion()); } /** * {@inheritDoc} */ public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion) throws IOException @Override public void publish(final ReplicationMsg msg, final short reqProtocolVersion) throws IOException { byte[] buffer = msg.getBytes(reqProtocolVersion); String str = String.format("%08x", buffer.length); final byte[] buffer = msg.getBytes(reqProtocolVersion); final String str = String.format("%08x", buffer.length); final byte[] sendLengthBuf = str.getBytes(); if (debugEnabled()) publishLock.lock(); try { TRACER.debugInfo("SocketSession publish <" + str + ">"); output.write(sendLengthBuf); output.write(buffer); output.flush(); } byte[] sendLengthBuf = str.getBytes(); output.write(sendLengthBuf); output.write(buffer); output.flush(); finally { publishLock.unlock(); } lastPublishTime = System.currentTimeMillis(); } /** * {@inheritDoc} */ @Override public ReplicationMsg receive() throws IOException, ClassNotFoundException, DataFormatException, NotSupportedOldVersionPDUException { /* Read the first 8 bytes containing the packet length */ // Read the first 8 bytes containing the packet length int length = 0; /* Let's start the stop-watch before waiting on read */ /* for the heartbeat check to be operationnal */ // Let's start the stop-watch before waiting on read for the heartbeat check // to be operational lastReceiveTime = System.currentTimeMillis(); while (length<8) while (length < 8) { int read = input.read(rcvLengthBuf, length, 8-length); final int read = input.read(rcvLengthBuf, length, 8 - length); if (read == -1) { lastReceiveTime=0; lastReceiveTime = 0; throw new IOException("no more data"); } else @@ -164,101 +312,59 @@ } } int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); final int totalLength = Integer.parseInt( new String(rcvLengthBuf), 16); try { length = 0; byte[] buffer = new byte[totalLength]; final byte[] buffer = new byte[totalLength]; while (length < totalLength) { length += input.read(buffer, length, totalLength - length); } /* We do not want the heartbeat to close the session when */ /* we are processing a message even a time consuming one. */ lastReceiveTime=0; // We do not want the heartbeat to close the session when we are // processing a message even a time consuming one. lastReceiveTime = 0; return ReplicationMsg.generateMsg(buffer, protocolVersion); } catch (OutOfMemoryError e) catch (final OutOfMemoryError e) { throw new IOException("Packet too large, can't allocate " + totalLength + " bytes."); + totalLength + " bytes."); } } /** * {@inheritDoc} */ public void stopEncryption() { // There is no security layer. } /** * {@inheritDoc} */ public boolean isEncrypted() @Override public void setProtocolVersion(final short version) { return false; protocolVersion = version; } /** * {@inheritDoc} */ public long getLastPublishTime() { return lastPublishTime; } /** * {@inheritDoc} */ public long getLastReceiveTime() { if (lastReceiveTime==0) { return System.currentTimeMillis(); } return lastReceiveTime; } /** * {@inheritDoc} */ public String getRemoteAddress() { return socket.getInetAddress().getHostAddress(); } /** * {@inheritDoc} */ public String getReadableRemoteAddress() { return socket.getRemoteSocketAddress().toString(); } /** * {@inheritDoc} */ public void setSoTimeout(int timeout) throws SocketException @Override public void setSoTimeout(final int timeout) throws SocketException { socket.setSoTimeout(timeout); } /** * {@inheritDoc} */ public boolean closeInitiated() { return closeInitiated; } /** * {@inheritDoc} */ public void setProtocolVersion(short version) @Override public void stopEncryption() { protocolVersion = version; // There is no security layer. } } opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -27,6 +27,8 @@ */ package org.opends.server.replication.protocol; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; @@ -36,241 +38,430 @@ import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.zip.DataFormatException; import javax.net.ssl.SSLSocket; import org.opends.server.loggers.debug.DebugTracer; import javax.net.ssl.SSLSocket; /** * This class implements a protocol session using TLS. */ public class TLSSocketSession implements ProtocolSession public final class TLSSocketSession implements ProtocolSession { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private Socket plainSocket; private SSLSocket secureSocket; private InputStream input; private OutputStream output; private InputStream plainInput; private OutputStream plainOutput; byte[] rcvLengthBuf = new byte[8]; private final Socket plainSocket; private final SSLSocket secureSocket; private final InputStream plainInput; private final OutputStream plainOutput; private final byte[] rcvLengthBuf = new byte[8]; /** * The time the last message published to this session. */ private volatile long lastPublishTime = 0; /** * The time the last message was received on this session. */ private long lastReceiveTime = 0; private volatile long lastReceiveTime = 0; // Close and error guarded by stateLock: use a different lock to publish since // publishing can block, and we don't want to block while closing failed // connections. private final Object stateLock = new Object(); private boolean closeInitiated = false; private Throwable sessionError = null; // Publish guarded by publishLock: use a full lock here so that we can // optionally publish StopMsg during close. private final Lock publishLock = new ReentrantLock(); // Does not need protecting: updated only during single threaded handshake. private short protocolVersion = ProtocolVersion.getCurrentVersion(); private InputStream input; private OutputStream output; /** * Creates a new TLSSocketSession. * * @param socket The regular Socket on which the SocketSession will be * based. * @param secureSocket The secure Socket on which the SocketSession will be * based. * @throws IOException When an IException happens on the socket. * @param socket * The regular Socket on which the SocketSession will be based. * @param secureSocket * The secure Socket on which the SocketSession will be based. * @throws IOException * When an IException happens on the socket. */ public TLSSocketSession(Socket socket, SSLSocket secureSocket) throws IOException public TLSSocketSession(final Socket socket, final SSLSocket secureSocket) throws IOException { plainSocket = socket; this.secureSocket = secureSocket; plainInput = plainSocket.getInputStream(); plainOutput = plainSocket.getOutputStream(); input = secureSocket.getInputStream(); output = secureSocket.getOutputStream(); } /** * {@inheritDoc} */ public void close() throws IOException { closeInitiated = true; if (debugEnabled()) { TRACER.debugInfo("Closing SocketSession." + stackTraceToSingleLineString(new Exception("Stack:"))); TRACER.debugInfo( "Creating TLSSocketSession from %s to %s in %s", socket.getLocalSocketAddress(), socket.getRemoteSocketAddress(), stackTraceToSingleLineString(new Exception())); } if (plainSocket != null && !plainSocket.isClosed()) { plainInput.close(); plainOutput.close(); plainSocket.close(); } if (secureSocket != null && !secureSocket.isClosed()) { input.close(); output.close(); secureSocket.close(); } this.plainSocket = socket; this.secureSocket = secureSocket; this.plainInput = plainSocket.getInputStream(); this.plainOutput = plainSocket.getOutputStream(); this.input = secureSocket.getInputStream(); this.output = secureSocket.getOutputStream(); } /** * {@inheritDoc} */ public synchronized void publish(ReplicationMsg msg) throws IOException @Override public void close() { publish(msg, ProtocolVersion.getCurrentVersion()); } Throwable localSessionError; /** * {@inheritDoc} */ public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion) throws IOException { byte[] buffer = msg.getBytes(reqProtocolVersion); String str = String.format("%08x", buffer.length); byte[] sendLengthBuf = str.getBytes(); output.write(sendLengthBuf); output.write(buffer); output.flush(); lastPublishTime = System.currentTimeMillis(); } /** * {@inheritDoc} */ public ReplicationMsg receive() throws IOException, ClassNotFoundException, DataFormatException, NotSupportedOldVersionPDUException { /* Read the first 8 bytes containing the packet length */ int length = 0; /* Let's start the stop-watch before waiting on read */ /* for the heartbeat check to be operationnal */ lastReceiveTime = System.currentTimeMillis(); while (length<8) synchronized (stateLock) { int read = input.read(rcvLengthBuf, length, 8-length); if (read == -1) if (closeInitiated) { lastReceiveTime=0; throw new IOException("no more data"); return; } localSessionError = sessionError; closeInitiated = true; } // Perform close outside of critical section. if (debugEnabled()) { if (localSessionError == null) { TRACER.debugInfo( "Closing TLSSocketSession from %s to %s in %s", plainSocket.getLocalSocketAddress(), plainSocket.getRemoteSocketAddress(), stackTraceToSingleLineString(new Exception())); } else { length += read; TRACER.debugInfo( "Aborting TLSSocketSession from %s to %s in %s due to the " + "following error: %s", plainSocket.getLocalSocketAddress(), plainSocket.getRemoteSocketAddress(), stackTraceToSingleLineString(new Exception()), stackTraceToSingleLineString(localSessionError)); } } int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); // V4 protocol introduces a StopMsg to properly end communications. if (localSessionError == null) { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { if (publishLock.tryLock()) { try { publish(new StopMsg()); } catch (final IOException ignored) { // Ignore errors on close. } finally { publishLock.unlock(); } } } } try { length = 0; byte[] buffer = new byte[totalLength]; while (length < totalLength) { length += input.read(buffer, length, totalLength - length); } /* We do not want the heartbeat to close the session when */ /* we are processing a message even a time consuming one. */ lastReceiveTime=0; return ReplicationMsg.generateMsg(buffer, protocolVersion); plainSocket.close(); } catch (OutOfMemoryError e) catch (final IOException ignored) { throw new IOException("Packet too large, can't allocate " + totalLength + " bytes."); // Ignore errors on close. } try { secureSocket.close(); } catch (final IOException ignored) { // Ignore errors on close. } } /** * {@inheritDoc} */ public void stopEncryption() { input = plainInput; output = plainOutput; } /** * {@inheritDoc} */ public boolean isEncrypted() @Override public boolean closeInitiated() { return !(input == plainInput); synchronized (stateLock) { return closeInitiated; } } /** * {@inheritDoc} */ @Override public long getLastPublishTime() { return lastPublishTime; } /** * {@inheritDoc} */ @Override public long getLastReceiveTime() { if (lastReceiveTime==0) if (lastReceiveTime == 0) { return System.currentTimeMillis(); } return lastReceiveTime; } /** * {@inheritDoc} */ public String getRemoteAddress() { return plainSocket.getInetAddress().getHostAddress(); } /** * {@inheritDoc} */ @Override public String getReadableRemoteAddress() { return plainSocket.getRemoteSocketAddress().toString(); } /** * {@inheritDoc} */ public void setSoTimeout(int timeout) throws SocketException @Override public String getRemoteAddress() { return plainSocket.getInetAddress().getHostAddress(); } /** * {@inheritDoc} */ @Override public boolean isEncrypted() { return input != plainInput; } /** * {@inheritDoc} */ @Override public void publish(final ReplicationMsg msg) throws IOException { publish(msg, ProtocolVersion.getCurrentVersion()); } /** * {@inheritDoc} */ @Override public void publish(final ReplicationMsg msg, final short reqProtocolVersion) throws IOException { final byte[] buffer = msg.getBytes(reqProtocolVersion); final String str = String.format("%08x", buffer.length); final byte[] sendLengthBuf = str.getBytes(); publishLock.lock(); try { output.write(sendLengthBuf); output.write(buffer); output.flush(); } catch (final IOException e) { setSessionError(e); throw e; } finally { publishLock.unlock(); } lastPublishTime = System.currentTimeMillis(); } /** * {@inheritDoc} */ @Override public ReplicationMsg receive() throws IOException, DataFormatException, NotSupportedOldVersionPDUException { try { // Read the first 8 bytes containing the packet length. int length = 0; // Let's start the stop-watch before waiting on read for the heartbeat // check // to be operational. lastReceiveTime = System.currentTimeMillis(); while (length < 8) { final int read = input.read(rcvLengthBuf, length, 8 - length); if (read == -1) { lastReceiveTime = 0; throw new IOException("no more data"); } else { length += read; } } final int totalLength = Integer.parseInt(new String( rcvLengthBuf), 16); try { length = 0; final byte[] buffer = new byte[totalLength]; while (length < totalLength) { final int read = input.read(buffer, length, totalLength - length); if (read == -1) { lastReceiveTime = 0; throw new IOException("no more data"); } else { length += read; } } // We do not want the heartbeat to close the session when we are // processing a message even a time consuming one. lastReceiveTime = 0; return ReplicationMsg.generateMsg(buffer, protocolVersion); } catch (final OutOfMemoryError e) { throw new IOException("Packet too large, can't allocate " + totalLength + " bytes."); } } catch (final IOException e) { setSessionError(e); throw e; } catch (final DataFormatException e) { setSessionError(e); throw e; } catch (final NotSupportedOldVersionPDUException e) { setSessionError(e); throw e; } } /** * {@inheritDoc} */ @Override public void setProtocolVersion(final short version) { protocolVersion = version; } /** * {@inheritDoc} */ @Override public void setSoTimeout(final int timeout) throws SocketException { plainSocket.setSoTimeout(timeout); } /** * {@inheritDoc} */ public boolean closeInitiated() { return closeInitiated; } /** * {@inheritDoc} */ public void setProtocolVersion(short version) @Override public void stopEncryption() { protocolVersion = version; // The secure socket has been configured not to auto close the underlying // plain socket. try { secureSocket.close(); } catch (IOException ignored) { // Ignore. } input = plainInput; output = plainOutput; } private void setSessionError(final Exception e) { synchronized (stateLock) { if (sessionError == null) { sessionError = e; } } } } opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -129,12 +129,16 @@ } catch (LockConflictException e) { if (txn != null) txn.abort(); txn = null; // Try again. } finally { if (txn != null) { // No effect if txn has committed. txn.abort(); txn = null; } dbCloseLock.readLock().unlock(); } } @@ -145,10 +149,6 @@ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); logError(mb.toMessage()); if (txn != null) { txn.abort(); } replicationServer.shutdown(); } } @@ -158,16 +158,6 @@ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } catch (UnsupportedEncodingException e) @@ -177,17 +167,6 @@ mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } } opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -386,9 +386,10 @@ else { ServerState startState = domain.getStartState(); // We don't use the endState but it's updating CN as reading ServerState endState = domain.getEligibleState(crossDomainEligibleCN, false); // We don't use the returned endState but it's updating CN as // reading domain.getEligibleState(crossDomainEligibleCN, false); ChangeNumber fcn = startState.getMaxChangeNumber( cn.getServerId()); opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -23,7 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2010 ForgeRock AS * Portions Copyright 2010-2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -169,6 +169,7 @@ } /** * Provide a string representation of this object for debug purpose.. * @param buffer Append to this buffer. */ public void toString(StringBuilder buffer) { @@ -1550,9 +1551,10 @@ // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE searchPhase = PERSISTENT_PHASE; if (writer ==null) final ProtocolSession localSession = session; if (writer ==null && localSession != null) { writer = new ECLServerWriter(session,this,replicationServerDomain); writer = new ECLServerWriter(localSession,this,replicationServerDomain); writer.start(); // start suspended } opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -186,13 +186,7 @@ { if (session!=null) { try { session.close(); } catch (IOException e) { // Can't do much more : ignore } session.close(); } if (replicationServerDomain!=null) replicationServerDomain.stopServer(handler, false); opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -23,6 +23,7 @@ * * * Copyright 2007-2009 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import static org.opends.messages.BackendMessages.*; @@ -36,12 +37,10 @@ import org.opends.server.replication.protocol.LDAPUpdateMsg; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -1479,19 +1478,7 @@ return writer; } /** * Close the writer and get a string reader for the LDIF content. * * @return Returns the string contents of the writer. * @throws Exception * If an error occurred closing the writer. */ public BufferedReader getLDIFBufferedReader() throws Exception { writer.close(); String ldif = stream.toString("UTF-8"); StringReader reader = new StringReader(ldif); return new BufferedReader(reader); } /** * Close the writer and get an LDIF reader for the LDIF content. opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -252,12 +252,17 @@ } catch (LockConflictException e) { if (txn != null) txn.abort(); txn = null; // Try again. } finally { if (txn != null) { // No effect if txn has committed. txn.abort(); txn = null; } dbCloseLock.readLock().unlock(); } } @@ -268,10 +273,6 @@ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); logError(mb.toMessage()); if (txn != null) { txn.abort(); } replicationServer.shutdown(); } } @@ -281,16 +282,6 @@ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } catch (UnsupportedEncodingException e) @@ -300,17 +291,6 @@ mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shuting down. } } replicationServer.shutdown(); } } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -63,7 +63,6 @@ import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.replication.protocol.StartMsg; import org.opends.server.replication.protocol.StartSessionMsg; import org.opends.server.replication.protocol.StopMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.WindowMsg; @@ -107,17 +106,14 @@ providedMsg.toString()); logError(providedMsg); } try if (providedSession != null) { if (providedSession != null) // This method is only called when aborting a failing handshake and // not StopMsg should be sent in such situation. StopMsg are only // expected when full handshake has been performed, or at end of // handshake phase 1, when DS was just gathering available RS info providedSession.close(); } catch (IOException e) { // ignore // This method is only called when aborting a failing handshake and // not StopMsg should be sent in such situation. StopMsg are only // expected when full handshake has been performed, or at end of // handshake phase 1, when DS was just gathering available RS info providedSession.close(); } } @@ -286,9 +282,10 @@ // We did not recognize the message, close session as what // can happen after is undetermined and we do not want the server to // be disturbed if (session!=null) ProtocolSession localSession = session; if (localSession != null) { closeSession(session, reason, this); closeSession(localSession, reason, this); } if ((replicationServerDomain != null) && @@ -1101,26 +1098,7 @@ if (session != null) { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end // communications try { session.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } // Close session to end ServerReader or ServerWriter try { session.close(); } catch (IOException e) { // ignore. } session.close(); } /* opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -341,32 +341,15 @@ finally { /* * The thread only exits the loop above if some error condition * happen. * The thread only exits the loop above if some error condition happen. * Attempt to close the socket and stop the server handler. */ try if (debugEnabled()) { if (handler.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end // communications try { session.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } if (debugEnabled()) TRACER.debugInfo("In " + this.getName() + " closing the session"); session.close(); } catch (IOException e) { // ignore TRACER.debugInfo("In " + this.getName() + " closing the session"); } session.close(); handler.doStop(); if (debugEnabled()) { opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -33,8 +33,6 @@ import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.messages.ReplicationMessages.*; import java.io.IOException; import java.net.SocketException; import java.util.NoSuchElementException; @@ -42,8 +40,6 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.StopMsg; import org.opends.server.replication.protocol.UpdateMsg; @@ -229,25 +225,7 @@ logError(errMessage); } finally { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end // communications try { session.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } try { session.close(); } catch (IOException e) { // Can't do much more : ignore } session.close(); replicationServerDomain.stopServer(handler, false); if (debugEnabled()) { opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.WindowMsg; import org.opends.server.replication.protocol.WindowProbeMsg; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.ServerConstants; import org.opends.server.replication.server.ReplicationServer; @@ -673,6 +674,7 @@ this.weight = rsInfo.getWeight(); this.connectedDSs = connectedDSs; this.connectedDSNumber = connectedDSs.size(); this.serverState = new ServerState(); } /** @@ -1006,15 +1008,10 @@ { if (connected == false) { if (session != null) ProtocolSession localSession = session; if (localSession != null) { try { session.close(); } catch (IOException e) { // The session was already closed, just ignore. } localSession.close(); session = null; } } @@ -1287,30 +1284,11 @@ { if (localSession != null) { if (debugEnabled()) if (debugEnabled()) { debugInfo("In RB, closing session after phase 1"); } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end communications if (!error) { try { localSession.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } } try { localSession.close(); } catch (IOException e) { // The session was already closed, just ignore. } localSession.close(); localSession = null; } if (error) @@ -1459,27 +1437,10 @@ { if (localSession != null) { if (debugEnabled()) if (debugEnabled()) { debugInfo("In RB, closing session after phase 1"); // V4 protocol introduces a StopMsg to properly end communications if (!error) { try { localSession.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } try { localSession.close(); } catch (IOException e) { // The session was already closed, just ignore. } localSession.close(); localSession = null; } if (error) @@ -1545,13 +1506,7 @@ if (session != null) { try { session.close(); } catch (IOException ex) { // The session was already closed, just ignore. } session.close(); session = null; } // Be sure to return null. @@ -1625,13 +1580,7 @@ if (session != null) { try { session.close(); } catch (IOException ex) { // The session was already closed, just ignore. } session.close(); session = null; } // Be sure to return null. @@ -2255,8 +2204,7 @@ heartbeatMonitor = new HeartbeatMonitor( threadName, session, heartbeatInterval, (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)); heartbeatInterval); heartbeatMonitor.start(); } } @@ -2293,24 +2241,7 @@ if (failingSession != null) { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end communications try { failingSession.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } try { failingSession.close(); } catch (IOException e1) { // ignore } failingSession.close(); numLostConnections++; } @@ -2689,6 +2620,11 @@ throw e; } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } if (shutdown == false) { if ((session == null) || (!session.closeInitiated())) @@ -2790,18 +2726,9 @@ rsGroupId = (byte) -1; rsServerId = -1; rsServerUrl = null; try if (session != null) { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly end communications session.publish(new StopMsg()); } session.close(); } catch (Exception e) { // Anyway, going to close session, so nothing to do } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -23,6 +23,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.plugin; @@ -440,15 +441,9 @@ /* * Shutdown any current client handling code */ try if (session != null) { if (session != null) { session.close(); } } catch (IOException e) { // ignore. session.close(); } try @@ -567,13 +562,7 @@ // Handle DS connexion if (!performHandshake()) { try { session.close(); } catch (IOException e) { // nothing to do } session.close(); return; } // If we come here, assured parameters sent by DS are as expected and @@ -731,7 +720,7 @@ */ private void checkUpdateAssuredParameters(UpdateMsg updateMsg) { assertEquals(updateMsg.isAssured(), isAssured, assertEquals(updateMsg.isAssured(), isAssured, "msg=" + ((updateMsg instanceof AddMsg)? ((AddMsg)updateMsg).getDn():updateMsg.getChangeNumber())); if (isAssured) @@ -1052,7 +1041,7 @@ String entry = "dn: ou=assured-sr-timeout-entry" + rsGroupId + "," + NOT_ASSURED_DN + "\n" + "objectClass: top\n" + "objectClass: organizationalUnit\n"; addEntry(TestCaseUtils.entryFromLdifString(entry)); addEntry(TestCaseUtils.entryFromLdifString(entry)); } long endTime = System.currentTimeMillis(); @@ -1919,8 +1908,8 @@ sleep(50); ii++; if (ii>10) assertEquals(operation.getResultCode(), expectedResult, operation.getErrorMessage().toString()); assertEquals(operation.getResultCode(), expectedResult, operation.getErrorMessage().toString()); } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -23,10 +23,10 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -1123,15 +1123,9 @@ /* * Shutdown any current client handling code */ try if (session != null) { if (session != null) { session.close(); } } catch (IOException e) { // ignore. session.close(); } try