opends/src/messages/messages/replication.properties
@@ -542,4 +542,5 @@ SEVERE_WARN_INVALID_SYNC_HIST_VALUE_214=The attribute value '%s' is not a valid \ synchronization history value SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \ failed to parse change record with changenumber %s from the database. Error: %s failed to parse change record with changenumber %s from the database. Error: %s SEVERE_ERR_SESSION_STARTUP_INTERRUPTED_216=%s was interrupted in the startup phase opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -86,19 +86,7 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.protocol.AddContext; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteContext; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.LDAPUpdateMsg; import org.opends.server.replication.protocol.ModifyContext; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.ModifyDnContext; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.OperationContext; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.RoutableMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.*; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.replication.service.ReplicationMonitor; @@ -4581,7 +4569,7 @@ ServerStatus initStatus, ServerState replicationServerState, long generationID, ProtocolSession session) Session session) { // Check domain fractional configuration consistency with local // configuration variables @@ -4876,7 +4864,7 @@ @Override public boolean processUpdate(UpdateMsg updateMsg) { // Ignore message if fractional configuration is inconcsistent and // Ignore message if fractional configuration is inconsistent and // we have been passed into bad data set status if (forceBadDataSet) { opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -57,7 +57,7 @@ /** * The session on which heartbeats are to be sent. */ private final ProtocolSession session; private final Session session; /** @@ -80,7 +80,7 @@ * @param heartbeatInterval The desired interval between heartbeats in * milliseconds. */ public HeartbeatThread(String threadName, ProtocolSession session, public HeartbeatThread(String threadName, Session session, long heartbeatInterval) { super(threadName); opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
File was deleted opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -161,7 +161,7 @@ * If the protocol session could not be established for some other * reason. */ public ProtocolSession createClientSession(final Socket socket, public Session createClientSession(final Socket socket, final int soTimeout) throws ConfigException, IOException { boolean hasCompleted = false; @@ -197,7 +197,7 @@ // Force TLS negotiation now. secureSocket.startHandshake(); hasCompleted = true; return new TLSSocketSession(socket, secureSocket); return new Session(socket, secureSocket); } finally { @@ -244,7 +244,7 @@ * If the protocol session could not be established for some other * reason. */ public ProtocolSession createServerSession(final Socket socket, public Session createServerSession(final Socket socket, final int soTimeout) throws ConfigException, IOException { boolean hasCompleted = false; @@ -281,7 +281,7 @@ // Force TLS negotiation now. secureSocket.startHandshake(); hasCompleted = true; return new TLSSocketSession(socket, secureSocket); return new Session(socket, secureSocket); } catch (final SSLException e) { opends/src/server/org/opends/server/replication/protocol/Session.java
File was renamed from opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java @@ -40,21 +40,25 @@ import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.DataFormatException; import javax.net.ssl.SSLSocket; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.util.StaticUtils; /** * This class implements a protocol session using TLS. * This class defines a replication session using TLS. */ public final class TLSSocketSession implements ProtocolSession public final class Session extends DirectoryThread { /** * The tracer object for the debug logger. @@ -86,7 +90,7 @@ * connections. */ private final Object stateLock = new Object(); private boolean closeInitiated = false; private volatile boolean closeInitiated = false; private Throwable sessionError = null; /* @@ -113,10 +117,13 @@ */ private BufferedOutputStream output; private final LinkedBlockingQueue<byte[]> sendQueue = new LinkedBlockingQueue<byte[]>(4000); private AtomicBoolean isRunning = new AtomicBoolean(false); private final CountDownLatch latch = new CountDownLatch(1); /** * Creates a new TLSSocketSession. * Creates a new Session. * * @param socket * The regular Socket on which the SocketSession will be based. @@ -125,13 +132,15 @@ * @throws IOException * When an IException happens on the socket. */ public TLSSocketSession(final Socket socket, final SSLSocket secureSocket) throws IOException public Session(final Socket socket, final SSLSocket secureSocket) throws IOException { super("Replication Session from "+ socket.getLocalSocketAddress() + " to " + socket.getRemoteSocketAddress()); if (debugEnabled()) { TRACER.debugInfo( "Creating TLSSocketSession from %s to %s in %s", "Creating Session from %s to %s in %s", socket.getLocalSocketAddress(), socket.getRemoteSocketAddress(), stackTraceToSingleLineString(new Exception())); @@ -153,9 +162,9 @@ /** * {@inheritDoc} * This method is called when the session with the remote must be closed. * This object won't be used anymore after this method is called. */ @Override public void close() { Throwable localSessionError; @@ -171,13 +180,21 @@ closeInitiated = true; } try { this.interrupt(); this.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // Perform close outside of critical section. if (debugEnabled()) { if (localSessionError == null) { TRACER.debugInfo( "Closing TLSSocketSession from %s to %s in %s", "Closing Session from %s to %s in %s", plainSocket.getLocalSocketAddress(), plainSocket.getRemoteSocketAddress(), stackTraceToSingleLineString(new Exception())); @@ -185,7 +202,7 @@ else { TRACER.debugInfo( "Aborting TLSSocketSession from %s to %s in %s due to the " "Aborting Session from %s to %s in %s due to the " + "following error: %s", plainSocket.getLocalSocketAddress(), plainSocket.getRemoteSocketAddress(), @@ -199,20 +216,13 @@ { if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { if (publishLock.tryLock()) try { try { publish(new StopMsg()); } catch (final IOException ignored) { // Ignore errors on close. } finally { publishLock.unlock(); } publish(new StopMsg()); } catch (final IOException ignored) { // Ignore errors on close. } } } @@ -223,9 +233,12 @@ /** * {@inheritDoc} * This methods allows to determine if the session close was initiated * on this Session. * * @return A boolean allowing to determine if the session close was initiated * on this Session. */ @Override public boolean closeInitiated() { synchronized (stateLock) @@ -237,9 +250,10 @@ /** * {@inheritDoc} * Gets the time the last replication message was published on this * session. * @return The timestamp in milliseconds of the last message published. */ @Override public long getLastPublishTime() { return lastPublishTime; @@ -248,9 +262,10 @@ /** * {@inheritDoc} * Gets the time the last replication message was received on this * session. * @return The timestamp in milliseconds of the last message received. */ @Override public long getLastReceiveTime() { if (lastReceiveTime == 0) @@ -263,9 +278,10 @@ /** * {@inheritDoc} * Retrieve the local URL in the form host:port. * * @return The local URL. */ @Override public String getLocalUrl() { return localUrl; @@ -274,9 +290,10 @@ /** * {@inheritDoc} * Retrieve the human readable address of the remote server. * * @return The human readable address of the remote server. */ @Override public String getReadableRemoteAddress() { return readableRemoteAddress; @@ -285,9 +302,10 @@ /** * {@inheritDoc} * Retrieve the IP address of the remote server. * * @return The IP address of the remote server. */ @Override public String getRemoteAddress() { return remoteAddress; @@ -296,9 +314,9 @@ /** * {@inheritDoc} * Determine whether the session is using a security layer. * @return true if the connection is encrypted, false otherwise. */ @Override public boolean isEncrypted() { return isEncrypted; @@ -307,12 +325,38 @@ /** * {@inheritDoc} * Sends a replication message to the remote peer. * * @param msg * The message to be sent. * @throws IOException * If an IO error occurred. */ @Override public void publish(final ReplicationMsg msg) throws IOException { final byte[] buffer = msg.getBytes(protocolVersion); if (isRunning.get()) { try { sendQueue.put(buffer); } catch (final InterruptedException e) { setSessionError(e); throw new IOException(e.getMessage()); } } else { send(buffer); } } /** Sends a replication message already encoded to the socket. * * @param buffer * the encoded buffer * @throws IOException if the message could not be sent */ private void send(final byte[] buffer) throws IOException { final String str = String.format("%08x", buffer.length); final byte[] sendLengthBuf = str.getBytes(); @@ -326,9 +370,7 @@ output.write(sendLengthBuf); output.write(buffer); output.flush(); } catch (final IOException e) { } catch (final IOException e) { setSessionError(e); throw e; } @@ -343,9 +385,20 @@ /** * {@inheritDoc} * Attempt to receive a ReplicationMsg. * This method should block the calling thread until a * ReplicationMsg is available or until an error condition. * * This method can only be called by a single thread and therefore does not * need to implement any replication. * * @return The ReplicationMsg that was received. * @throws IOException When error happened during IO process. * @throws DataFormatException When the data received is not formatted as a * ReplicationMsg. * @throws NotSupportedOldVersionPDUException If the received PDU is part of * an old protocol version and we do not support it. */ @Override public ReplicationMsg receive() throws IOException, DataFormatException, NotSupportedOldVersionPDUException { @@ -432,22 +485,23 @@ } /** * {@inheritDoc} * This method is called at the establishment of the session and can * be used to record the version of the protocol that is currently used. * * @param version The version of the protocol that is currently used. */ @Override public void setProtocolVersion(final short version) { protocolVersion = version; } /** * {@inheritDoc} * Returns the version of the protocol that is currently used. * * @return The version of the protocol that is currently used. */ @Override public short getProtocolVersion() { return protocolVersion; @@ -456,9 +510,16 @@ /** * {@inheritDoc} * Set a timeout value. * With this option set to a non-zero value, calls to the receive() method * block for only this amount of time after which a * java.net.SocketTimeoutException is raised. * The Broker is valid and usable even after such an Exception is raised. * * @param timeout the specified timeout, in milliseconds. * @throws SocketException if there is an error in the underlying protocol, * such as a TCP error. */ @Override public void setSoTimeout(final int timeout) throws SocketException { plainSocket.setSoTimeout(timeout); @@ -467,10 +528,8 @@ /** * {@inheritDoc} * Stop using the security layer, if there is any. */ @SuppressWarnings("unused") @Override public void stopEncryption() { /* @@ -500,4 +559,59 @@ } } } /** * Run method for the Session. * Loops waiting for buffers from the queue and sends them when available. */ public void run() { isRunning.set(true); latch.countDown(); if (debugEnabled()) { TRACER.debugInfo(this.getName() + " starting."); } boolean needClosing = false; while (!closeInitiated) { byte[] buffer; try { buffer = sendQueue.take(); } catch (InterruptedException ie) { break; } try { send(buffer); } catch (IOException e) { setSessionError(e); needClosing = true; } } isRunning.set(false); if (needClosing) { close(); } if (debugEnabled()) { TRACER.debugInfo(this.getName() + " stopped."); } } /** * This method can be called to wait until the session thread is * properly started. * @throws InterruptedException when interrupted */ public void waitForStartup() throws InterruptedException { latch.await(); } } opends/src/server/org/opends/server/replication/protocol/package-info.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ @@ -35,8 +36,8 @@ * The main classes of this packages are : * <br> * <ul> * <li><A HREF="SocketSession.html"><B>SocketSession</B></A> * implements the ProtocolSession interface that is * <li><A HREF="Session.html"><B>Session</B></A> * implements the session and protocol that is * used by the replication server and the directory server to communicate. * This is done by using the innate encoding/decoding capabilities of the * ReplicationMessages objects. This class is used by both the opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -90,7 +90,7 @@ * @param rcvWindowSize The receiving window size. */ public DataServerHandler( ProtocolSession session, Session session, int queueSize, String replicationServerURL, int replicationServerId, opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -391,7 +391,7 @@ * @param rcvWindowSize The receiving window size. */ public ECLServerHandler( ProtocolSession session, Session session, int queueSize, String replicationServerURL, int replicationServerId, opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -41,7 +41,7 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.DoneMsg; import org.opends.server.replication.protocol.ECLUpdateMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; @@ -61,7 +61,7 @@ */ private static final DebugTracer TRACER = getTracer(); private ProtocolSession session; private Session session; private ECLServerHandler handler; private ReplicationServerDomain replicationServerDomain; private boolean suspended; @@ -71,12 +71,12 @@ /** * Create a ServerWriter. * * @param session the ProtocolSession that will be used to send updates. * @param session the Session that will be used to send updates. * @param handler ECL handler for which the ServerWriter is created. * @param replicationServerDomain the ReplicationServerDomain of this * ServerWriter. */ public ECLServerWriter(ProtocolSession session, ECLServerHandler handler, public ECLServerWriter(Session session, ECLServerHandler handler, ReplicationServerDomain replicationServerDomain) { super(session, handler, replicationServerDomain); opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -304,7 +304,7 @@ try { ProtocolSession session; Session session; Socket newSocket = null; try { @@ -485,7 +485,7 @@ " connects to " + remoteServerURL); Socket socket = new Socket(); ProtocolSession session = null; Session session = null; try { InetSocketAddress ServerAddr = new InetSocketAddress( opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -187,7 +187,7 @@ // The timer used to run the timeout code (timer tasks) for the assured update // messages we are waiting acks for. private Timer assuredTimeoutTimer = null; // Counter used to purge the timer tasks referemces in assuredTimeoutTimer, // Counter used to purge the timer tasks references in assuredTimeoutTimer, // every n number of treated assured messages private int assuredTimeoutTimerPurgeCounter = 0; @@ -588,17 +588,16 @@ if (serverStatus == ServerStatus.DEGRADED_STATUS) { wrongStatusServers.add(handler.getServerId()); } else { /** * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: * We do not want this to be reported as an error to the update * maker -> no pollution or potential misunderstanding when * reading logs or monitoring and it was just administration (for * instance new server is being configured in topo: it goes in bad * gen then then full full update). */ } /** * else * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: * We do not want this to be reported as an error to the update * maker -> no pollution or potential misunderstanding when * reading logs or monitoring and it was just administration (for * instance new server is being configured in topo: it goes in bad * gen then then full full update). */ } } } @@ -685,19 +684,12 @@ } } else { // A RS sent us the safe data message, for sure no further ack to wait if (safeDataLevel == (byte) 1) /** * Level 1 has already been reached so no further acks to wait. * Just deal with level > 1 */ if (safeDataLevel > (byte) 1) { /** * The original level was 1 so the RS that sent us this message * should have already sent his ack to the sender DS. Level 1 has * already been reached so no further acks to wait. * This should not happen in theory as the sender RS server should * have sent us a matching not assured message so we should not come * to here. */ } else { // level > 1, so Ack this message to originator RS sourceHandler.send(new AckMsg(cn)); } } @@ -815,11 +807,10 @@ expectedAcksInfo.completed(); } } } else { // The timeout occurred for the update matching this change number and the // ack with timeout error has probably already been sent. } /* Else the timeout occurred for the update matching this change number * and the ack with timeout error has probably already been sent. */ } /** @@ -934,10 +925,8 @@ expectedServerInTimeout. incrementAssuredSdSentUpdatesTimeout(); } } else { // Server disappeared ? Let's forget about it. } /* else server disappeared ? Let's forget about it. */ } } // Mark the ack info object as completed to prevent potential opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -136,7 +136,7 @@ * @param rcvWindowSize The receiving window size. */ public ReplicationServerHandler( ProtocolSession session, Session session, int queueSize, String replicationServerURL, int replicationServerId, opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,11 +50,11 @@ import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; import org.opends.server.replication.protocol.HeartbeatThread; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ResetGenerationIdMsg; import org.opends.server.replication.protocol.RoutableMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.replication.protocol.StartMsg; import org.opends.server.replication.protocol.StartSessionMsg; @@ -88,7 +88,7 @@ * @param providedMsg The provided error message. * @param handler The handler that manages that session. */ static protected void closeSession(ProtocolSession providedSession, static protected void closeSession(Session providedSession, Message providedMsg, ServerHandler handler) { if (providedMsg != null) @@ -118,7 +118,7 @@ /** * The session opened with the remote server. */ protected ProtocolSession session; protected Session session; /** * The serverURL of the remote server. @@ -237,7 +237,7 @@ /** * Creates a new server handler instance with the provided socket. * * @param session The ProtocolSession used by the ServerHandler to * @param session The Session used by the ServerHandler to * communicate with the remote entity. * @param queueSize The maximum number of update that will be kept * in memory by this ServerHandler. @@ -247,7 +247,7 @@ * @param rcvWindowSize The window size to receive from the remote server. */ public ServerHandler( ProtocolSession session, Session session, int queueSize, String replicationServerURL, int replicationServerId, @@ -271,7 +271,7 @@ // We did not recognize the message, close session as what // can happen after is undetermined and we do not want the server to // be disturbed ProtocolSession localSession = session; Session localSession = session; if (localSession != null) { closeSession(localSession, reason, this); @@ -366,6 +366,22 @@ replicationServerDomain); reader = new ServerReader(session, this); session.setName("Replication server RS(" + this.getReplicationServerId() + ") session thread to " + this.toString() + " at " + session.getReadableRemoteAddress()); session.start(); try { session.waitForStartup(); } catch (InterruptedException e) { final Message message = ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName()); throw new DirectoryException(ResultCode.OTHER, message, e); } reader.start(); writer.start(); opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -58,7 +58,7 @@ * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private final ProtocolSession session; private final Session session; private final ServerHandler handler; private final String remoteAddress; @@ -68,11 +68,11 @@ * Constructor for the LDAP server reader part of the replicationServer. * * @param session * The ProtocolSession from which to read the data. * The Session from which to read the data. * @param handler * The server handler for this server reader. */ public ServerReader(ProtocolSession session, ServerHandler handler) public ServerReader(Session session, ServerHandler handler) { super("Replication server RS(" + handler.getReplicationServerId() + ") reading from " + handler.toString() + " at " @@ -314,18 +314,6 @@ logError(errMessage); } } catch (ClassNotFoundException e) { if (debugEnabled()) TRACER.debugInfo( "In " + this.getName() + " " + stackTraceToSingleLineString(e)); /* * The remote server has sent an unknown message, * close the connection. */ errMessage = ERR_UNKNOWN_MESSAGE.get(handler.toString()); logError(errMessage); } catch (Exception e) { if (debugEnabled()) opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -40,7 +40,7 @@ import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.UpdateMsg; @@ -55,7 +55,7 @@ */ private static final DebugTracer TRACER = getTracer(); private final ProtocolSession session; private final Session session; private final ServerHandler handler; private final ReplicationServerDomain replicationServerDomain; @@ -66,13 +66,13 @@ * for new updates and forward them to the server * * @param session * the ProtocolSession that will be used to send updates. * the Session that will be used to send updates. * @param handler * handler for which the ServerWriter is created. * @param replicationServerDomain * The ReplicationServerDomain of this ServerWriter. */ public ServerWriter(ProtocolSession session, ServerHandler handler, public ServerWriter(Session session, ServerHandler handler, ReplicationServerDomain replicationServerDomain) { // Session may be null for ECLServerWriter. opends/src/server/org/opends/server/replication/server/package-info.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ @@ -46,13 +47,6 @@ * The main classes of this packages are : * <br> * <ul> * <li><A HREF="SocketSession.html"><B>SocketSession</B></A> * implements the ProtocolSession interface that is * used by the replication server and the directory server to communicate. * This is done by using the innate encoding/decoding capabilities of the * ReplicationMessages objects. This class is used by both the * replicationServer and the replication package. * </li> * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A> * implements the multiplexing part of the replication * server. It contains method for forwarding all the received messages to opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.service; @@ -34,7 +34,7 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.Session; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.TimeThread; @@ -55,7 +55,7 @@ /** * The session on which heartbeats are to be sent. */ private final ProtocolSession session; private final Session session; /** * The time in milliseconds between heartbeats. @@ -77,7 +77,7 @@ * (in milliseconds). * @param serverId2 The serverId of the sender domain. */ public CTHeartbeatPublisherThread(String threadName, ProtocolSession session, public CTHeartbeatPublisherThread(String threadName, Session session, long heartbeatInterval, int serverId2) { super(threadName); opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@ * * * Copyright 2007-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.service; @@ -33,7 +33,7 @@ import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.Session; import org.opends.server.types.DebugLogLevel; import org.opends.server.api.DirectoryThread; @@ -54,7 +54,7 @@ /** * The session on which heartbeats are to be monitored. */ private final ProtocolSession session; private final Session session; /** @@ -93,7 +93,7 @@ * milliseconds). */ HeartbeatMonitor(int serverID, int replicationServerID, String baseDN, ProtocolSession session, long heartbeatInterval) String baseDN, Session session, long heartbeatInterval) { super("Replica DS(" + serverID + ") heartbeat monitor for domain \"" opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -95,7 +95,7 @@ */ public final static String NO_CONNECTED_SERVER = "Not connected"; private volatile String replicationServer = NO_CONNECTED_SERVER; private volatile ProtocolSession session = null; private volatile Session session = null; private final ServerState state; private final String baseDn; private final int serverId; @@ -1230,7 +1230,7 @@ String port = server.substring(separator + 1); String hostname = server.substring(0, separator); ProtocolSession localSession = null; Session localSession = null; Socket socket = null; boolean hasConnected = false; Message errorMessage = null; @@ -2180,7 +2180,7 @@ * @param failingSession the socket which failed * @param infiniteTry the socket which failed */ public void reStart(ProtocolSession failingSession, boolean infiniteTry) public void reStart(Session failingSession, boolean infiniteTry) { if (failingSession != null) { @@ -2308,7 +2308,7 @@ try { boolean credit; ProtocolSession current_session; Session current_session; Semaphore currentWindowSemaphore; /* @@ -2465,7 +2465,7 @@ // Save session information for later in case we need it for log messages // after the session has been closed and/or failed. final ProtocolSession savedSession = session; final Session savedSession = session; if (savedSession == null) { // Must be shutting down. @@ -2612,7 +2612,7 @@ if (!shutdown) { final ProtocolSession tmpSession = session; final Session tmpSession = session; if (tmpSession == null || !tmpSession.closeInitiated()) { /* @@ -2879,7 +2879,7 @@ */ public boolean isSessionEncrypted() { final ProtocolSession tmp = session; final Session tmp = session; return tmp != null ? tmp.isEncrypted() : false; } @@ -3127,7 +3127,7 @@ */ String getLocalUrl() { final ProtocolSession tmp = session; final Session tmp = session; return tmp != null ? tmp.getLocalUrl() : ""; } @@ -3142,12 +3142,12 @@ return monitor; } private void setSession(final ProtocolSession newSession) private void setSession(final Session newSession) { // De-register the monitor with the old name. deregisterReplicationMonitor(); final ProtocolSession oldSession = session; final Session oldSession = session; if (oldSession != null) { oldSession.close(); opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -68,7 +68,7 @@ import org.opends.server.replication.protocol.InitializeRcvAckMsg; import org.opends.server.replication.protocol.InitializeRequestMsg; import org.opends.server.replication.protocol.InitializeTargetMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; @@ -401,13 +401,13 @@ * @param generationID The current generationID of the * ReplicationServer with which the session * was established. * @param session The ProtocolSession that is currently used. * @param session The Session that is currently used. */ public void sessionInitiated( ServerStatus initStatus, ServerState replicationServerState, long generationID, ProtocolSession session) Session session) { // Sanity check: is it a valid initial status? if (!isValidInitialStatus(initStatus)) opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -27,25 +27,6 @@ */ package org.opends.server.replication; import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES; import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.io.File; import java.net.SocketException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.locks.Lock; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; @@ -66,34 +47,34 @@ import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.plugin.PersistentServerState; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.schema.IntegerSyntax; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.Attributes; import org.opends.server.types.ByteString; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.LockManager; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.types.*; import org.opends.server.util.StaticUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.File; import java.net.SocketException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.locks.Lock; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.*; /** * An abstract class that all Replication unit test should extend. */ @@ -900,7 +881,7 @@ * Add a task to the configuration of the current running DS. * @param taskEntry The task to add. * @param expectedResult The expected result code for the ADD. * @param errorMessageID The expected error messageID when the expected * @param errorMessage The expected error message when the expected * result code is not SUCCESS */ protected void addTask(Entry taskEntry, ResultCode expectedResult, @@ -1168,7 +1149,7 @@ * @param msgType Class of the message we are waiting for. * @return The expected message if it comes in time or fails (assertion). */ protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) { protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) { ReplicationMsg replMsg = null; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -52,39 +52,17 @@ import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ServerStartMsg; import org.opends.server.replication.protocol.StartSessionMsg; import org.opends.server.replication.protocol.StopMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeValue; import org.opends.server.types.ByteString; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.Operation; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.types.*; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.testng.Assert.*; /** @@ -279,7 +257,7 @@ private ServerSocket listenSocket; private boolean shutdown = false; private ProtocolSession session = null; private Session session = null; // Parameters given at constructor time private final int port; @@ -626,8 +604,7 @@ session.publish(addMsg); // Read and return matching ack AckMsg ackMsg = (AckMsg)session.receive(); return ackMsg; return (AckMsg)session.receive(); } catch(SocketTimeoutException e) { @@ -889,7 +866,7 @@ replicationServer.setAssured(false); replicationServer.start(TIMEOUT_SCENARIO); long startTime = System.currentTimeMillis(); long startTime; // Create a safe data assured domain if (rsGroupId == (byte)1) { @@ -998,7 +975,7 @@ { int TIMEOUT = 5000; String testcase = "testSafeReadModeTimeout" + rsGroupId;; String testcase = "testSafeReadModeTimeout" + rsGroupId; try { // Create and start a RS expecting clients in safe read assured mode @@ -1008,7 +985,7 @@ replicationServer.setAssured(false); replicationServer.start(TIMEOUT_SCENARIO); long startTime = 0; long startTime; // Create a safe data assured domain if (rsGroupId == (byte)1) @@ -1363,10 +1340,9 @@ "objectClass: organizationalUnit\n"; Entry entry = TestCaseUtils.entryFromLdifString(entryStr); String parentUid = getEntryUUID(DN.decode(SAFE_READ_DN)); AckMsg ackMsg = null; try { ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid); AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid); if (rsGroupId == (byte)2) fail("Should only go here for RS with same group id as DS"); @@ -1461,7 +1437,7 @@ Entry entry = TestCaseUtils.entryFromLdifString(entryStr); String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN)); AckMsg ackMsg = null; AckMsg ackMsg; try { ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid); @@ -1853,7 +1829,7 @@ /* * Find the multi valued attribute matching the requested assured mode */ String assuredAttr = null; String assuredAttr; switch(assuredMode) { case SAFE_READ_MODE: @@ -1873,19 +1849,15 @@ return resultMap; // Empty map Attribute attr = attrs.get(0); Iterator<AttributeValue> attValIt = attr.iterator(); // Parse and store values while (attValIt.hasNext()) { String srvStr = attValIt.next().toString(); for (AttributeValue val : attr) { String srvStr = val.toString(); StringTokenizer strtok = new StringTokenizer(srvStr, ":"); String token = strtok.nextToken(); if (token != null) { if (token != null) { int serverId = Integer.valueOf(token); token = strtok.nextToken(); if (token != null) { if (token != null) { Integer nerrors = Integer.valueOf(token); resultMap.put(serverId, nerrors); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -62,7 +62,7 @@ import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.ErrorMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; @@ -673,7 +673,7 @@ ServerStatus initStatus, ServerState replicationServerState, long generationId, ProtocolSession session) Session session) { super.sessionInitiated(initStatus, replicationServerState, generationId, session); } @@ -813,7 +813,7 @@ { private boolean shutdown = false; private ProtocolSession session = null; private Session session = null; /** Parameters given at constructor time */ private int port; @@ -1942,7 +1942,7 @@ } else { // Already errors for this server, increment the value int newVal = prevInt.intValue() + 1; int newVal = prevInt + 1; prevServerErrors.put(serverId, newVal); } } @@ -2018,11 +2018,8 @@ */ private boolean areGroupAndGenerationIdOk(int fakeRsGid, long fakeRsGenId) { if ((fakeRsGid != -1) && (fakeRsGenId != -1L)) { return ( (fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID) ); } return false; return (fakeRsGid != -1) && (fakeRsGenId != -1L) && ((fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID)); } /** @@ -2030,7 +2027,10 @@ * data assured update and that are expected to effectively ack the update. If * -1 is used, the server is out of scope */ private List<Integer> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) private List<Integer> computeExpectedServersSafeData( int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) { List<Integer> exptectedServers = new ArrayList<Integer>(); if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId)) @@ -2650,7 +2650,7 @@ */ fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID, otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false), otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID), AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, otherFakeDsScen); assertNotNull(fakeRd3); @@ -2669,7 +2669,7 @@ */ fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID, otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false), otherFakeRsGenId, (otherFakeRsGid == DEFAULT_GID), AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen); assertNotNull(fakeRs2); @@ -2684,13 +2684,14 @@ long sendUpdateTime = System.currentTimeMillis() - startTime; // Compute some thing that will help determine what to check according to // the current test configurarion: compute if DS and RS subject to conf // the current test configuration: compute if DS and RS subject to conf // change are eligible and expected for safe read assured // eligible: the server should receive the ack request // expected: the server should send back an ack (with or without error) boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId); boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId); boolean dsIsExpected = false; boolean rsIsExpected = false; // Booleans to tell if we expect to see the timeout, wrong status and replay error flags boolean shouldSeeTimeout = false; boolean shouldSeeWrongStatus = false; @@ -2723,6 +2724,7 @@ switch (otherFakeRsScen) { case REPLY_OK_RS_SCENARIO: rsIsExpected = true; break; case TIMEOUT_RS_SCENARIO: shouldSeeRsIdInError = true; @@ -3443,7 +3445,7 @@ // DS 2 connected to RS 2 fakeRd2 = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID, fakeDsGenId, (fakeDsGid == DEFAULT_GID ? true : false), fakeDsGenId, (fakeDsGid == DEFAULT_GID), AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen); assertNotNull(fakeRd2); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -60,28 +60,14 @@ import org.opends.server.protocols.ldap.LDAPControl; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.protocol.*; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.plugin.ReplicationServerListener; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.ModifyDnContext; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplServerStartDSMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ServerStartMsg; import org.opends.server.replication.protocol.StartSessionMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.WindowMsg; import org.opends.server.replication.protocol.WindowProbeMsg; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.tools.LDAPModify; import org.opends.server.tools.LDAPSearch; import org.opends.server.types.*; @@ -975,7 +961,7 @@ int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); socket.connect(ServerAddr, timeoutMS); ReplSessionSecurity replSessionSecurity = getReplSessionSecurity(); ProtocolSession session = replSessionSecurity.createClientSession(socket, Session session = replSessionSecurity.createClientSession(socket, timeoutMS); boolean sslEncryption =