opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.common; @@ -48,8 +49,8 @@ */ public class ServerState implements Iterable<Integer> { private HashMap<Integer, ChangeNumber> list; private boolean saved = true; private final HashMap<Integer, ChangeNumber> list; private volatile boolean saved = true; /** * Creates a new empty ServerState. opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -191,8 +191,8 @@ */ private class ScanSearchListener implements InternalSearchListener { private ChangeNumber startingChangeNumber = null; private ChangeNumber endChangeNumber = null; private final ChangeNumber startingChangeNumber; private final ChangeNumber endChangeNumber; public ScanSearchListener( ChangeNumber startingChangeNumber, @@ -262,8 +262,8 @@ private final PersistentServerState state; private int numReplayedPostOpCalled = 0; private long generationId = -1; private boolean generationIdSavedStatus = false; private volatile long generationId = -1; private volatile boolean generationIdSavedStatus = false; private final ChangeNumberGenerator generator; @@ -289,15 +289,15 @@ private final DN baseDn; private boolean shutdown = false; private volatile boolean shutdown = false; private final InternalClientConnection conn = InternalClientConnection.getRootConnection(); private boolean solveConflictFlag = true; private boolean disabled = false; private boolean stateSavingDisabled = false; private volatile boolean disabled = false; private volatile boolean stateSavingDisabled = false; // This list is used to temporary store operations that needs // to be replayed at session establishment time. @@ -311,7 +311,7 @@ * Possible values are accept-updates or deny-updates, but other values * may be added in the future. */ private IsolationPolicy isolationpolicy; private IsolationPolicy isolationPolicy; /** * The DN of the configuration entry of this domain. @@ -323,7 +323,7 @@ * A boolean indicating if the thread used to save the persistentServerState * is terminated. */ private boolean done = true; private volatile boolean done = true; private ServerStateFlush flushThread; @@ -374,7 +374,7 @@ * fractional configuration (i.e with compliant fractional configuration in * domain root entry). */ private boolean force_bad_data_set = false; private boolean forceBadDataSet = false; /** * This flag is used by the fractional replication ldif import plugin to @@ -447,21 +447,24 @@ { done = false; while (shutdown == false) while (shutdown == false) { try { synchronized (this) { this.wait(1000); if (!disabled && !stateSavingDisabled ) if (!disabled && !stateSavingDisabled) { // save the ServerState state.save(); } } } catch (InterruptedException e) { } } catch (InterruptedException e) { // Thread interrupted: check for shutdown. } } state.save(); @@ -475,7 +478,7 @@ */ private class RSUpdater extends DirectoryThread { private ChangeNumber startChangeNumber; private final ChangeNumber startChangeNumber; protected RSUpdater(ChangeNumber replServerMaxChangeNumber) { super("Replication Server Updater for server id " + @@ -571,7 +574,7 @@ this.baseDn = configuration.getBaseDN(); int window = configuration.getWindowSize(); heartbeatInterval = configuration.getHeartbeatInterval(); this.isolationpolicy = configuration.getIsolationPolicy(); this.isolationPolicy = configuration.getIsolationPolicy(); this.configDn = configuration.dn(); this.logChangeNumber = configuration.isLogChangenumber(); this.updateToReplayQueue = updateToReplayQueue; @@ -2030,12 +2033,12 @@ */ private boolean brokerIsConnected(PreOperationOperation op) { if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) { // this policy imply that we always accept updates. return true; } if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) { // this isolation policy specifies that the updates are denied // when the broker had problems during the connection phase @@ -4429,7 +4432,7 @@ public ConfigChangeResult applyConfigurationChange( ReplicationDomainCfg configuration) { isolationpolicy = configuration.getIsolationPolicy(); isolationPolicy = configuration.getIsolationPolicy(); logChangeNumber = configuration.isLogChangenumber(); histPurgeDelayInMilliSec = configuration.getConflictsHistoricalPurgeDelay()*60*1000; @@ -4657,7 +4660,7 @@ { // Check domain fractional configuration consistency with local // configuration variables force_bad_data_set = !isBackendFractionalConfigConsistent(); forceBadDataSet = !isBackendFractionalConfigConsistent(); super.sessionInitiated( initStatus, replicationServerState,generationID, session); @@ -4687,7 +4690,7 @@ } // Now for bad data set status if needed if (force_bad_data_set) if (forceBadDataSet) { // Go into bad data set status setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT); @@ -4950,7 +4953,7 @@ { // Ignore message if fractional configuration is inconcsistent and // we have been passed into bad data set status if (force_bad_data_set) if (forceBadDataSet) { return false; } opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.plugin; import org.opends.server.replication.protocol.LDAPUpdateMsg; @@ -54,9 +55,9 @@ */ private static final DebugTracer TRACER = getTracer(); private BlockingQueue<UpdateToReplay> updateToReplayQueue = null; private boolean shutdown = false; private boolean done = false; private final BlockingQueue<UpdateToReplay> updateToReplayQueue; private volatile boolean shutdown = false; private volatile boolean done = false; private static int count = 0; /** opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -23,6 +23,7 @@ * * * Copyright 2007-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -54,25 +55,25 @@ /** * The session on which heartbeats are to be monitored. */ private ProtocolSession session; private final ProtocolSession session; /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. */ private long heartbeatInterval; private final long heartbeatInterval; /** * Set this to stop the thread. */ private boolean shutdown = false; private volatile boolean shutdown = false; /** * Send StopMsg before session closure or not. */ private boolean sendStopBeforeClose = false; private final boolean sendStopBeforeClose; /** @@ -131,7 +132,8 @@ try { session.publish(new StopMsg()); } catch(IOException ioe) } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,6 +23,7 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -50,25 +51,25 @@ /** * For test purposes only to simulate loss of heartbeats. */ static private boolean heartbeatsDisabled = false; private static volatile boolean heartbeatsDisabled = false; /** * The session on which heartbeats are to be sent. */ private ProtocolSession session; private final ProtocolSession session; /** * The time in milliseconds between heartbeats. */ private long heartbeatInterval; private final long heartbeatInterval; /** * Set this to stop the thread. */ private Boolean shutdown = false; private final Object shutdown_lock = new Object(); private volatile boolean shutdown = false; private final Object shutdownLock = new Object(); /** @@ -136,11 +137,11 @@ TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime); } synchronized (shutdown_lock) synchronized (shutdownLock) { if (!shutdown) { shutdown_lock.wait(sleepTime); shutdownLock.wait(sleepTime); } } } @@ -174,10 +175,10 @@ */ public void shutdown() { synchronized (shutdown_lock) synchronized (shutdownLock) { shutdown = true; shutdown_lock.notifyAll(); shutdownLock.notifyAll(); if (debugEnabled()) { TRACER.debugInfo("Going to notify Heartbeat thread."); opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -59,7 +60,7 @@ /** * The time the last message published to this session. */ private long lastPublishTime = 0; private volatile long lastPublishTime = 0; /** opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -62,7 +63,7 @@ /** * The time the last message published to this session. */ private long lastPublishTime = 0; private volatile long lastPublishTime = 0; /** opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,16 +23,14 @@ * * * Copyright 2007-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.*; import java.util.zip.DataFormatException; import org.opends.server.replication.common.AssuredMode; @@ -60,9 +58,9 @@ public class TopologyMsg extends ReplicationMsg { // Information for the DS known in the topology private List<DSInfo> dsList = new ArrayList<DSInfo>(); private final List<DSInfo> dsList; // Information for the RS known in the topology private List<RSInfo> rsList = new ArrayList<RSInfo>(); private final List<RSInfo> rsList; /** * Creates a new changelogInfo message from its encoded form. @@ -74,7 +72,168 @@ */ public TopologyMsg(byte[] in, short version) throws DataFormatException { decode(in, version); try { /* First byte is the type */ if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY) { throw new DataFormatException( "Input is not a valid " + this.getClass().getCanonicalName()); } int pos = 1; /* Read number of following DS info entries */ byte nDsInfo = in[pos++]; /* Read the DS info entries */ List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo)); while ( (nDsInfo > 0) && (pos < in.length) ) { /* Read DS id */ int length = getNextLength(in, pos); String serverIdString = new String(in, pos, length, "UTF-8"); int dsId = Integer.valueOf(serverIdString); pos += length + 1; /* Read RS id */ length = getNextLength(in, pos); serverIdString = new String(in, pos, length, "UTF-8"); int rsId = Integer.valueOf(serverIdString); pos += length + 1; /* Read the generation id */ length = getNextLength(in, pos); long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; /* Read DS status */ ServerStatus status = ServerStatus.valueOf(in[pos++]); /* Read DS assured flag */ boolean assuredFlag; if (in[pos++] == 1) { assuredFlag = true; } else { assuredFlag = false; } /* Read DS assured mode */ AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]); /* Read DS safe data level */ byte safeDataLevel = in[pos++]; /* Read DS group id */ byte groupId = in[pos++]; /* Read number of referrals URLs */ List<String> refUrls = new ArrayList<String>(); byte nUrls = in[pos++]; byte nRead = 0; /* Read urls until expected number read */ while ((nRead != nUrls) && (pos < in.length) //security ) { length = getNextLength(in, pos); String url = new String(in, pos, length, "UTF-8"); refUrls.add(url); pos += length + 1; nRead++; } Set<String> attrs = new HashSet<String>(); short protocolVersion = -1; if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4) { byte nAttrs = in[pos++]; nRead = 0; /* Read attrs until expected number read */ while ((nRead != nAttrs) && (pos < in.length) //security ) { length = getNextLength(in, pos); String attr = new String(in, pos, length, "UTF-8"); attrs.add(attr); pos += length + 1; nRead++; } /* Read Protocol version */ protocolVersion = Short.valueOf(in[pos++]); } /* Now create DSInfo and store it in list */ DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs, protocolVersion); dsList.add(dsInfo); nDsInfo--; } /* Read number of following RS info entries */ byte nRsInfo = in[pos++]; /* Read the RS info entries */ List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo)); while ( (nRsInfo > 0) && (pos < in.length) ) { /* Read RS id */ int length = getNextLength(in, pos); String serverIdString = new String(in, pos, length, "UTF-8"); int id = Integer.valueOf(serverIdString); pos += length + 1; /* Read the generation id */ length = getNextLength(in, pos); long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; /* Read RS group id */ byte groupId = in[pos++]; int weight = 1; String serverUrl = null; if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { length = getNextLength(in, pos); serverUrl = new String(in, pos, length, "UTF-8"); pos += length + 1; /* Read RS weight */ length = getNextLength(in, pos); weight = Integer.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; } /* Now create RSInfo and store it in list */ RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId, weight); rsList.add(rsInfo); nRsInfo--; } this.dsList = Collections.unmodifiableList(dsList); this.rsList = Collections.unmodifiableList(rsList); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** @@ -85,10 +244,23 @@ */ public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList) { if (dsList != null) // null means no info, let empty list from init time this.dsList = dsList; if (rsList != null) // null means no info, let empty list from init time this.rsList = rsList; if (dsList == null || dsList.isEmpty()) { this.dsList = Collections.emptyList(); } else { this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList)); } if (rsList == null || rsList.isEmpty()) { this.rsList = Collections.emptyList(); } else { this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList)); } } // ============ @@ -219,172 +391,7 @@ } // ============ // Msg decoding // ============ private void decode(byte[] in, short version) throws DataFormatException { try { /* First byte is the type */ if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY) { throw new DataFormatException( "Input is not a valid " + this.getClass().getCanonicalName()); } int pos = 1; /* Read number of following DS info entries */ byte nDsInfo = in[pos++]; /* Read the DS info entries */ while ( (nDsInfo > 0) && (pos < in.length) ) { /* Read DS id */ int length = getNextLength(in, pos); String serverIdString = new String(in, pos, length, "UTF-8"); int dsId = Integer.valueOf(serverIdString); pos += length + 1; /* Read RS id */ length = getNextLength(in, pos); serverIdString = new String(in, pos, length, "UTF-8"); int rsId = Integer.valueOf(serverIdString); pos += length + 1; /* Read the generation id */ length = getNextLength(in, pos); long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; /* Read DS status */ ServerStatus status = ServerStatus.valueOf(in[pos++]); /* Read DS assured flag */ boolean assuredFlag; if (in[pos++] == 1) { assuredFlag = true; } else { assuredFlag = false; } /* Read DS assured mode */ AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]); /* Read DS safe data level */ byte safeDataLevel = in[pos++]; /* Read DS group id */ byte groupId = in[pos++]; /* Read number of referrals URLs */ List<String> refUrls = new ArrayList<String>(); byte nUrls = in[pos++]; byte nRead = 0; /* Read urls until expected number read */ while ((nRead != nUrls) && (pos < in.length) //security ) { length = getNextLength(in, pos); String url = new String(in, pos, length, "UTF-8"); refUrls.add(url); pos += length + 1; nRead++; } Set<String> attrs = new HashSet<String>(); short protocolVersion = -1; if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4) { byte nAttrs = in[pos++]; nRead = 0; /* Read attrs until expected number read */ while ((nRead != nAttrs) && (pos < in.length) //security ) { length = getNextLength(in, pos); String attr = new String(in, pos, length, "UTF-8"); attrs.add(attr); pos += length + 1; nRead++; } /* Read Protocol version */ protocolVersion = Short.valueOf(in[pos++]); } /* Now create DSInfo and store it in list */ DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs, protocolVersion); dsList.add(dsInfo); nDsInfo--; } /* Read number of following RS info entries */ byte nRsInfo = in[pos++]; /* Read the RS info entries */ while ( (nRsInfo > 0) && (pos < in.length) ) { /* Read RS id */ int length = getNextLength(in, pos); String serverIdString = new String(in, pos, length, "UTF-8"); int id = Integer.valueOf(serverIdString); pos += length + 1; /* Read the generation id */ length = getNextLength(in, pos); long generationId = Long.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; /* Read RS group id */ byte groupId = in[pos++]; int weight = 1; String serverUrl = null; if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { length = getNextLength(in, pos); serverUrl = new String(in, pos, length, "UTF-8"); pos += length + 1; /* Read RS weight */ length = getNextLength(in, pos); weight = Integer.valueOf(new String(in, pos, length, "UTF-8")); pos += length + 1; } /* Now create RSInfo and store it in list */ RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId, weight); rsList.add(rsInfo); nRsInfo--; } } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * {@inheritDoc} opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -23,6 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -52,22 +53,23 @@ public class MonitoringPublisher extends DirectoryThread { private boolean shutdown = false; private volatile boolean shutdown = false; /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); // The domain we send monitoring for private ReplicationServerDomain replicationServerDomain; private final ReplicationServerDomain replicationServerDomain; // Sleep time (in ms) before sending new monitoring messages. private long period = 3000; private volatile long period; // Is the thread terminated ? private boolean done = false; private volatile boolean done = false; private final Object sleeper = new Object(); private final Object shutdownLock = new Object(); /** * Create a monitoring publisher. @@ -104,9 +106,12 @@ { try { synchronized (sleeper) synchronized (shutdownLock) { sleeper.wait(period); if (!shutdown) { shutdownLock.wait(period); } } } catch (InterruptedException ex) { @@ -157,16 +162,17 @@ */ public void shutdown() { if (debugEnabled()) synchronized (shutdownLock) { TRACER.debugInfo("Shutting down monitoring publisher for dn " + replicationServerDomain.getBaseDn().toString() + " in RS " + replicationServerDomain.getReplicationServer().getServerId()); } shutdown = true; synchronized (sleeper) { sleeper.notify(); shutdown = true; shutdownLock.notifyAll(); if (debugEnabled()) { TRACER.debugInfo("Shutting down monitoring publisher for dn " + replicationServerDomain.getBaseDn().toString() + " in RS " + replicationServerDomain.getReplicationServer().getServerId()); } } } opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.MessageBuilder; opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.*; @@ -255,13 +256,10 @@ status = cursor.getNext(key, data, LockMode.DEFAULT); } cursor.close(); } catch (DatabaseException dbe) finally { cursor.close(); throw dbe; } } opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
@@ -23,6 +23,7 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -38,7 +39,7 @@ /** * The Replication Server that created this thread. */ private ReplicationServer server; private final ReplicationServer server; /** * Creates a new instance of this directory thread with the opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
@@ -23,6 +23,7 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -38,7 +39,7 @@ /** * The Replication Server that created this thread. */ private ReplicationServer server; private final ReplicationServer server; /** * Creates a new instance of this directory thread with the opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -58,8 +59,8 @@ * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private ProtocolSession session; private ServerHandler handler; private final ProtocolSession session; private final ServerHandler handler; /** * Constructor for the LDAP server reader part of the replicationServer. opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.Message; @@ -57,10 +58,10 @@ */ private static final DebugTracer TRACER = getTracer(); private ProtocolSession session; private ServerHandler handler; private ReplicationServerDomain replicationServerDomain; private short protocolVersion = -1; private final ProtocolSession session; private final ServerHandler handler; private final ReplicationServerDomain replicationServerDomain; private final short protocolVersion; /** * Create a ServerWriter. opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,6 +23,7 @@ * * * Copyright 2008-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -47,21 +48,22 @@ public class StatusAnalyzer extends DirectoryThread { private boolean finished = false; private volatile boolean shutdown = false; /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private ReplicationServerDomain replicationServerDomain; private int degradedStatusThreshold = -1; private final ReplicationServerDomain replicationServerDomain; private volatile int degradedStatusThreshold = -1; // Sleep time for the thread, in ms. private int STATUS_ANALYZER_SLEEP_TIME = 5000; private static final int STATUS_ANALYZER_SLEEP_TIME = 5000; private boolean done = false; private volatile boolean done = false; private Object sleeper = new Object(); private final Object shutdownLock = new Object(); /** * Create a StatusAnalyzer. @@ -95,13 +97,16 @@ } boolean interrupted = false; while (!finished && !interrupted) while (!shutdown && !interrupted) { try { synchronized (sleeper) synchronized (shutdownLock) { sleeper.wait(STATUS_ANALYZER_SLEEP_TIME); if (!shutdown) { shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME); } } } catch (InterruptedException ex) { @@ -192,16 +197,17 @@ */ public void shutdown() { if (debugEnabled()) synchronized (shutdownLock) { TRACER.debugInfo("Shutting down status analyzer for dn " + replicationServerDomain.getBaseDn().toString() + " in RS " + replicationServerDomain.getReplicationServer().getServerId()); } finished = true; synchronized (sleeper) { sleeper.notify(); shutdown = true; shutdownLock.notifyAll(); if (debugEnabled()) { TRACER.debugInfo("Shutting down status analyzer for dn " + replicationServerDomain.getBaseDn().toString() + " in RS " + replicationServerDomain.getReplicationServer().getServerId()); } } } opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,6 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.service; @@ -51,26 +52,21 @@ private static final DebugTracer TRACER = getTracer(); /** * For test purposes only to simulate loss of heartbeats. */ static private boolean heartbeatsDisabled = false; /** * The session on which heartbeats are to be sent. */ private ProtocolSession session; private final ProtocolSession session; /** * The time in milliseconds between heartbeats. */ private long heartbeatInterval; private int serverId; private final long heartbeatInterval; private final int serverId; /** * Set this to stop the thread. */ private Boolean shutdown = false; private final Object shutdown_lock = new Object(); private volatile boolean shutdown = false; private final Object shutdownLock = new Object(); /** * Create a heartbeat thread. @@ -112,10 +108,7 @@ if (now > session.getLastPublishTime() + heartbeatInterval) { if (!heartbeatsDisabled) { session.publish(ctHeartbeatMsg); } session.publish(ctHeartbeatMsg); } try @@ -127,11 +120,11 @@ sleepTime = heartbeatInterval; } synchronized (shutdown_lock) synchronized (shutdownLock) { if (!shutdown) { shutdown_lock.wait(sleepTime); shutdownLock.wait(sleepTime); } } } @@ -166,20 +159,10 @@ */ public void shutdown() { synchronized (shutdown_lock) synchronized (shutdownLock) { shutdown = true; shutdown_lock.notifyAll(); shutdownLock.notifyAll(); } } /** * For testing purposes only to simulate loss of heartbeats. * @param heartbeatsDisabled Set true to prevent heartbeats from being sent. */ public static void setHeartbeatsDisabled(boolean heartbeatsDisabled) { CTHeartbeatPublisherThread.heartbeatsDisabled = heartbeatsDisabled; } } opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.service; import org.opends.messages.Message; @@ -48,9 +49,9 @@ */ private static final DebugTracer TRACER = getTracer(); private ReplicationDomain repDomain; private boolean shutdown = false; private boolean done = false; private final ReplicationDomain repDomain; private volatile boolean shutdown = false; private volatile boolean done = false; /** @@ -95,11 +96,17 @@ while ((!shutdown) && ((updateMsg = repDomain.receive()) != null)) { if (repDomain.processUpdate(updateMsg) == true) { repDomain.processUpdateDoneSynchronous(updateMsg); } } if (updateMsg == null) { shutdown = true; } catch (Exception e) } } catch (Exception e) { /* * catch all exceptions happening in repDomain.receive so that the @@ -119,6 +126,8 @@ } } /** * Wait for the completion of this thread. */ @@ -134,12 +143,13 @@ n++; if (n >= FACTOR) { TRACER.debugInfo("Interrupting listener thread for dn " + repDomain.getServiceID() + " in DS " + repDomain.getServerId()); TRACER.debugInfo("Interrupting listener thread for dn " + repDomain.getServiceID() + " in DS " + repDomain.getServerId()); this.interrupt(); } } } catch (InterruptedException e) } catch (InterruptedException e) { // exit the loop if this thread is interrupted. } opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -97,11 +98,11 @@ * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private boolean shutdown = false; private Collection<String> servers; private boolean connected = false; private String replicationServer = "Not connected"; private ProtocolSession session = null; private volatile boolean shutdown = false; private volatile Collection<String> servers; private volatile boolean connected = false; private volatile String replicationServer = "Not connected"; private volatile ProtocolSession session = null; private final ServerState state; private final String baseDn; private final int serverId; @@ -156,7 +157,7 @@ * and to know that it is necessary to print a new message when the broker * finally succeed to connect. */ private boolean connectionError = false; private volatile boolean connectionError = false; private final Object connectPhaseLock = new Object(); /** * The thread that publishes messages to the RS containing the current @@ -173,18 +174,20 @@ */ // Info for other DSs. // Warning: does not contain info for us (for our server id) private List<DSInfo> dsList = new ArrayList<DSInfo>(); private long generationID; private int updateDoneCount = 0; private boolean connectRequiresRecovery = false; private volatile List<DSInfo> dsList = new ArrayList<DSInfo>(); private volatile long generationID; private volatile int updateDoneCount = 0; private volatile boolean connectRequiresRecovery = false; /** * The map of replication server info initialized at connection time and * regularly updated. This is used to decide to which best suitable * replication server one wants to connect. * Key: replication server id * Value: replication server info for the matching replication server id * replication server one wants to connect. Key: replication server id Value: * replication server info for the matching replication server id */ private Map<Integer, ReplicationServerInfo> replicationServerInfos = null; private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos = null; /** * This integer defines when the best replication server checking algorithm * should be engaged. @@ -769,7 +772,7 @@ { Map<Integer, ReplicationServerInfo> rsInfos = new HashMap<Integer, ReplicationServerInfo>(); new ConcurrentHashMap<Integer, ReplicationServerInfo>(); for (String server : servers) { @@ -2535,8 +2538,6 @@ * called in a single thread or protected by a locking mechanism * before being called. * * @throws SocketTimeoutException if the timeout set by setSoTimeout * has expired * @param reconnectToTheBestRS Whether broker will automatically switch * to the best suitable RS. * @param reconnectOnFailure Whether broker will automatically reconnect opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,6 +23,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS */ package org.opends.server.replication.service; @@ -124,8 +125,8 @@ * must use the {@link #publish(UpdateMsg)} method. * <p> * If the Full Initialization process is needed then implementation * for {@link #importBackend(InputStream)} and * {@link #exportBackend(OutputStream)} must be * for {@code importBackend(InputStream)} and * {@code exportBackend(OutputStream)} must be * provided. * <p> * Full Initialization of a replica can be triggered by LDAP clients @@ -1063,19 +1064,27 @@ private final int serverToInitialize; private final int initWindow; /** * Constructor for the ExportThread. * * @param serverToInitialize serverId of server that will receive entries * @param serverToInitialize * serverId of server that will receive entries * @param initWindow * The value of the initialization window for flow control between * the importer and the exporter. */ public ExportThread(int serverToInitialize, int initWindow) { super("Export thread from serverId=" + serverID + " to serverId=" + serverToInitialize); super("Export thread from serverId=" + serverID + " to serverId=" + serverToInitialize); this.serverToInitialize = serverToInitialize; this.initWindow = initWindow; } /** * Run method for this class. */ @@ -1342,11 +1351,8 @@ * @return The source as a integer value * @throws DirectoryException if the string is not valid */ public int decodeTarget(String targetString) throws DirectoryException public int decodeTarget(String targetString) throws DirectoryException { int target = 0; Throwable cause; if (targetString.equalsIgnoreCase("all")) { return RoutableMsg.ALL_SERVERS; @@ -1355,34 +1361,26 @@ // So should be a serverID try { target = Integer.decode(targetString); int target = Integer.decode(targetString); if (target >= 0) { // FIXME Could we check now that it is a know server in the domain ? } return target; } catch(Exception e) catch (Exception e) { cause = e; ResultCode resultCode = ResultCode.OTHER; Message message = ERR_INVALID_EXPORT_TARGET.get(); throw new DirectoryException(resultCode, message, e); } ResultCode resultCode = ResultCode.OTHER; Message message = ERR_INVALID_EXPORT_TARGET.get(); if (cause != null) throw new DirectoryException( resultCode, message, cause); else throw new DirectoryException( resultCode, message); } /** * Initializes a remote server from this server. * <p> * The {@link #exportBackend(OutputStream)} will therefore be called * on this server, and the {@link #importBackend(InputStream)} * The {@code exportBackend(OutputStream)} will therefore be called * on this server, and the {@code importBackend(InputStream)} * will be called on the remote server. * <p> * The InputStream and OutpuStream given as a parameter to those @@ -2138,8 +2136,8 @@ * When this method is called, a request for initialization will * be sent to the source server asking for initialization. * <p> * The {@link #exportBackend(OutputStream)} will therefore be called * on the source server, and the {@link #importBackend(InputStream)} * The {@code exportBackend(OutputStream)} will therefore be called * on the source server, and the {@code importBackend(InputStream)} * will be called on his server. * <p> * The InputStream and OutpuStream given as a parameter to those @@ -2161,8 +2159,8 @@ /** * Initializes a remote server from this server. * <p> * The {@link #exportBackend(OutputStream)} will therefore be called * on this server, and the {@link #importBackend(InputStream)} * The {@code exportBackend(OutputStream)} will therefore be called * on this server, and the {@code importBackend(InputStream)} * will be called on the remote server. * <p> * The InputStream and OutpuStream given as a parameter to those