opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.server; @@ -654,7 +654,6 @@ */ public void receiveNewStatus(ChangeStatusMsg csMsg) { if (replicationServerDomain!=null) replicationServerDomain.processNewStatus(this, csMsg); replicationServerDomain.processNewStatus(this, csMsg); } } opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -838,8 +838,7 @@ */ private void registerIntoDomain() { if (replicationServerDomain != null) replicationServerDomain.registerHandler(this); replicationServerDomain.registerHandler(this); } /** opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.server; @@ -60,7 +60,7 @@ * Message are buffered into a queue. * Consumers are expected to come and consume the UpdateMsg from the queue. */ public class MessageHandler extends MonitorProvider<MonitorProviderCfg> class MessageHandler extends MonitorProvider<MonitorProviderCfg> { /** @@ -88,11 +88,11 @@ /** * Number of update sent to the server. */ protected int outCount = 0; private int outCount = 0; /** * Number of updates received from the server. */ protected int inCount = 0; private int inCount = 0; /** * Specifies the max queue size for this handler. */ @@ -100,7 +100,7 @@ /** * Specifies the max queue size in bytes for this handler. */ protected int maxQueueBytesSize = maxQueueSize * 100; private int maxQueueBytesSize = maxQueueSize * 100; /** * Specifies whether the consumer is following the producer (is not late). */ @@ -130,7 +130,7 @@ * in memory by this ServerHandler. * @param replicationServer The hosting replication server. */ public MessageHandler(int queueSize, ReplicationServer replicationServer) MessageHandler(int queueSize, ReplicationServer replicationServer) { this.maxQueueSize = queueSize; this.maxQueueBytesSize = queueSize * 100; @@ -144,7 +144,7 @@ * @param update The update that must be added to the list of updates of * this handler. */ public void add(UpdateMsg update) void add(UpdateMsg update) { synchronized (msgQueue) { @@ -153,7 +153,9 @@ * waiting for some changes, wake it up */ if (msgQueue.isEmpty()) { msgQueue.notify(); } msgQueue.add(update); @@ -183,7 +185,7 @@ * Set the shut down flag to true and returns the previous value of the flag. * @return The previous value of the shut down flag */ public boolean engageShutdown() boolean engageShutdown() { return shuttingDown.getAndSet(true); } @@ -192,7 +194,7 @@ * Returns the shutdown flag. * @return The shutdown flag value. */ public boolean shuttingDown() boolean shuttingDown() { return shuttingDown.get(); } @@ -202,9 +204,8 @@ * * @param waitConnections Waits for the Connections with other RS to * be established before returning. * @return The replication server domain. */ public ReplicationServerDomain getDomain(boolean waitConnections) private void setDomain(boolean waitConnections) { if (replicationServerDomain == null) { @@ -214,14 +215,13 @@ replicationServer.waitConnections(); } } return replicationServerDomain; } /** * Get the count of updates received from the server. * @return the count of update received from the server. */ public int getInCount() int getInCount() { return inCount; } @@ -375,10 +375,14 @@ while (msgQueue.isEmpty() && following) { if (!synchronous) { return null; } msgQueue.wait(500); if (!activeConsumer) { return null; } } } catch (InterruptedException e) { @@ -478,7 +482,7 @@ * Get the count of updates sent to this server. * @return The count of update sent to this server. */ public int getOutCount() int getOutCount() { return outCount; } @@ -545,7 +549,7 @@ /** * Increase the counter of updates received from the server. */ public void incrementInCount() void incrementInCount() { inCount++; } @@ -553,14 +557,12 @@ /** * Increase the counter of updates sent to the server. */ public void incrementOutCount() void incrementOutCount() { outCount++; } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException, InitializationException @@ -619,8 +621,8 @@ else { this.baseDN = baseDN; if (!baseDN.toNormalizedString().equals("cn=changelog")) this.replicationServerDomain = getDomain(isDataServer); setDomain(!"cn=changelog".equals(baseDN.toNormalizedString()) && isDataServer); } } @@ -645,7 +647,7 @@ * @param msg the last update sent. * @return boolean indicating if the update was meaningful. */ public boolean updateServerState(UpdateMsg msg) boolean updateServerState(UpdateMsg msg) { return serverState.update(msg.getCSN()); } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -261,7 +261,7 @@ // We did not recognize the message, close session as what // can happen after is undetermined and we do not want the server to // be disturbed ServerHandler.closeSession(session, null, null); session.close(); return; } } @@ -275,10 +275,9 @@ { TRACER.debugCaught(DebugLogLevel.ERROR, e); } if (!shutdown) { Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()); logError(message); if (!shutdown) { logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage())); } } } opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -756,8 +756,7 @@ public void receiveTopoInfoFromRS(TopologyMsg topoMsg) throws DirectoryException, IOException { if (replicationServerDomain != null) replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); } } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,6 @@ package org.opends.server.replication.server; import java.io.IOException; import java.util.List; import java.util.Random; import java.util.concurrent.Semaphore; @@ -63,35 +62,6 @@ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; /** * Close the session and log the provided error message * Log nothing if message is null. * @param providedSession The provided closing session. * @param providedMsg The provided error message. * @param handler The handler that manages that session. */ static protected void closeSession(Session providedSession, Message providedMsg, ServerHandler handler) { if (providedMsg != null) { if (debugEnabled()) TRACER.debugInfo("In " + ((handler != null) ? handler.toString() : "Replication Server") + " closing session with err=" + providedMsg); logError(providedMsg); } if (providedSession != null) { // This method is only called when aborting a failing handshake and // not StopMsg should be sent in such situation. StopMsg are only // expected when full handshake has been performed, or at end of // handshake phase 1, when DS was just gathering available RS info providedSession.close(); } } /** * The serverId of the remote server. */ protected int serverId; @@ -243,7 +213,20 @@ Session localSession = session; if (localSession != null) { closeSession(localSession, reason, this); if (reason != null) { if (debugEnabled()) { TRACER.debugInfo("In " + this + " closing session with err=" + reason); } logError(reason); } // This method is only called when aborting a failing handshake and // not StopMsg should be sent in such situation. StopMsg are only // expected when full handshake has been performed, or at end of // handshake phase 1, when DS was just gathering available RS info localSession.close(); } releaseDomainLock(); @@ -252,7 +235,7 @@ // We may have changed it as it was -1 and we received a value >0 from peer // server and the last topo message sent may have failed being sent: in that // case retrieve old value of generation id for replication server domain if (oldGenerationId != -100 && replicationServerDomain != null) if (oldGenerationId != -100) { replicationServerDomain.changeGenerationId(oldGenerationId); } @@ -263,7 +246,7 @@ */ protected void releaseDomainLock() { if (replicationServerDomain != null && replicationServerDomain.hasLock()) if (replicationServerDomain.hasLock()) { replicationServerDomain.release(); } @@ -333,8 +316,7 @@ { final Message message = ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName()); throw new DirectoryException(ResultCode.OTHER, message, e); throw new DirectoryException(ResultCode.OTHER, message, e); } reader.start(); writer.start(); @@ -366,7 +348,7 @@ public void send(ReplicationMsg msg) throws IOException { // avoid logging anything for unit tests that include a null domain. if (debugEnabled() && replicationServerDomain != null) if (debugEnabled()) { TRACER.debugInfo("In " + replicationServerDomain.getLocalRSMonitorInstanceName() + " " @@ -515,16 +497,6 @@ return heartbeatInterval; } /** * Get the count of updates received from the server. * @return the count of update received from the server. */ @Override public int getInCount() { return inCount; } /** {@inheritDoc} */ @Override public List<Attribute> getMonitorData() @@ -597,16 +569,6 @@ public abstract String getMonitorInstanceName(); /** * Get the count of updates sent to this server. * @return The count of update sent to this server. */ @Override public int getOutCount() { return outCount; } /** * Gets the protocol version used with this remote server. * @return The protocol version used with this remote server. */ @@ -714,9 +676,7 @@ assuredSrSentUpdatesTimeout.incrementAndGet(); } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException, InitializationException @@ -822,16 +782,11 @@ public void lockDomainWithTimeout() throws DirectoryException, InterruptedException { if (replicationServerDomain == null) { return; } Random random = new Random(); int randomTime = random.nextInt(6); // Random from 0 to 5 final Random random = new Random(); final int randomTime = random.nextInt(6); // Random from 0 to 5 // Wait at least 3 seconds + (0 to 5 seconds) long timeout = 3000 + (randomTime * 1000); boolean lockAcquired = replicationServerDomain.tryLock(timeout); final long timeout = 3000 + (randomTime * 1000); final boolean lockAcquired = replicationServerDomain.tryLock(timeout); if (!lockAcquired) { Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get( @@ -1197,8 +1152,7 @@ */ void processAck(AckMsg ack) { if (replicationServerDomain!=null) replicationServerDomain.processAck(ack, this); replicationServerDomain.processAck(ack, this); } /** @@ -1207,9 +1161,7 @@ */ public long getReferenceGenId() { if (replicationServerDomain != null) return replicationServerDomain.getGenerationId(); return -1; return replicationServerDomain.getGenerationId(); } /** @@ -1218,8 +1170,7 @@ */ void processResetGenId(ResetGenerationIdMsg msg) { if (replicationServerDomain!=null) replicationServerDomain.resetGenerationId(this, msg); replicationServerDomain.resetGenerationId(this, msg); } /** @@ -1230,8 +1181,7 @@ public void put(UpdateMsg update) throws IOException { decAndCheckWindow(); if (replicationServerDomain!=null) replicationServerDomain.put(update, this); replicationServerDomain.put(update, this); } /** @@ -1239,8 +1189,7 @@ */ public void doStop() { if (replicationServerDomain!=null) replicationServerDomain.stopServer(this, false); replicationServerDomain.stopServer(this, false); } /**