opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -229,14 +229,6 @@
     return newStatus;
   }

-  private void createStatusAnalyzer()
-  {
-    if (!replicationServerDomain.isRunningStatusAnalyzer())
-    {
-      replicationServerDomain.startStatusAnalyzer();
-    }
-  }
-
   /**
    * Retrieves a set of attributes containing monitor data that should be
    * returned to the client if the corresponding monitor entry is requested.
@@ -457,8 +449,7 @@
       localGenerationId = replicationServerDomain.getGenerationId();
       oldGenerationId = localGenerationId;

-      // Duplicate server ?
-      if (!replicationServerDomain.checkForDuplicateDS(this))
+      if (replicationServerDomain.isAlreadyConnectedToDS(this))
       {
         abortStart(null);
         return;
@@ -468,7 +459,6 @@
       {
         StartMsg outStartMsg = sendStartToRemote();

-        // log
         logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);

         // The session initiator decides whether to use SSL.
@@ -508,11 +498,8 @@
         throw new DirectoryException(ResultCode.OTHER, null, null);
       }

-      // Create the status analyzer for the domain if not already started
-      createStatusAnalyzer();
-
-      // Create the monitoring publisher for the domain if not already started
-      createMonitoringPublisher();
+      replicationServerDomain.startStatusAnalyzer();
+      replicationServerDomain.startMonitoringPublisher();

       registerIntoDomain();
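Review note: the hunks above delete DataServerHandler's private "create if not already started" wrappers and call the domain directly, because the already-running guard now lives inside the domain's start methods (see the ReplicationServerDomain hunks below). A minimal sketch of that idempotent-start pattern, with hypothetical names rather than the real OpenDS types:

    // Illustrative sketch only: startStatusAnalyzer() is safe to call
    // repeatedly because the "already running?" guard lives in the domain.
    public class Domain
    {
      private Thread statusAnalyzer; // non-null only while running

      public synchronized void startStatusAnalyzer()
      {
        if (statusAnalyzer == null) // no-op when already started
        {
          statusAnalyzer = new Thread(() -> { /* watch DS queues */ });
          statusAnalyzer.start();
        }
      }
    }

Callers then never need their own wrapper; the guard cannot be forgotten at a call site.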
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -399,11 +399,7 @@
     for (ReplicationServerDomain domain : getReplicationServerDomains())
     {
       // Create a normalized set of server URLs.
-      final Set<String> connectedReplServers = new HashSet<String>();
-      for (String url : domain.getChangelogs())
-      {
-        connectedReplServers.add(normalizeServerURL(url));
-      }
+      final Set<String> connectedRSUrls = getConnectedRSUrls(domain);

       /*
        * check that all replication server in the config are in the
@@ -440,7 +436,7 @@
         // Don't connect to a server if it is already connected.
         final String normalizedServerURL = normalizeServerURL(aServerURL);
-        if (connectedReplServers.contains(normalizedServerURL))
+        if (connectedRSUrls.contains(normalizedServerURL))
         {
           continue;
         }
@@ -472,6 +468,16 @@
     }
   }

+  private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
+  {
+    Set<String> results = new LinkedHashSet<String>();
+    for (ReplicationServerHandler handler : domain.getConnectedRSs().values())
+    {
+      results.add(normalizeServerURL(handler.getServerAddressURL()));
+    }
+    return results;
+  }
+
   /**
    * Establish a connection to the server with the address and port.
    *
@@ -1039,59 +1045,23 @@
     // Update threshold value for status analyzers (stop them if requested
     // value is 0)
-    if (degradedStatusThreshold != configuration
-        .getDegradedStatusThreshold())
+    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
     {
-      int oldThresholdValue = degradedStatusThreshold;
-      degradedStatusThreshold = configuration
-          .getDegradedStatusThreshold();
+      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        if (degradedStatusThreshold == 0)
-        {
-          // Requested to stop analyzers
-          domain.stopStatusAnalyzer();
-        }
-        else if (domain.isRunningStatusAnalyzer())
-        {
-          // Update the threshold value for this running analyzer
-          domain.updateStatusAnalyzer(degradedStatusThreshold);
-        }
-        else if (oldThresholdValue == 0)
-        {
-          // Requested to start analyzers with provided threshold value
-          if (domain.getConnectedDSs().size() > 0)
-            domain.startStatusAnalyzer();
-        }
+        domain.updateDegradedStatusThreshold(degradedStatusThreshold);
       }
     }

     // Update period value for monitoring publishers (stop them if requested
     // value is 0)
-    if (monitoringPublisherPeriod != configuration
-        .getMonitoringPeriod())
+    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
     {
-      long oldMonitoringPeriod = monitoringPublisherPeriod;
       monitoringPublisherPeriod = configuration.getMonitoringPeriod();
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        if (monitoringPublisherPeriod == 0L)
-        {
-          // Requested to stop monitoring publishers
-          domain.stopMonitoringPublisher();
-        }
-        else if (domain.isRunningMonitoringPublisher())
-        {
-          // Update the threshold value for this running monitoring publisher
-          domain.updateMonitoringPublisher(monitoringPublisherPeriod);
-        }
-        else if (oldMonitoringPeriod == 0L)
-        {
-          // Requested to start monitoring publishers with provided period value
-          if ((domain.getConnectedDSs().size() > 0)
-              || (domain.getConnectedRSs().size() > 0))
-            domain.startMonitoringPublisher();
-        }
+        domain.updateMonitoringPeriod(monitoringPublisherPeriod);
       }
     }
@@ -1126,7 +1096,7 @@
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }

-  /*
+  /**
    * Try and set a sensible URL for this replication server. Since we are
    * listening on all addresses there are a couple of potential candidates: 1) a
    * matching server url in the replication server's configuration, 2) hostname
@@ -1666,8 +1636,7 @@
     }

     if (debugEnabled())
-      TRACER.debugInfo("In " + this +
-          " getEligibleCN() ends with " +
+      TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
         " the following domainEligibleCN for each domain :" + debugLog +
         " thus CrossDomainEligibleCN=" + eligibleCN +
         " ts=" + new Date(eligibleCN.getTime()).toString());
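Review note: getConnectedRSUrls() above only avoids duplicate connections if both sides of the contains() check go through the same normalization. A self-contained sketch of that idea; normalize() is a hypothetical stand-in for normalizeServerURL(), which does more (e.g., host name resolution):

    import java.util.LinkedHashSet;
    import java.util.Set;

    // Illustrative sketch: both the stored URLs and the candidate URL pass
    // through one normalization, so textual variants of one server match.
    public final class NormalizedUrls
    {
      // Stand-in for normalizeServerURL(): lower-case the host part only.
      static String normalize(String url)
      {
        int colon = url.lastIndexOf(':');
        return url.substring(0, colon).toLowerCase() + url.substring(colon);
      }

      public static void main(String[] args)
      {
        Set<String> connected = new LinkedHashSet<String>();
        connected.add(normalize("RS1.Example.com:8989"));
        // A different spelling of the same server still matches:
        System.out.println(connected.contains(normalize("rs1.example.com:8989")));
      }
    }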
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -940,14 +940,14 @@
   /**
    * Stop operations with a list of replication servers.
    *
-   * @param replServers the replication servers for which
-   * we want to stop operations
+   * @param replServerURLs
+   *          the replication servers URLs for which we want to stop operations
    */
-  public void stopReplicationServers(Collection<String> replServers)
+  public void stopReplicationServers(Collection<String> replServerURLs)
   {
     for (ReplicationServerHandler handler : connectedRSs.values())
     {
-      if (replServers.contains(handler.getServerAddressURL()))
+      if (replServerURLs.contains(handler.getServerAddressURL()))
       {
         stopServer(handler, false);
       }
@@ -976,12 +976,13 @@
   }

   /**
-   * Checks that a DS is not connected with same id.
+   * Checks whether it is already connected to a DS with same id.
    *
-   * @param handler the DS we want to check
-   * @return true if this is not a duplicate server
+   * @param handler
+   *          the DS we want to check
+   * @return true if this DS is already connected to the current server
    */
-  public boolean checkForDuplicateDS(DataServerHandler handler)
+  public boolean isAlreadyConnectedToDS(DataServerHandler handler)
   {
     if (connectedDSs.containsKey(handler.getServerId()))
     {
@@ -991,9 +992,9 @@
           connectedDSs.get(handler.getServerId()).toString(),
           handler.toString(), handler.getServerId());
       logError(message);
-      return false;
+      return true;
     }
-    return true;
+    return false;
   }

   /**
@@ -1005,6 +1006,7 @@
    */
   public void stopServer(ServerHandler handler, boolean shutdown)
   {
+    // TODO JNR merge with stopServer(MessageHandler)
     if (debugEnabled())
     {
       TRACER.debugInfo("In "
@@ -1055,22 +1057,9 @@
           stopMonitoringPublisher();
         }

-        if (handler.isReplicationServer())
+        if (connectedRSs.containsKey(handler.getServerId()))
         {
-          if (connectedRSs.containsKey(handler.getServerId()))
-          {
-            unregisterServerHandler(handler);
-            handler.shutdown();
-
-            // Check if generation id has to be reset
-            mayResetGenerationId();
-            if (!shutdown)
-            {
-              // Warn our DSs that a RS or DS has quit (does not use this
-              // handler as already removed from list)
-              buildAndSendTopoInfoToDSs(null);
-            }
-          }
+          unregisterServerHandler(handler, shutdown, false);
         }
         else if (connectedDSs.containsKey(handler.getServerId()))
         {
           // If this is the last DS for the domain,
@@ -1086,25 +1075,10 @@
             }
             stopStatusAnalyzer();
           }
-          unregisterServerHandler(handler);
-          handler.shutdown();
-
-          // Check if generation id has to be reset
-          mayResetGenerationId();
-          if (!shutdown)
-          {
-            // Update the remote replication servers with our list
-            // of connected LDAP servers
-            buildAndSendTopoInfoToRSs();
-
-            // Warn our DSs that a RS or DS has quit (does not use this
-            // handler as already removed from list)
-            buildAndSendTopoInfoToDSs(null);
-          }
+          unregisterServerHandler(handler, shutdown, true);
         }
         else if (otherHandlers.contains(handler))
         {
-          unRegisterHandler(handler);
-          handler.shutdown();
+          unregisterOtherHandler(handler);
         }
       }
       catch(Exception e)
@@ -1122,12 +1096,41 @@
     }
   }

+  private void unregisterOtherHandler(MessageHandler handler)
+  {
+    unRegisterHandler(handler);
+    handler.shutdown();
+  }
+
+  private void unregisterServerHandler(ServerHandler handler,
+      boolean shutdown, boolean isDirectoryServer)
+  {
+    unregisterServerHandler(handler);
+    handler.shutdown();
+
+    // Check if generation id has to be reset
+    mayResetGenerationId();
+    if (!shutdown)
+    {
+      if (isDirectoryServer)
+      {
+        // Update the remote replication servers with our list
+        // of connected LDAP servers
+        buildAndSendTopoInfoToRSs();
+      }
+
+      // Warn our DSs that a RS or DS has quit (does not use this
+      // handler as already removed from list)
+      buildAndSendTopoInfoToDSs(null);
+    }
+  }
+
   /**
    * Stop the handler.
    * @param handler The handler to stop.
    */
   public void stopServer(MessageHandler handler)
   {
+    // TODO JNR merge with stopServer(ServerHandler, boolean)
     if (debugEnabled())
     {
       TRACER.debugInfo("In "
@@ -1163,8 +1166,7 @@
     {
       if (otherHandlers.contains(handler))
       {
-        unRegisterHandler(handler);
-        handler.shutdown();
+        unregisterOtherHandler(handler);
       }
     }
     catch(Exception e)
@@ -1269,39 +1271,40 @@
   }

   /**
-   * Checks that a remote RS is not already connected to this hosting RS.
-   * @param handler The handler for the remote RS.
+   * Checks whether a remote RS is already connected to this hosting RS.
+   *
+   * @param handler
+   *          The handler for the remote RS.
    * @return flag specifying whether the remote RS is already connected.
-   * @throws DirectoryException when a problem occurs.
+   * @throws DirectoryException
+   *           when a problem occurs.
    */
-  public boolean checkForDuplicateRS(ReplicationServerHandler handler)
+  public boolean isAlreadyConnectedToRS(ReplicationServerHandler handler)
       throws DirectoryException
   {
     ReplicationServerHandler oldHandler =
         connectedRSs.get(handler.getServerId());
-    if (oldHandler != null)
+    if (oldHandler == null)
     {
-      if (oldHandler.getServerAddressURL().equals(
-          handler.getServerAddressURL()))
-      {
-        // this is the same server, this means that our ServerStart messages
-        // have been sent at about the same time and 2 connections
-        // have been established.
-        // Silently drop this connection.
-        return false;
-      }
-      else
-      {
-        // looks like two replication servers have the same serverId
-        // log an error message and drop this connection.
-        Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
-            localReplicationServer.getMonitorInstanceName(), oldHandler.
-            getServerAddressURL(), handler.getServerAddressURL(),
-            handler.getServerId());
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
+      return false;
     }
-    return true;
+
+    if (oldHandler.getServerAddressURL().equals(handler.getServerAddressURL()))
+    {
+      // this is the same server, this means that our ServerStart messages
+      // have been sent at about the same time and 2 connections
+      // have been established.
+      // Silently drop this connection.
+      return true;
+    }
+
+    // looks like two replication servers have the same serverId
+    // log an error message and drop this connection.
+    Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
+        localReplicationServer.getMonitorInstanceName(),
+        oldHandler.getServerAddressURL(), handler.getServerAddressURL(),
+        handler.getServerId());
+    throw new DirectoryException(ResultCode.OTHER, message);
   }
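Review note: isAlreadyConnectedToRS() above also straightens out the inverted boolean of checkForDuplicateRS(). A compact, hypothetical restatement of the three-way decision it encodes (illustrative types, not the real handler classes):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of the decision table:
    //   unknown serverId                -> not connected yet, proceed (false)
    //   same serverId, same address URL -> simultaneous cross-connect, drop (true)
    //   same serverId, different URL    -> two servers share one id, reject (throw)
    public final class DuplicateRsCheck
    {
      private final Map<Integer, String> connectedRSs =
          new HashMap<Integer, String>();

      boolean isAlreadyConnected(int serverId, String addressUrl)
      {
        String oldUrl = connectedRSs.get(serverId);
        if (oldUrl == null)
        {
          return false; // first connection from this server id
        }
        if (oldUrl.equals(addressUrl))
        {
          return true; // same server connected twice: silently drop this one
        }
        throw new IllegalStateException(
            "Duplicate replication server id " + serverId);
      }
    }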
@@ -1327,21 +1330,6 @@
   }

   /**
-   * Return a Set of String containing the lists of Replication servers
-   * connected to this server.
-   * @return the set of connected servers
-   */
-  public Set<String> getChangelogs()
-  {
-    Set<String> results = new LinkedHashSet<String>();
-    for (ReplicationServerHandler handler : connectedRSs.values())
-    {
-      results.add(handler.getServerAddressURL());
-    }
-    return results;
-  }
-
-  /**
    * Return a set containing the server that produced update and known by
    * this replicationServer from all over the topology,
    * whatever directly connected of connected to another RS.
@@ -2861,11 +2849,11 @@
   }

   /**
-   * Starts the status analyzer for the domain.
+   * Starts the status analyzer for the domain if not already started.
    */
   public void startStatusAnalyzer()
   {
-    if (statusAnalyzer == null)
+    if (!isRunningStatusAnalyzer())
     {
       int degradedStatusThreshold =
           localReplicationServer.getDegradedStatusThreshold();
@@ -2880,9 +2868,9 @@
   /**
    * Stops the status analyzer for the domain.
    */
-  public void stopStatusAnalyzer()
+  private void stopStatusAnalyzer()
   {
-    if (statusAnalyzer != null)
+    if (isRunningStatusAnalyzer())
     {
       statusAnalyzer.shutdown();
       statusAnalyzer.waitForShutdown();
@@ -2894,32 +2882,19 @@
    * Tests if the status analyzer for this domain is running.
    * @return True if the status analyzer is running, false otherwise.
    */
-  public boolean isRunningStatusAnalyzer()
+  private boolean isRunningStatusAnalyzer()
   {
     return statusAnalyzer != null;
   }

   /**
-   * Update the status analyzer with the new threshold value.
-   * @param degradedStatusThreshold The new threshold value.
-   */
-  public void updateStatusAnalyzer(int degradedStatusThreshold)
-  {
-    if (statusAnalyzer != null)
-    {
-      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
-    }
-  }
-
-  /**
-   * Starts the monitoring publisher for the domain.
+   * Starts the monitoring publisher for the domain if not already started.
    */
   public void startMonitoringPublisher()
   {
-    if (monitoringPublisher == null)
+    if (!isRunningMonitoringPublisher())
     {
-      long period =
-          localReplicationServer.getMonitoringPublisherPeriod();
+      long period = localReplicationServer.getMonitoringPublisherPeriod();
       if (period > 0) // 0 means no monitoring publisher
       {
         monitoringPublisher = new MonitoringPublisher(this, period);
@@ -2931,9 +2906,9 @@
   /**
    * Stops the monitoring publisher for the domain.
    */
-  public void stopMonitoringPublisher()
+  private void stopMonitoringPublisher()
   {
-    if (monitoringPublisher != null)
+    if (isRunningMonitoringPublisher())
     {
       monitoringPublisher.shutdown();
       monitoringPublisher.waitForShutdown();
@@ -2945,24 +2920,12 @@
    * Tests if the monitoring publisher for this domain is running.
    * @return True if the monitoring publisher is running, false otherwise.
    */
-  public boolean isRunningMonitoringPublisher()
+  private boolean isRunningMonitoringPublisher()
   {
     return monitoringPublisher != null;
   }

-  /**
-   * Update the monitoring publisher with the new period value.
-   * @param period The new period value.
-   */
-  public void updateMonitoringPublisher(long period)
-  {
-    if (monitoringPublisher != null)
-    {
-      monitoringPublisher.setPeriod(period);
-    }
-  }
-
   /**
    * {@inheritDoc}
    */
   @Override
@@ -3384,4 +3347,52 @@
   {
     return this.localReplicationServer.getServerId();
   }
+
+  /**
+   * Update the status analyzer with the new threshold value.
+   *
+   * @param degradedStatusThreshold
+   *          The new threshold value.
+   */
+  void updateDegradedStatusThreshold(int degradedStatusThreshold)
+  {
+    if (degradedStatusThreshold == 0)
+    {
+      // Requested to stop analyzers
+      stopStatusAnalyzer();
+    }
+    else if (isRunningStatusAnalyzer())
+    {
+      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+    }
+    else if (getConnectedDSs().size() > 0)
+    {
+      // Requested to start analyzers with provided threshold value
+      startStatusAnalyzer();
+    }
+  }
+
+  /**
+   * Update the monitoring publisher with the new period value.
+   *
+   * @param period
+   *          The new period value.
+   */
+  void updateMonitoringPeriod(long period)
+  {
+    if (period == 0)
+    {
+      // Requested to stop monitoring publishers
+      stopMonitoringPublisher();
+    }
+    else if (isRunningMonitoringPublisher())
+    {
+      monitoringPublisher.setPeriod(period);
+    }
+    else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
+    {
+      // Requested to start monitoring publishers with provided period value
+      startMonitoringPublisher();
+    }
+  }
 }
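Review note: updateDegradedStatusThreshold() and updateMonitoringPeriod() above share one stop/update/start lifecycle. A minimal generic sketch of that pattern (hypothetical names, not the OpenDS classes):

    // Illustrative sketch: a zero value disables the worker, a running
    // worker is reconfigured in place, and otherwise one starts on demand.
    public class ThresholdLifecycle
    {
      private Worker worker; // null while stopped

      void update(int newValue)
      {
        if (newValue == 0)
        {
          stop();                        // 0 means "feature disabled"
        }
        else if (worker != null)
        {
          worker.setThreshold(newValue); // live update, no restart
        }
        else
        {
          worker = new Worker(newValue); // (re)start with the new value
        }
      }

      private void stop()
      {
        if (worker != null)
        {
          worker.shutdown();
          worker = null;
        }
      }

      // Hypothetical worker holding the configurable threshold.
      static class Worker
      {
        private volatile int threshold;
        Worker(int threshold) { this.threshold = threshold; }
        void setThreshold(int t) { this.threshold = t; }
        void shutdown() { /* stop background work */ }
      }
    }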
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -160,7 +160,6 @@
     {
       lockDomain(false); // no timeout

-      // Send start
       ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();

       // Wait answer
@@ -174,22 +173,19 @@
           // Remote replication server is probably shutting down or simultaneous
           // cross-connect detected.
           abortStart(null);
-          return;
         }
         else
         {
           Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
               .getClass().getCanonicalName(), "ReplServerStartMsg");
           abortStart(message);
-          return;
         }
+        return;
       }

-      // Process hello from remote.
-      processStartFromRemote((ReplServerStartMsg)msg);
+      processStartFromRemote((ReplServerStartMsg) msg);

-      // Duplicate server ?
-      if (!replicationServerDomain.checkForDuplicateRS(this))
+      if (replicationServerDomain.isAlreadyConnectedToRS(this))
       {
         // Simultaneous cross connect.
         abortStart(null);
@@ -207,10 +203,9 @@
             generationId, false);
       }

-      // Log
       logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg);

-      // Until here session is encrypted then it depends on the negociation
+      // Until here session is encrypted then it depends on the negotiation
       // The session initiator decides whether to use SSL.
       if (!this.sslEncryption)
         session.stopEncryption();
@@ -239,8 +234,7 @@
       logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);

-      // Create the monitoring publisher for the domain if not already started
-      createMonitoringPublisher();
+      replicationServerDomain.startMonitoringPublisher();

       /*
       FIXME: i think this should be done for all protocol version !!
@@ -292,7 +286,6 @@
     }
     finally
     {
-      // Release domain
       if (replicationServerDomain != null
           && replicationServerDomain.hasLock())
         replicationServerDomain.release();
@@ -316,8 +309,7 @@
       // lock with timeout
       lockDomain(true);

-      // Duplicate server ?
-      if (!replicationServerDomain.checkForDuplicateRS(this))
+      if (replicationServerDomain.isAlreadyConnectedToRS(this))
       {
         abortStart(null);
         return;
@@ -326,7 +318,6 @@
       this.localGenerationId = replicationServerDomain.getGenerationId();
       ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();

-      // log
       logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);

       /*
@@ -358,7 +349,6 @@
             .createTopologyMsgForRS();
         sendTopoInfo(outTopoMsg);

-        // log
         logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
       }
       else
@@ -390,9 +380,7 @@
         */
       }

-      // Create the monitoring publisher for the domain if not already started
-      createMonitoringPublisher();
-
+      replicationServerDomain.startMonitoringPublisher();

       registerIntoDomain();
@@ -616,9 +604,7 @@
   public void shutdown()
   {
     super.shutdown();

-    /*
-     * Stop the remote LSHandler
-     */
+    // Stop the remote LSHandler
     synchronized (remoteDirectoryServers)
     {
       for (LightweightServerHandler lsh : remoteDirectoryServers.values())
@@ -755,7 +741,7 @@
     attributes.add(Attributes.create("missing-changes",
         String.valueOf(md.getMissingChangesRS(serverId))));

-    /* get the Server State */
+    // get the Server State
     AttributeBuilder builder = new AttributeBuilder("server-state");
     ServerState state = md.getRSStates(serverId);
     if (state != null)
@@ -769,6 +755,7 @@
     return attributes;
   }

+
   /**
    * {@inheritDoc}
    */
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
     if (debugEnabled())
       TRACER.debugInfo("In "
           + ((handler != null) ? handler.toString() : "Replication Server")
-          + " closing session with err="
-          + providedMsg.toString());
+          + " closing session with err=" + providedMsg);
     logError(providedMsg);
   }
@@ -125,7 +124,7 @@
    */
   protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
   /**
-  // Number of updates received from the server in assured safe data mode.
+   * Number of updates received from the server in assured safe data mode.
    */
   protected int assuredSdReceivedUpdates = 0;
   /**
@@ -169,7 +168,7 @@
   /**
    * The initial size of the sending window.
    */
-  int sendWindowSize;
+  protected int sendWindowSize;
   /**
    * remote generation id.
    */
@@ -185,7 +184,7 @@
   /**
    * Group id of this remote server.
    */
-  protected byte groupId = (byte) -1;
+  protected byte groupId = -1;
   /**
    * The SSL encryption after the negotiation with the peer.
    */
@@ -254,8 +253,7 @@
       closeSession(localSession, reason, this);
     }

-    if ((replicationServerDomain != null) &&
-        replicationServerDomain.hasLock())
+    if (replicationServerDomain != null && replicationServerDomain.hasLock())
       replicationServerDomain.release();

     // If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
     // peer server and the last topo message sent may have failed being
     // sent: in that case retrieve old value of generation id for
     // replication server domain
-    if (oldGenerationId != -100)
+    if (oldGenerationId != -100 && replicationServerDomain != null)
     {
-      if (replicationServerDomain!=null)
-        replicationServerDomain.changeGenerationId(oldGenerationId, false);
+      replicationServerDomain.changeGenerationId(oldGenerationId, false);
     }
   }
@@ -304,7 +301,6 @@
   @Override
   public boolean engageShutdown()
   {
-    // Use thread safe boolean
     return shuttingDown.getAndSet(true);
   }
@@ -340,13 +336,11 @@
     // sendWindow MUST be created before starting the writer
     sendWindow = new Semaphore(sendWindowSize);

-    writer = new ServerWriter(session, this,
-        replicationServerDomain);
+    writer = new ServerWriter(session, this, replicationServerDomain);
     reader = new ServerReader(session, this);

-    session.setName("Replication server RS(" + this.getReplicationServerId()
-        + ") session thread to " + this.toString() + " at "
+    session.setName("Replication server RS(" + getReplicationServerId()
+        + ") session thread to " + this + " at "
         + session.getReadableRemoteAddress());
     session.start();
     try
@@ -366,9 +360,8 @@
     // Create a thread to send heartbeat messages.
     if (heartbeatInterval > 0)
     {
-      String threadName = "Replication server RS("
-          + this.getReplicationServerId()
-          + ") heartbeat publisher to " + this.toString() + " at "
+      String threadName = "Replication server RS(" + getReplicationServerId()
+          + ") heartbeat publisher to " + this + " at "
           + session.getReadableRemoteAddress();
       heartbeatThread = new HeartbeatThread(threadName, session,
           heartbeatInterval / 3);
@@ -788,7 +781,7 @@
    */
   public boolean isReplicationServer()
   {
-    return (!this.isDataServer());
+    return !this.isDataServer();
   }
@@ -827,62 +820,58 @@
     // it will be created and locked later in the method
     if (!timedout)
     {
      // !timedout
      if (!replicationServerDomain.hasLock())
        replicationServerDomain.lock();
+      return;
     }
-    else
+
+    /**
+     * Take the lock on the domain.
+     * WARNING: Here we try to acquire the lock with a timeout. This
+     * is for preventing a deadlock that may happen if there are cross
+     * connection attempts (for same domain) from this replication
+     * server and from a peer one:
+     * Here is the scenario:
+     * - RS1 connect thread takes the domain lock and starts
+     * connection to RS2
+     * - at the same time RS2 connect thread takes its domain lock and
+     * starts connection to RS1
+     * - RS2 listen thread starts processing received
+     * ReplServerStartMsg from RS1 and wants to acquire the lock on
+     * the domain (here) but cannot as RS2 connect thread already has
+     * it
+     * - RS1 listen thread starts processing received
+     * ReplServerStartMsg from RS2 and wants to acquire the lock on
+     * the domain (here) but cannot as RS1 connect thread already has
+     * it
+     * => Deadlock: 4 threads are locked.
+     * So to prevent such a situation, the listen threads here
+     * will both timeout trying to acquire the lock. The random time
+     * for the timeout should allow one connection attempt to be
+     * aborted whereas the other one should have time to finish in the
+     * same time.
+     * Warning: the minimum time (3s) should be big enough to allow
+     * normal situation connections to terminate. The added random
+     * time should represent a big enough range so that the chance to
+     * have one listen thread timing out a lot before the peer one is
+     * great. When the first listen thread times out, the remote
+     * connect thread should release the lock and allow the peer
+     * listen thread to take the lock it was waiting for and process
+     * the connection attempt.
+     */
+    Random random = new Random();
+    int randomTime = random.nextInt(6); // Random from 0 to 5
+    // Wait at least 3 seconds + (0 to 5 seconds)
+    long timeout = 3000 + (randomTime * 1000);
+    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+    if (!lockAcquired)
     { // timedout
-      /**
-       * Take the lock on the domain.
-       * WARNING: Here we try to acquire the lock with a timeout. This
-       * is for preventing a deadlock that may happen if there are cross
-       * connection attempts (for same domain) from this replication
-       * server and from a peer one:
-       * Here is the scenario:
-       * - RS1 connect thread takes the domain lock and starts
-       * connection to RS2
-       * - at the same time RS2 connect thread takes his domain lock and
-       * start connection to RS2
-       * - RS2 listen thread starts processing received
-       * ReplServerStartMsg from RS1 and wants to acquire the lock on
-       * the domain (here) but cannot as RS2 connect thread already has
-       * it
-       * - RS1 listen thread starts processing received
-       * ReplServerStartMsg from RS2 and wants to acquire the lock on
-       * the domain (here) but cannot as RS1 connect thread already has
-       * it
-       * => Deadlock: 4 threads are locked.
-       * So to prevent that in such situation, the listen threads here
-       * will both timeout trying to acquire the lock. The random time
-       * for the timeout should allow on connection attempt to be
-       * aborted whereas the other one should have time to finish in the
-       * same time.
-       * Warning: the minimum time (3s) should be big enough to allow
-       * normal situation connections to terminate. The added random
-       * time should represent a big enough range so that the chance to
-       * have one listen thread timing out a lot before the peer one is
-       * great. When the first listen thread times out, the remote
-       * connect thread should release the lock and allow the peer
-       * listen thread to take the lock it was waiting for and process
-       * the connection attempt.
-       */
-      Random random = new Random();
-      int randomTime = random.nextInt(6); // Random from 0 to 5
-      // Wait at least 3 seconds + (0 to 5 seconds)
-      long timeout = 3000 + (randomTime * 1000);
-      boolean noTimeout = replicationServerDomain.tryLock(timeout);
-      if (!noTimeout)
-      { // Timeout
-        Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
-            getBaseDN(), serverId, session.getReadableRemoteAddress(),
-            getReplicationServerId());
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
+      Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+          getBaseDN(), serverId, session.getReadableRemoteAddress(),
+          getReplicationServerId());
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
   }
@@ -1011,9 +1000,7 @@
       session.close();
     }

-    /*
-     * Stop the heartbeat thread.
-     */
+    // Stop the heartbeat thread.
     if (heartbeatThread != null)
     {
       heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
     */
    try
    {
-      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
+      if (writer != null && !Thread.currentThread().equals(writer))
      {
        writer.join(SHUTDOWN_JOIN_TIMEOUT);
      }
-      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
+      if (reader != null && !Thread.currentThread().equals(reader))
      {
        reader.join(SHUTDOWN_JOIN_TIMEOUT);
      }
@@ -1068,7 +1054,7 @@
       {
         // loop until not interrupted
       }
-    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
+    } while ((interrupted || !acquired) && !shutdownWriter);

     if (msg != null)
     {
       incrementOutCount();
@@ -1078,10 +1064,9 @@
       if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
       {
         incrementAssuredSrSentUpdates();
-      } else
+      }
+      else if (!isDataServer())
       {
-        if (!isDataServer())
-          incrementAssuredSdSentUpdates();
+        incrementAssuredSdSentUpdates();
       }
     }
   }
@@ -1094,22 +1079,10 @@
    */
   public RSInfo toRSInfo()
   {
-    return new RSInfo(serverId, serverURL, generationId, groupId,
-        weight);
+    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
   }

-  /**
-   * Starts the monitoring publisher for the domain if not already started.
-   */
-  protected void createMonitoringPublisher()
-  {
-    if (!replicationServerDomain.isRunningMonitoringPublisher())
-    {
-      replicationServerDomain.startMonitoringPublisher();
-    }
-  }
-
   /**
    * Update the send window size based on the credit specified in the
    * given window message.
@@ -1132,11 +1105,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + ":"
-          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()
-          + "\nAND REPLIED:\n" + outStartMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
+          + "\nAND REPLIED:\n" + outStartMsg);
     }
   }
@@ -1151,12 +1123,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + ":"
-          + "\nSH START HANDSHAKE SENT("+ this + "):\n" + outStartMsg.toString()
-          + "\nAND RECEIVED:\n" + inStartMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg
+          + "\nAND RECEIVED:\n" + inStartMsg);
     }
   }
@@ -1171,11 +1141,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + ":"
-          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString()
-          + "\nAND REPLIED:\n" + outTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg
+          + "\nAND REPLIED:\n" + outTopoMsg);
     }
   }
@@ -1190,11 +1159,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + ":"
-          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString()
-          + "\nAND RECEIVED:\n" + inTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg
+          + "\nAND RECEIVED:\n" + inTopoMsg);
     }
   }
@@ -1209,11 +1177,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + " :"
-          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString()
-          + "\nAND REPLIED:\n" + outTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
+          + "\nAND REPLIED:\n" + outTopoMsg);
     }
   }
@@ -1224,10 +1191,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + " :"
-          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
     }
   }
@@ -1240,11 +1206,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In "
-          + this.replicationServer.getMonitorInstanceName() + ", "
-          + this.getClass().getSimpleName() + " " + this + " :"
-          + "\nSH SESSION HANDSHAKE RECEIVED:\n"
-          + inStartECLSessionMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
     }
   }
@@ -1264,10 +1228,9 @@
    */
   public long getReferenceGenId()
   {
-    long refgenid = -1;
-    if (replicationServerDomain!=null)
-      refgenid = replicationServerDomain.getGenerationId();
-    return refgenid;
+    if (replicationServerDomain != null)
+      return replicationServerDomain.getGenerationId();
+    return -1;
   }

   /**
@@ -1285,8 +1248,7 @@
    * @param update the update message received.
    * @throws IOException when it occurs.
    */
-  public void put(UpdateMsg update)
-      throws IOException
+  public void put(UpdateMsg update) throws IOException
   {
     if (replicationServerDomain!=null)
       replicationServerDomain.put(update, this);
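Review note: the long WARNING comment above describes breaking a symmetric cross-connect deadlock with a randomized lock timeout. A self-contained sketch of just that mechanism; ReentrantLock stands in for the domain lock, and the names are illustrative:

    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch: each listen thread waits a random 3 to 8 seconds
    // for the lock, so when two servers cross-connect simultaneously one
    // attempt times out and aborts while the other can complete.
    public final class RandomizedLockTimeout
    {
      private final ReentrantLock domainLock = new ReentrantLock();
      private final Random random = new Random();

      boolean tryLockDomain() throws InterruptedException
      {
        // Wait at least 3 seconds + (0 to 5 seconds), as in the comment above
        long timeoutMs = 3000 + random.nextInt(6) * 1000L;
        return domainLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
      }
    }

The randomness is what breaks the symmetry: with a fixed timeout, both sides would tend to give up (or hold on) at the same moment.

opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java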
@@ -29,11 +29,12 @@
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
 import org.opends.server.types.DebugLogLevel;

 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;

 /**
  * This thread is in charge of periodically determining if the connected
@@ -85,7 +86,6 @@
   }

   /**
-   * Run method for the StatusAnalyzer.
    * Analyzes if servers are late or not, and change their status accordingly.
    */
   @Override
@@ -93,13 +93,11 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Directory server status analyzer starting for dn "
-          + replicationServerDomain.getBaseDn());
+      TRACER.debugInfo(
+          getMessage("Directory server status analyzer starting."));
     }

-    final int localRsId = replicationServerDomain.getLocalRSServerId();
-    boolean interrupted = false;
-    while (!shutdown && !interrupted)
+    while (!shutdown)
     {
       synchronized (shutdownLock)
       {
@@ -126,22 +124,21 @@
       // for it and change status accordingly if threshold value is
       // crossed/uncrossed
       for (DataServerHandler serverHandler :
-        replicationServerDomain.getConnectedDSs(). values())
+        replicationServerDomain.getConnectedDSs().values())
       {
         // Get number of pending changes for this server
         int nChanges = serverHandler.getRcvMsgQueueSize();
         if (debugEnabled())
         {
-          TRACER.debugInfo("Status analyzer for dn "
-              + replicationServerDomain.getBaseDn() + " DS "
+          TRACER.debugInfo(getMessage("Status analyzer: DS "
               + serverHandler.getServerId() + " has " + nChanges
-              + " message(s) in writer queue. This is in RS " + localRsId);
+              + " message(s) in writer queue."));
         }

         // Check status to know if it is relevant to change the status. Do not
         // take RSD lock to test. If we attempt to change the status whereas
-        // we are in a status that do not allows that, this will be noticed by
-        // the changeStatusFromStatusAnalyzer method. This allows to take the
+        // the current status does not allow it, this will be noticed by
+        // the changeStatusFromStatusAnalyzer() method. This allows to take the
         // lock roughly only when needed versus every sleep time timeout.
         if (degradedStatusThreshold > 0)
         // Threshold value = 0 means no status analyzer (no degrading system)
@@ -151,39 +148,18 @@
         {
           if (nChanges >= degradedStatusThreshold)
           {
-            if (serverHandler.getStatus() == ServerStatus.NORMAL_STATUS)
+            if (serverHandler.getStatus() == NORMAL_STATUS
+                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
             {
-              interrupted =
-                replicationServerDomain.changeStatusFromStatusAnalyzer(
-                    serverHandler, StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
-              if (interrupted)
-              {
-                // Finish job and let thread die
-                TRACER.debugInfo("Status analyzer for dn "
-                    + replicationServerDomain.getBaseDn()
-                    + " has been interrupted and will die. This is in RS "
-                    + localRsId);
-                break;
-              }
+              break;
             }
-          } else
+          }
+          else
           {
-            if (serverHandler.getStatus() == ServerStatus.DEGRADED_STATUS)
+            if (serverHandler.getStatus() == DEGRADED_STATUS
+                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
             {
-              interrupted =
-                replicationServerDomain.changeStatusFromStatusAnalyzer(
-                    serverHandler, StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
-              if (interrupted)
-              {
-                // Finish job and let thread die
-                TRACER.debugInfo("Status analyzer for dn "
-                    + replicationServerDomain.getBaseDn()
-                    + " has been interrupted and will die. This is in RS "
-                    + localRsId);
-                break;
-              }
+              break;
             }
           }
         }
@@ -191,9 +167,28 @@
     }

     done = true;
-    TRACER.debugInfo("Status analyzer for dn "
-        + replicationServerDomain.getBaseDn() + " is terminated."
-        + " This is in RS " + localRsId);
+    TRACER.debugInfo(getMessage("Status analyzer is terminated."));
   }

+  private String getMessage(String message)
+  {
+    return "In RS " + replicationServerDomain.getLocalRSServerId()
+        + ", for base dn " + replicationServerDomain.getBaseDn()
+        + ": " + message;
+  }
+
+  private boolean isInterrupted(DataServerHandler serverHandler,
+      StatusMachineEvent event)
+  {
+    if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler,
+        event))
+    {
+      // Finish job and let thread die
+      TRACER.debugInfo(
+          getMessage("Status analyzer has been interrupted and will die."));
+      return true;
+    }
+    return false;
+  }
+
   /**
@@ -208,9 +203,7 @@
     if (debugEnabled())
     {
-      TRACER.debugInfo("Shutting down status analyzer for dn "
-          + replicationServerDomain.getBaseDn() + " in RS "
-          + replicationServerDomain.getLocalRSServerId());
+      TRACER.debugInfo(getMessage("Shutting down status analyzer."));
     }
   }
 }
@@ -231,9 +224,7 @@
         n++;
         if (n >= FACTOR)
         {
-          TRACER.debugInfo("Interrupting status analyzer for dn "
-              + replicationServerDomain.getBaseDn() + " in RS "
-              + replicationServerDomain.getLocalRSServerId());
+          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
           interrupt();
         }
       }
@@ -251,9 +242,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Directory server status analyzer for dn "
-          + replicationServerDomain.getBaseDn()
-          + " changing threshold value to " + degradedStatusThreshold);
+      TRACER.debugInfo(getMessage(
+          "Directory server status analyzer changing threshold value to "
+              + degradedStatusThreshold));
     }
     this.degradedStatusThreshold = degradedStatusThreshold;
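Review note: StatusAnalyzer is a periodic monitor thread whose sleep can be cut short by shutdown(). A minimal sketch of that wait/notify loop, with hypothetical names and independent of the OpenDS thread classes:

    // Illustrative sketch: the analyzer sleeps on a shutdown lock so that
    // shutdownNow() can wake it immediately instead of waiting out the period.
    public class PeriodicAnalyzer extends Thread
    {
      private final Object shutdownLock = new Object();
      private volatile boolean shutdown;

      @Override
      public void run()
      {
        while (!shutdown)
        {
          synchronized (shutdownLock)
          {
            try
            {
              shutdownLock.wait(1000); // analysis period, cut short on shutdown
            }
            catch (InterruptedException e)
            {
              return;
            }
          }
          // ... compare each DS's pending-change count to the threshold here
        }
      }

      public void shutdownNow()
      {
        shutdown = true;
        synchronized (shutdownLock)
        {
          shutdownLock.notifyAll(); // wake the sleeping analyzer
        }
      }
    }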