opendj3-server-dev/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -634,7 +634,6 @@ */ public void receiveNewStatus(ChangeStatusMsg csMsg) { if (replicationServerDomain!=null) replicationServerDomain.processNewStatus(this, csMsg); replicationServerDomain.processNewStatus(this, csMsg); } } opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -842,8 +842,7 @@ */ private void registerIntoDomain() { if (replicationServerDomain != null) replicationServerDomain.registerHandler(this); replicationServerDomain.registerHandler(this); } /** opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -59,7 +59,7 @@ * LocalizableMessage are buffered into a queue. * Consumers are expected to come and consume the UpdateMsg from the queue. */ public class MessageHandler extends MonitorProvider<MonitorProviderCfg> class MessageHandler extends MonitorProvider<MonitorProviderCfg> { /** The logger of this class. */ protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); @@ -84,11 +84,11 @@ /** * Number of update sent to the server. */ protected int outCount = 0; private int outCount = 0; /** * Number of updates received from the server. */ protected int inCount = 0; private int inCount = 0; /** * Specifies the max queue size for this handler. */ @@ -96,7 +96,7 @@ /** * Specifies the max queue size in bytes for this handler. */ protected int maxQueueBytesSize = maxQueueSize * 100; private int maxQueueBytesSize = maxQueueSize * 100; /** * Specifies whether the consumer is following the producer (is not late). */ @@ -126,7 +126,7 @@ * in memory by this ServerHandler. * @param replicationServer The hosting replication server. */ public MessageHandler(int queueSize, ReplicationServer replicationServer) MessageHandler(int queueSize, ReplicationServer replicationServer) { this.maxQueueSize = queueSize; this.maxQueueBytesSize = queueSize * 100; @@ -140,7 +140,7 @@ * @param update The update that must be added to the list of updates of * this handler. */ public void add(UpdateMsg update) void add(UpdateMsg update) { synchronized (msgQueue) { @@ -149,7 +149,9 @@ * waiting for some changes, wake it up */ if (msgQueue.isEmpty()) { msgQueue.notify(); } msgQueue.add(update); @@ -179,7 +181,7 @@ * Set the shut down flag to true and returns the previous value of the flag. * @return The previous value of the shut down flag */ public boolean engageShutdown() boolean engageShutdown() { return shuttingDown.getAndSet(true); } @@ -188,7 +190,7 @@ * Returns the shutdown flag. * @return The shutdown flag value. */ public boolean shuttingDown() boolean shuttingDown() { return shuttingDown.get(); } @@ -198,9 +200,8 @@ * * @param waitConnections Waits for the Connections with other RS to * be established before returning. * @return The replication server domain. */ public ReplicationServerDomain getDomain(boolean waitConnections) private void setDomain(boolean waitConnections) { if (replicationServerDomain == null) { @@ -211,14 +212,13 @@ replicationServer.waitConnections(); } } return replicationServerDomain; } /** * Get the count of updates received from the server. * @return the count of update received from the server. */ public int getInCount() int getInCount() { return inCount; } @@ -372,10 +372,14 @@ while (msgQueue.isEmpty() && following) { if (!synchronous) { return null; } msgQueue.wait(500); if (!activeConsumer) { return null; } } } catch (InterruptedException e) { @@ -475,7 +479,7 @@ * Get the count of updates sent to this server. * @return The count of update sent to this server. */ public int getOutCount() int getOutCount() { return outCount; } @@ -542,7 +546,7 @@ /** * Increase the counter of updates received from the server. */ public void incrementInCount() void incrementInCount() { inCount++; } @@ -550,14 +554,12 @@ /** * Increase the counter of updates sent to the server. */ public void incrementOutCount() void incrementOutCount() { outCount++; } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException, InitializationException @@ -616,10 +618,8 @@ else { this.baseDN = baseDN; if (!baseDN.toNormalizedString().equals("cn=changelog")) { getDomain(isDataServer); } setDomain(!"cn=changelog".equals(baseDN.toNormalizedString()) && isDataServer); } } @@ -644,7 +644,7 @@ * @param msg the last update sent. * @return boolean indicating if the update was meaningful. */ public boolean updateServerState(UpdateMsg msg) boolean updateServerState(UpdateMsg msg) { return serverState.update(msg.getCSN()); } opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -256,7 +256,7 @@ // We did not recognize the message, close session as what // can happen after is undetermined and we do not want the server to // be disturbed ServerHandler.closeSession(session, null, null); session.close(); return; } } @@ -267,7 +267,8 @@ // Just log debug information and loop. // Do not log the message during shutdown. logger.traceException(e); if (!shutdown) { if (!shutdown) { logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage()); } } opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -732,8 +732,7 @@ public void receiveTopoInfoFromRS(TopologyMsg topoMsg) throws DirectoryException, IOException { if (replicationServerDomain != null) replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); } } opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,6 @@ package org.opends.server.replication.server; import java.io.IOException; import org.forgerock.i18n.slf4j.LocalizedLogger; import java.util.List; @@ -67,34 +66,6 @@ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; /** * Close the session and log the provided error message * Log nothing if message is null. * @param providedSession The provided closing session. * @param providedMsg The provided error message. * @param handler The handler that manages that session. */ static protected void closeSession(Session providedSession, LocalizableMessage providedMsg, ServerHandler handler) { if (providedMsg != null) { if (logger.isTraceEnabled()) logger.trace("In %s closing session with err=%s", ((handler != null) ? handler : "Replication Server"), providedMsg); logger.error(providedMsg); } if (providedSession != null) { // This method is only called when aborting a failing handshake and // not StopMsg should be sent in such situation. StopMsg are only // expected when full handshake has been performed, or at end of // handshake phase 1, when DS was just gathering available RS info providedSession.close(); } } /** * The serverId of the remote server. */ protected int serverId; @@ -246,7 +217,20 @@ Session localSession = session; if (localSession != null) { closeSession(localSession, reason, this); if (reason != null) { if (logger.isTraceEnabled()) { logger.trace("In " + this + " closing session with err=" + reason); } logger.error(reason); } // This method is only called when aborting a failing handshake and // not StopMsg should be sent in such situation. StopMsg are only // expected when full handshake has been performed, or at end of // handshake phase 1, when DS was just gathering available RS info localSession.close(); } releaseDomainLock(); @@ -255,7 +239,7 @@ // We may have changed it as it was -1 and we received a value >0 from peer // server and the last topo message sent may have failed being sent: in that // case retrieve old value of generation id for replication server domain if (oldGenerationId != -100 && replicationServerDomain != null) if (oldGenerationId != -100) { replicationServerDomain.changeGenerationId(oldGenerationId); } @@ -266,7 +250,7 @@ */ protected void releaseDomainLock() { if (replicationServerDomain != null && replicationServerDomain.hasLock()) if (replicationServerDomain.hasLock()) { replicationServerDomain.release(); } @@ -336,8 +320,7 @@ { final LocalizableMessage message = ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName()); throw new DirectoryException(ResultCode.OTHER, message, e); throw new DirectoryException(ResultCode.OTHER, message, e); } reader.start(); writer.start(); @@ -369,7 +352,7 @@ public void send(ReplicationMsg msg) throws IOException { // avoid logging anything for unit tests that include a null domain. if (logger.isTraceEnabled() && replicationServerDomain != null) if (logger.isTraceEnabled()) { logger.trace("In " + replicationServerDomain.getLocalRSMonitorInstanceName() + " " @@ -518,16 +501,6 @@ return heartbeatInterval; } /** * Get the count of updates received from the server. * @return the count of update received from the server. */ @Override public int getInCount() { return inCount; } /** {@inheritDoc} */ @Override public List<Attribute> getMonitorData() @@ -600,16 +573,6 @@ public abstract String getMonitorInstanceName(); /** * Get the count of updates sent to this server. * @return The count of update sent to this server. */ @Override public int getOutCount() { return outCount; } /** * Gets the protocol version used with this remote server. * @return The protocol version used with this remote server. */ @@ -717,9 +680,7 @@ assuredSrSentUpdatesTimeout.incrementAndGet(); } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException, InitializationException @@ -825,16 +786,11 @@ public void lockDomainWithTimeout() throws DirectoryException, InterruptedException { if (replicationServerDomain == null) { return; } Random random = new Random(); int randomTime = random.nextInt(6); // Random from 0 to 5 final Random random = new Random(); final int randomTime = random.nextInt(6); // Random from 0 to 5 // Wait at least 3 seconds + (0 to 5 seconds) long timeout = 3000 + (randomTime * 1000); boolean lockAcquired = replicationServerDomain.tryLock(timeout); final long timeout = 3000 + (randomTime * 1000); final boolean lockAcquired = replicationServerDomain.tryLock(timeout); if (!lockAcquired) { LocalizableMessage message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get( @@ -1200,8 +1156,7 @@ */ void processAck(AckMsg ack) { if (replicationServerDomain!=null) replicationServerDomain.processAck(ack, this); replicationServerDomain.processAck(ack, this); } /** @@ -1210,9 +1165,7 @@ */ public long getReferenceGenId() { if (replicationServerDomain != null) return replicationServerDomain.getGenerationId(); return -1; return replicationServerDomain.getGenerationId(); } /** @@ -1221,8 +1174,7 @@ */ void processResetGenId(ResetGenerationIdMsg msg) { if (replicationServerDomain!=null) replicationServerDomain.resetGenerationId(this, msg); replicationServerDomain.resetGenerationId(this, msg); } /** @@ -1233,8 +1185,7 @@ public void put(UpdateMsg update) throws IOException { decAndCheckWindow(); if (replicationServerDomain!=null) replicationServerDomain.put(update, this); replicationServerDomain.put(update, this); } /** @@ -1242,8 +1193,7 @@ */ public void doStop() { if (replicationServerDomain!=null) replicationServerDomain.stopServer(this, false); replicationServerDomain.stopServer(this, false); } /**