opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
@@ -29,17 +29,16 @@ import java.util.zip.DataFormatException; /** * This message is used by LDAP or Replication Server that have been * out of credit for a while and want to check that the remote servers. * * A sending entity that is blocked because its send window is closed * for a while should create such a message to check that the window * closure is valid. * * This message is used by LDAP or Replication Server that have been out of * credit for a while and want to check if the remote servers is able to accept * more messages. * <p> * A sending entity that is blocked because its send window is closed for a * while should create such a message to check that the window closure is valid. * <p> * An entity that received such a message should respond with a * WindowUpdate message indicating the curent credit available. * {@link WindowMsg} indicating the current credit available. */ public class WindowProbeMsg extends ReplicationMsg { @@ -72,10 +71,6 @@ public byte[] getBytes(short protocolVersion) { // WindowProbeMsg Message only contains its type. byte[] resultByteArray = new byte[1]; resultByteArray[0] = MSG_TYPE_WINDOW_PROBE; return resultByteArray; return new byte[] { MSG_TYPE_WINDOW_PROBE }; } } opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -542,11 +542,7 @@ if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) { // Peer DS uses protocol < V4 : send it a ReplServerStartMsg startMsg = new ReplServerStartMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServer.getDegradedStatusThreshold()); startMsg = createReplServerStartMsg(); } else { opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,11 +365,7 @@ if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) { // Peer DS uses protocol < V4 : send it a ReplServerStartMsg startMsg = new ReplServerStartMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServer.getDegradedStatusThreshold()); startMsg = createReplServerStartMsg(); } else { opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -91,8 +91,7 @@ { if (debugEnabled()) { TRACER.debugInfo("Monitoring publisher starting for dn " + replicationServerDomain.getBaseDn()); TRACER.debugInfo(getMessage("Monitoring publisher starting.")); } try @@ -134,16 +133,12 @@ } catch (InterruptedException e) { TRACER.debugInfo("Monitoring publisher for dn " + replicationServerDomain.getBaseDn() + " in RS " + replicationServerDomain.getLocalRSServerId() + " has been interrupted while sleeping."); TRACER.debugInfo(getMessage( "Monitoring publisher has been interrupted while sleeping.")); } done = true; TRACER.debugInfo("Monitoring publisher for dn " + replicationServerDomain.getBaseDn() + " is terminated." + " This is in RS " + replicationServerDomain.getLocalRSServerId()); TRACER.debugInfo(getMessage("Monitoring publisher is terminated.")); } @@ -160,9 +155,7 @@ if (debugEnabled()) { TRACER.debugInfo("Shutting down monitoring publisher for dn " + replicationServerDomain.getBaseDn() + " in RS " + replicationServerDomain.getLocalRSServerId()); TRACER.debugInfo(getMessage("Shutting down monitoring publisher.")); } } } @@ -183,9 +176,7 @@ n++; if (n >= FACTOR) { TRACER.debugInfo("Interrupting monitoring publisher for dn " + replicationServerDomain.getBaseDn() + " in RS " + replicationServerDomain.getLocalRSServerId()); TRACER.debugInfo(getMessage("Interrupting monitoring publisher.")); interrupt(); } } @@ -203,11 +194,17 @@ { if (debugEnabled()) { TRACER.debugInfo("Monitoring publisher for dn " + replicationServerDomain.getBaseDn() + " changing period value to " + period); TRACER.debugInfo(getMessage( "Monitoring publisher changing period value to " + period)); } this.period = period; } private String getMessage(String message) { return "In RS " + replicationServerDomain.getLocalRSServerId() + ", for base dn " + replicationServerDomain.getBaseDn() + ": " + message; } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.opends.messages.Category; @@ -75,17 +76,22 @@ public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> { private final String baseDn; /** * The Status analyzer that periodically verifies whether the connected DSs * are late. * are late. Using an AtomicReference to avoid leaking references to costly * threads. */ private StatusAnalyzer statusAnalyzer = null; private AtomicReference<StatusAnalyzer> statusAnalyzer = new AtomicReference<StatusAnalyzer>(); /** * The monitoring publisher that periodically sends monitoring messages to the * topology. * topology. Using an AtomicReference to avoid leaking references to costly * threads. */ private MonitoringPublisher monitoringPublisher = null; private AtomicReference<MonitoringPublisher> monitoringPublisher = new AtomicReference<MonitoringPublisher>(); /** * The following map contains one balanced tree for each replica ID to which @@ -2853,14 +2859,15 @@ */ public void startStatusAnalyzer() { if (!isRunningStatusAnalyzer()) { int degradedStatusThreshold = int degradedStatusThreshold = localReplicationServer.getDegradedStatusThreshold(); if (degradedStatusThreshold > 0) // 0 means no status analyzer if (degradedStatusThreshold > 0) // 0 means no status analyzer { final StatusAnalyzer thread = new StatusAnalyzer(this, degradedStatusThreshold); if (statusAnalyzer.compareAndSet(null, thread)) { statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold); statusAnalyzer.start(); thread.start(); } } } @@ -2870,35 +2877,26 @@ */ private void stopStatusAnalyzer() { if (isRunningStatusAnalyzer()) final StatusAnalyzer thread = statusAnalyzer.get(); if (statusAnalyzer.compareAndSet(thread, null)) { statusAnalyzer.shutdown(); statusAnalyzer.waitForShutdown(); statusAnalyzer = null; thread.shutdown(); thread.waitForShutdown(); } } /** * Tests if the status analyzer for this domain is running. * @return True if the status analyzer is running, false otherwise. */ private boolean isRunningStatusAnalyzer() { return statusAnalyzer != null; } /** * Starts the monitoring publisher for the domain if not already started. */ public void startMonitoringPublisher() { if (!isRunningMonitoringPublisher()) long period = localReplicationServer.getMonitoringPublisherPeriod(); if (period > 0) // 0 means no monitoring publisher { long period = localReplicationServer.getMonitoringPublisherPeriod(); if (period > 0) // 0 means no monitoring publisher final MonitoringPublisher thread = new MonitoringPublisher(this, period); if (monitoringPublisher.compareAndSet(null, thread)) { monitoringPublisher = new MonitoringPublisher(this, period); monitoringPublisher.start(); thread.start(); } } } @@ -2908,24 +2906,15 @@ */ private void stopMonitoringPublisher() { if (isRunningMonitoringPublisher()) final MonitoringPublisher thread = monitoringPublisher.get(); if (monitoringPublisher.compareAndSet(thread, null)) { monitoringPublisher.shutdown(); monitoringPublisher.waitForShutdown(); monitoringPublisher = null; thread.shutdown(); thread.waitForShutdown(); } } /** * Tests if the monitoring publisher for this domain is running. * @return True if the monitoring publisher is running, false otherwise. */ private boolean isRunningMonitoringPublisher() { return monitoringPublisher != null; } /** * {@inheritDoc} */ @Override @@ -3360,10 +3349,13 @@ { // Requested to stop analyzers stopStatusAnalyzer(); return; } else if (isRunningStatusAnalyzer()) final StatusAnalyzer saThread = statusAnalyzer.get(); if (saThread != null) // it is running { statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold); saThread.setDegradedStatusThreshold(degradedStatusThreshold); } else if (getConnectedDSs().size() > 0) { @@ -3384,10 +3376,13 @@ { // Requested to stop monitoring publishers stopMonitoringPublisher(); return; } else if (isRunningMonitoringPublisher()) final MonitoringPublisher mpThread = monitoringPublisher.get(); if (mpThread != null) // it is running { monitoringPublisher.setPeriod(period); mpThread.setPeriod(period); } else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0) { opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -113,11 +113,7 @@ */ private ReplServerStartMsg sendStartToRemote() throws IOException { ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg( getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServer.getDegradedStatusThreshold()); ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg(); send(outReplServerStartMsg); return outReplServerStartMsg; } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -155,12 +155,12 @@ // window private int rcvWindow; private int rcvWindowSizeHalf; private final int rcvWindowSizeHalf; /** * The size of the receiving window. */ protected int maxRcvWindow; protected final int maxRcvWindow; /** * Semaphore that the writer uses to control the flow to the remote server. */ @@ -288,7 +288,7 @@ * * @throws IOException when the session becomes unavailable. */ public synchronized void decAndCheckWindow() throws IOException private synchronized void decAndCheckWindow() throws IOException { rcvWindow--; checkWindow(); @@ -318,8 +318,7 @@ * and monitoring system. * @throws DirectoryException When an exception is raised. */ protected void finalizeStart() throws DirectoryException protected void finalizeStart() throws DirectoryException { // FIXME:ECL We should refactor so that a SH always have a session if (session != null) @@ -906,11 +905,10 @@ /** * Process the reception of a WindowProbeMsg message. * * @param windowProbeMsg The message to process. * * @throws IOException When the session becomes unavailable. * @throws IOException * When the session becomes unavailable. */ public void process(WindowProbeMsg windowProbeMsg) throws IOException public void replyToWindowProbe() throws IOException { if (rcvWindow > 0) { @@ -1250,6 +1248,7 @@ */ public void put(UpdateMsg update) throws IOException { decAndCheckWindow(); if (replicationServerDomain!=null) replicationServerDomain.put(update, this); } @@ -1262,4 +1261,18 @@ if (replicationServerDomain!=null) replicationServerDomain.stopServer(this, false); } /** * Creates a ReplServerStartMsg for the current ServerHandler. * * @return a new ReplServerStartMsg for the current ServerHandler. */ protected ReplServerStartMsg createReplServerStartMsg() { return new ReplServerStartMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServer.getDegradedStatusThreshold()); } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -27,11 +27,6 @@ */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; import java.io.IOException; import org.opends.messages.Message; @@ -40,6 +35,12 @@ import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.common.ServerStatus.*; import static org.opends.server.util.StaticUtils.*; /** * This class implement the part of the replicationServer that is reading * the connection from the LDAP servers to get all the updates that @@ -74,7 +75,7 @@ public ServerReader(Session session, ServerHandler handler) { super("Replication server RS(" + handler.getReplicationServerId() + ") reading from " + handler.toString() + " at " + ") reading from " + handler + " at " + session.getReadableRemoteAddress()); this.session = session; this.handler = handler; @@ -90,7 +91,7 @@ Message errMessage = null; if (debugEnabled()) { TRACER.debugInfo(this.getName() + " starting"); TRACER.debugInfo(getName() + " starting"); } /* * wait on input stream @@ -110,13 +111,14 @@ if (msg instanceof AckMsg) { AckMsg ack = (AckMsg) msg; handler.checkWindow(); handler.processAck(ack); handler.processAck((AckMsg) msg); } else if (msg instanceof UpdateMsg) { UpdateMsg updateMsg = (UpdateMsg) msg; boolean filtered = false; /* Ignore updates in some cases */ // Ignore updates in some cases if (handler.isDataServer()) { /** @@ -133,22 +135,22 @@ * better performances in normal mode (most of the time). */ ServerStatus dsStatus = handler.getStatus(); if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || (dsStatus == ServerStatus.FULL_UPDATE_STATUS)) if (dsStatus == BAD_GEN_ID_STATUS || dsStatus == FULL_UPDATE_STATUS) { long referenceGenerationId = handler.getReferenceGenId(); if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) if (dsStatus == BAD_GEN_ID_STATUS) logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get( handler.getReplicationServerId(), ((UpdateMsg) msg).getChangeNumber().toString(), updateMsg.getChangeNumber().toString(), handler.getBaseDN(), handler.getServerId(), session.getReadableRemoteAddress(), handler.getGenerationId(), referenceGenerationId)); if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) if (dsStatus == FULL_UPDATE_STATUS) logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get( handler.getReplicationServerId(), ((UpdateMsg) msg).getChangeNumber().toString(), updateMsg.getChangeNumber().toString(), handler.getBaseDN(), handler.getServerId(), session.getReadableRemoteAddress())); filtered = true; @@ -159,14 +161,14 @@ * Ignore updates from RS with bad gen id * (no system managed status for a RS) */ long referenceGenerationId =handler.getReferenceGenId(); if ((referenceGenerationId > 0) && (referenceGenerationId != handler.getGenerationId())) long referenceGenerationId = handler.getReferenceGenId(); if (referenceGenerationId > 0 && referenceGenerationId != handler.getGenerationId()) { logError( WARN_IGNORING_UPDATE_FROM_RS.get( handler.getReplicationServerId(), ((UpdateMsg) msg).getChangeNumber().toString(), updateMsg.getChangeNumber().toString(), handler.getBaseDN(), handler.getServerId(), session.getReadableRemoteAddress(), @@ -178,53 +180,24 @@ if (!filtered) { UpdateMsg update = (UpdateMsg) msg; handler.decAndCheckWindow(); handler.put(update); handler.put(updateMsg); } } else if (msg instanceof WindowMsg) { WindowMsg windowMsg = (WindowMsg) msg; handler.updateWindow(windowMsg); } else if (msg instanceof InitializeRequestMsg) handler.updateWindow((WindowMsg) msg); } else if (msg instanceof RoutableMsg) { InitializeRequestMsg initializeMsg = (InitializeRequestMsg) msg; handler.process(initializeMsg); } else if (msg instanceof InitializeRcvAckMsg) { InitializeRcvAckMsg initializeRcvAckMsg = (InitializeRcvAckMsg) msg; handler.process(initializeRcvAckMsg); } else if (msg instanceof InitializeTargetMsg) { InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg; handler.process(initializeMsg); } else if (msg instanceof EntryMsg) { EntryMsg entryMsg = (EntryMsg) msg; handler.process(entryMsg); } else if (msg instanceof DoneMsg) { DoneMsg doneMsg = (DoneMsg) msg; handler.process(doneMsg); } else if (msg instanceof ErrorMsg) { ErrorMsg errorMsg = (ErrorMsg) msg; handler.process(errorMsg); handler.process((RoutableMsg) msg); } else if (msg instanceof ResetGenerationIdMsg) { ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg; handler.processResetGenId(genIdMsg); handler.processResetGenId((ResetGenerationIdMsg) msg); } else if (msg instanceof WindowProbeMsg) { WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg; handler.process(windowProbeMsg); handler.replyToWindowProbe(); } else if (msg instanceof TopologyMsg) { TopologyMsg topoMsg = (TopologyMsg) msg; ReplicationServerHandler rsh = (ReplicationServerHandler)handler; rsh.receiveTopoInfoFromRS(topoMsg); rsh.receiveTopoInfoFromRS((TopologyMsg) msg); } else if (msg instanceof ChangeStatusMsg) { ChangeStatusMsg csMsg = (ChangeStatusMsg) msg; @@ -242,28 +215,18 @@ csMsg.toString()); logError(errMessage); } } else if (msg instanceof MonitorRequestMsg) { MonitorRequestMsg replServerMonitorRequestMsg = (MonitorRequestMsg) msg; handler.process(replServerMonitorRequestMsg); } else if (msg instanceof MonitorMsg) { MonitorMsg replServerMonitorMsg = (MonitorMsg) msg; handler.process(replServerMonitorMsg); } else if (msg instanceof ChangeTimeHeartbeatMsg) { ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg; handler.process(cthbMsg); handler.process((ChangeTimeHeartbeatMsg) msg); } else if (msg instanceof StopMsg) { // Peer server is properly disconnecting: go out of here to // properly close the server handler going to finally block. if (debugEnabled()) { TRACER.debugInfo(handler.toString() + " has properly " + "disconnected from this replication server " + Integer.toString(handler.getReplicationServerId())); TRACER.debugInfo(handler + " has properly disconnected from this replication server " + handler.getReplicationServerId()); } return; } else if (msg == null) @@ -281,9 +244,7 @@ // Received a V1 PDU we do not need to support: // we just trash the message and log the event for debug purpose, // then continue receiving messages. if (debugEnabled()) TRACER.debugInfo( "In " + this.getName() + " " + stackTraceToSingleLineString(e)); logException(e); } } } @@ -294,9 +255,7 @@ * Log a message and exit from this loop * So that this handler is stopped. */ if (debugEnabled()) TRACER.debugInfo( "In " + this.getName() + " " + stackTraceToSingleLineString(e)); logException(e); if (!handler.shuttingDown()) { if (handler.isDataServer()) @@ -316,9 +275,7 @@ } catch (Exception e) { if (debugEnabled()) TRACER.debugInfo( "In " + this.getName() + " " + stackTraceToSingleLineString(e)); logException(e); /* * The remote server has sent an unknown message, * close the connection. @@ -334,14 +291,21 @@ */ if (debugEnabled()) { TRACER.debugInfo("In " + this.getName() + " closing the session"); TRACER.debugInfo("In " + getName() + " closing the session"); } session.close(); handler.doStop(); if (debugEnabled()) { TRACER.debugInfo(this.getName() + " stopped " + errMessage); TRACER.debugInfo(getName() + " stopped: " + errMessage); } } } private void logException(Exception e) { if (debugEnabled()) TRACER.debugInfo( "In " + getName() + " " + stackTraceToSingleLineString(e)); } }