From a5131f44a6afa554af8f4c82c7ffd3d4ceac1bd4 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 04 Feb 2011 12:50:58 +0000
Subject: [PATCH] OPEN - issue OPENDJ-26: Fix OpenDS issue 4585: ConcurrentModificationException in ReplicationBroker https://bugster.forgerock.org/jira/browse/OPENDJ-26
---
opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 3
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java | 38 +-
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java | 42 ++-
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 359 +++++++++++++++--------------
opends/src/server/org/opends/server/replication/service/ListenerThread.java | 24 +
opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java | 12
opends/src/server/org/opends/server/replication/server/ServerReader.java | 5
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 54 ++--
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 9
opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java | 3
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java | 19
opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java | 3
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 3
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 47 ++-
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 35 +-
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 1
opends/src/server/org/opends/server/replication/common/ServerState.java | 5
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 6
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java | 7
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java | 39 --
20 files changed, 369 insertions(+), 345 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 16861da..e8fea1c 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -48,8 +49,8 @@
*/
public class ServerState implements Iterable<Integer>
{
- private HashMap<Integer, ChangeNumber> list;
- private boolean saved = true;
+ private final HashMap<Integer, ChangeNumber> list;
+ private volatile boolean saved = true;
/**
* Creates a new empty ServerState.
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 29e756b..27d3451 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -191,8 +191,8 @@
*/
private class ScanSearchListener implements InternalSearchListener
{
- private ChangeNumber startingChangeNumber = null;
- private ChangeNumber endChangeNumber = null;
+ private final ChangeNumber startingChangeNumber;
+ private final ChangeNumber endChangeNumber;
public ScanSearchListener(
ChangeNumber startingChangeNumber,
@@ -262,8 +262,8 @@
private final PersistentServerState state;
private int numReplayedPostOpCalled = 0;
- private long generationId = -1;
- private boolean generationIdSavedStatus = false;
+ private volatile long generationId = -1;
+ private volatile boolean generationIdSavedStatus = false;
private final ChangeNumberGenerator generator;
@@ -289,15 +289,15 @@
private final DN baseDn;
- private boolean shutdown = false;
+ private volatile boolean shutdown = false;
private final InternalClientConnection conn =
InternalClientConnection.getRootConnection();
private boolean solveConflictFlag = true;
- private boolean disabled = false;
- private boolean stateSavingDisabled = false;
+ private volatile boolean disabled = false;
+ private volatile boolean stateSavingDisabled = false;
// This list is used to temporary store operations that needs
// to be replayed at session establishment time.
@@ -311,7 +311,7 @@
* Possible values are accept-updates or deny-updates, but other values
* may be added in the future.
*/
- private IsolationPolicy isolationpolicy;
+ private IsolationPolicy isolationPolicy;
/**
* The DN of the configuration entry of this domain.
@@ -323,7 +323,7 @@
* A boolean indicating if the thread used to save the persistentServerState
* is terminated.
*/
- private boolean done = true;
+ private volatile boolean done = true;
private ServerStateFlush flushThread;
@@ -374,7 +374,7 @@
* fractional configuration (i.e with compliant fractional configuration in
* domain root entry).
*/
- private boolean force_bad_data_set = false;
+ private boolean forceBadDataSet = false;
/**
* This flag is used by the fractional replication ldif import plugin to
@@ -447,21 +447,24 @@
{
done = false;
- while (shutdown == false)
+ while (shutdown == false)
{
try
{
synchronized (this)
{
this.wait(1000);
- if (!disabled && !stateSavingDisabled )
+ if (!disabled && !stateSavingDisabled)
{
// save the ServerState
state.save();
}
}
- } catch (InterruptedException e)
- { }
+ }
+ catch (InterruptedException e)
+ {
+ // Thread interrupted: check for shutdown.
+ }
}
state.save();
@@ -475,7 +478,7 @@
*/
private class RSUpdater extends DirectoryThread
{
- private ChangeNumber startChangeNumber;
+ private final ChangeNumber startChangeNumber;
protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
{
super("Replication Server Updater for server id " +
@@ -571,7 +574,7 @@
this.baseDn = configuration.getBaseDN();
int window = configuration.getWindowSize();
heartbeatInterval = configuration.getHeartbeatInterval();
- this.isolationpolicy = configuration.getIsolationPolicy();
+ this.isolationPolicy = configuration.getIsolationPolicy();
this.configDn = configuration.dn();
this.logChangeNumber = configuration.isLogChangenumber();
this.updateToReplayQueue = updateToReplayQueue;
@@ -2030,12 +2033,12 @@
*/
private boolean brokerIsConnected(PreOperationOperation op)
{
- if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
+ if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
{
// this policy imply that we always accept updates.
return true;
}
- if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
+ if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
{
// this isolation policy specifies that the updates are denied
// when the broker had problems during the connection phase
@@ -4429,7 +4432,7 @@
public ConfigChangeResult applyConfigurationChange(
ReplicationDomainCfg configuration)
{
- isolationpolicy = configuration.getIsolationPolicy();
+ isolationPolicy = configuration.getIsolationPolicy();
logChangeNumber = configuration.isLogChangenumber();
histPurgeDelayInMilliSec =
configuration.getConflictsHistoricalPurgeDelay()*60*1000;
@@ -4657,7 +4660,7 @@
{
// Check domain fractional configuration consistency with local
// configuration variables
- force_bad_data_set = !isBackendFractionalConfigConsistent();
+ forceBadDataSet = !isBackendFractionalConfigConsistent();
super.sessionInitiated(
initStatus, replicationServerState,generationID, session);
@@ -4687,7 +4690,7 @@
}
// Now for bad data set status if needed
- if (force_bad_data_set)
+ if (forceBadDataSet)
{
// Go into bad data set status
setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
@@ -4950,7 +4953,7 @@
{
// Ignore message if fractional configuration is inconcsistent and
// we have been passed into bad data set status
- if (force_bad_data_set)
+ if (forceBadDataSet)
{
return false;
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index 1899935..c964423 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.plugin;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
@@ -54,9 +55,9 @@
*/
private static final DebugTracer TRACER = getTracer();
- private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
- private boolean shutdown = false;
- private boolean done = false;
+ private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
+ private volatile boolean shutdown = false;
+ private volatile boolean done = false;
private static int count = 0;
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
index 518435b..5a0688b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2007-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -54,25 +55,25 @@
/**
* The session on which heartbeats are to be monitored.
*/
- private ProtocolSession session;
+ private final ProtocolSession session;
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
- private long heartbeatInterval;
+ private final long heartbeatInterval;
/**
* Set this to stop the thread.
*/
- private boolean shutdown = false;
+ private volatile boolean shutdown = false;
/**
* Send StopMsg before session closure or not.
*/
- private boolean sendStopBeforeClose = false;
+ private final boolean sendStopBeforeClose;
/**
@@ -131,7 +132,8 @@
try
{
session.publish(new StopMsg());
- } catch(IOException ioe)
+ }
+ catch (IOException ioe)
{
// Anyway, going to close session, so nothing to do
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 8b97568..778a5a4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -50,25 +51,25 @@
/**
* For test purposes only to simulate loss of heartbeats.
*/
- static private boolean heartbeatsDisabled = false;
+ private static volatile boolean heartbeatsDisabled = false;
/**
* The session on which heartbeats are to be sent.
*/
- private ProtocolSession session;
+ private final ProtocolSession session;
/**
* The time in milliseconds between heartbeats.
*/
- private long heartbeatInterval;
+ private final long heartbeatInterval;
/**
* Set this to stop the thread.
*/
- private Boolean shutdown = false;
- private final Object shutdown_lock = new Object();
+ private volatile boolean shutdown = false;
+ private final Object shutdownLock = new Object();
/**
@@ -136,11 +137,11 @@
TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
}
- synchronized (shutdown_lock)
+ synchronized (shutdownLock)
{
if (!shutdown)
{
- shutdown_lock.wait(sleepTime);
+ shutdownLock.wait(sleepTime);
}
}
}
@@ -174,10 +175,10 @@
*/
public void shutdown()
{
- synchronized (shutdown_lock)
+ synchronized (shutdownLock)
{
shutdown = true;
- shutdown_lock.notifyAll();
+ shutdownLock.notifyAll();
if (debugEnabled())
{
TRACER.debugInfo("Going to notify Heartbeat thread.");
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 18cf6d3..9e2a936 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -59,7 +60,7 @@
/**
* The time the last message published to this session.
*/
- private long lastPublishTime = 0;
+ private volatile long lastPublishTime = 0;
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index ff6ea07..adf16b2 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -62,7 +63,7 @@
/**
* The time the last message published to this session.
*/
- private long lastPublishTime = 0;
+ private volatile long lastPublishTime = 0;
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index c26fd77..2fdf43b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,16 +23,14 @@
*
*
* Copyright 2007-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
@@ -60,9 +58,9 @@
public class TopologyMsg extends ReplicationMsg
{
// Information for the DS known in the topology
- private List<DSInfo> dsList = new ArrayList<DSInfo>();
+ private final List<DSInfo> dsList;
// Information for the RS known in the topology
- private List<RSInfo> rsList = new ArrayList<RSInfo>();
+ private final List<RSInfo> rsList;
/**
* Creates a new changelogInfo message from its encoded form.
@@ -74,7 +72,168 @@
*/
public TopologyMsg(byte[] in, short version) throws DataFormatException
{
- decode(in, version);
+ try
+ {
+ /* First byte is the type */
+ if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
+ {
+ throw new DataFormatException(
+ "Input is not a valid " + this.getClass().getCanonicalName());
+ }
+
+ int pos = 1;
+
+ /* Read number of following DS info entries */
+
+ byte nDsInfo = in[pos++];
+
+ /* Read the DS info entries */
+ List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
+ while ( (nDsInfo > 0) && (pos < in.length) )
+ {
+ /* Read DS id */
+ int length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ int dsId = Integer.valueOf(serverIdString);
+ pos += length + 1;
+
+ /* Read RS id */
+ length =
+ getNextLength(in, pos);
+ serverIdString =
+ new String(in, pos, length, "UTF-8");
+ int rsId = Integer.valueOf(serverIdString);
+ pos += length + 1;
+
+ /* Read the generation id */
+ length = getNextLength(in, pos);
+ long generationId =
+ Long.valueOf(new String(in, pos, length,
+ "UTF-8"));
+ pos += length + 1;
+
+ /* Read DS status */
+ ServerStatus status = ServerStatus.valueOf(in[pos++]);
+
+ /* Read DS assured flag */
+ boolean assuredFlag;
+ if (in[pos++] == 1)
+ {
+ assuredFlag = true;
+ } else
+ {
+ assuredFlag = false;
+ }
+
+ /* Read DS assured mode */
+ AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
+
+ /* Read DS safe data level */
+ byte safeDataLevel = in[pos++];
+
+ /* Read DS group id */
+ byte groupId = in[pos++];
+
+ /* Read number of referrals URLs */
+ List<String> refUrls = new ArrayList<String>();
+ byte nUrls = in[pos++];
+ byte nRead = 0;
+ /* Read urls until expected number read */
+ while ((nRead != nUrls) &&
+ (pos < in.length) //security
+ )
+ {
+ length = getNextLength(in, pos);
+ String url = new String(in, pos, length, "UTF-8");
+ refUrls.add(url);
+ pos += length + 1;
+ nRead++;
+ }
+
+ Set<String> attrs = new HashSet<String>();
+ short protocolVersion = -1;
+ if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ byte nAttrs = in[pos++];
+ nRead = 0;
+ /* Read attrs until expected number read */
+ while ((nRead != nAttrs) &&
+ (pos < in.length) //security
+ )
+ {
+ length = getNextLength(in, pos);
+ String attr = new String(in, pos, length, "UTF-8");
+ attrs.add(attr);
+ pos += length + 1;
+ nRead++;
+ }
+ /* Read Protocol version */
+ protocolVersion = Short.valueOf(in[pos++]);
+ }
+
+ /* Now create DSInfo and store it in list */
+
+ DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
+ assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
+ protocolVersion);
+ dsList.add(dsInfo);
+
+ nDsInfo--;
+ }
+
+ /* Read number of following RS info entries */
+
+ byte nRsInfo = in[pos++];
+
+ /* Read the RS info entries */
+ List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+ while ( (nRsInfo > 0) && (pos < in.length) )
+ {
+ /* Read RS id */
+ int length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ int id = Integer.valueOf(serverIdString);
+ pos += length + 1;
+
+ /* Read the generation id */
+ length = getNextLength(in, pos);
+ long generationId =
+ Long.valueOf(new String(in, pos, length,
+ "UTF-8"));
+ pos += length + 1;
+
+ /* Read RS group id */
+ byte groupId = in[pos++];
+
+ int weight = 1;
+ String serverUrl = null;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ length = getNextLength(in, pos);
+ serverUrl = new String(in, pos, length, "UTF-8");
+ pos += length + 1;
+
+ /* Read RS weight */
+ length = getNextLength(in, pos);
+ weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
+ }
+
+ /* Now create RSInfo and store it in list */
+
+ RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
+ weight);
+ rsList.add(rsInfo);
+
+ nRsInfo--;
+ }
+
+ this.dsList = Collections.unmodifiableList(dsList);
+ this.rsList = Collections.unmodifiableList(rsList);
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
}
/**
@@ -85,10 +244,23 @@
*/
public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
{
- if (dsList != null) // null means no info, let empty list from init time
- this.dsList = dsList;
- if (rsList != null) // null means no info, let empty list from init time
- this.rsList = rsList;
+ if (dsList == null || dsList.isEmpty())
+ {
+ this.dsList = Collections.emptyList();
+ }
+ else
+ {
+ this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+ }
+
+ if (rsList == null || rsList.isEmpty())
+ {
+ this.rsList = Collections.emptyList();
+ }
+ else
+ {
+ this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+ }
}
// ============
@@ -219,172 +391,7 @@
}
- // ============
- // Msg decoding
- // ============
- private void decode(byte[] in, short version)
- throws DataFormatException
- {
- try
- {
- /* First byte is the type */
- if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
- {
- throw new DataFormatException(
- "Input is not a valid " + this.getClass().getCanonicalName());
- }
-
- int pos = 1;
-
- /* Read number of following DS info entries */
-
- byte nDsInfo = in[pos++];
-
- /* Read the DS info entries */
- while ( (nDsInfo > 0) && (pos < in.length) )
- {
- /* Read DS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int dsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read RS id */
- length =
- getNextLength(in, pos);
- serverIdString =
- new String(in, pos, length, "UTF-8");
- int rsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
- pos += length + 1;
-
- /* Read DS status */
- ServerStatus status = ServerStatus.valueOf(in[pos++]);
-
- /* Read DS assured flag */
- boolean assuredFlag;
- if (in[pos++] == 1)
- {
- assuredFlag = true;
- } else
- {
- assuredFlag = false;
- }
-
- /* Read DS assured mode */
- AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
-
- /* Read DS safe data level */
- byte safeDataLevel = in[pos++];
-
- /* Read DS group id */
- byte groupId = in[pos++];
-
- /* Read number of referrals URLs */
- List<String> refUrls = new ArrayList<String>();
- byte nUrls = in[pos++];
- byte nRead = 0;
- /* Read urls until expected number read */
- while ((nRead != nUrls) &&
- (pos < in.length) //security
- )
- {
- length = getNextLength(in, pos);
- String url = new String(in, pos, length, "UTF-8");
- refUrls.add(url);
- pos += length + 1;
- nRead++;
- }
-
- Set<String> attrs = new HashSet<String>();
- short protocolVersion = -1;
- if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- byte nAttrs = in[pos++];
- nRead = 0;
- /* Read attrs until expected number read */
- while ((nRead != nAttrs) &&
- (pos < in.length) //security
- )
- {
- length = getNextLength(in, pos);
- String attr = new String(in, pos, length, "UTF-8");
- attrs.add(attr);
- pos += length + 1;
- nRead++;
- }
- /* Read Protocol version */
- protocolVersion = Short.valueOf(in[pos++]);
- }
-
- /* Now create DSInfo and store it in list */
-
- DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
- assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
- protocolVersion);
- dsList.add(dsInfo);
-
- nDsInfo--;
- }
-
- /* Read number of following RS info entries */
-
- byte nRsInfo = in[pos++];
-
- /* Read the RS info entries */
- while ( (nRsInfo > 0) && (pos < in.length) )
- {
- /* Read RS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int id = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
- pos += length + 1;
-
- /* Read RS group id */
- byte groupId = in[pos++];
-
- int weight = 1;
- String serverUrl = null;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- length = getNextLength(in, pos);
- serverUrl = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- /* Read RS weight */
- length = getNextLength(in, pos);
- weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- }
-
- /* Now create RSInfo and store it in list */
-
- RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
- weight);
- rsList.add(rsInfo);
-
- nRsInfo--;
- }
-
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- }
/**
* {@inheritDoc}
diff --git a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index d74a43c..862ec67 100644
--- a/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -52,22 +53,23 @@
public class MonitoringPublisher extends DirectoryThread
{
- private boolean shutdown = false;
+ private volatile boolean shutdown = false;
+
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
// The domain we send monitoring for
- private ReplicationServerDomain replicationServerDomain;
+ private final ReplicationServerDomain replicationServerDomain;
// Sleep time (in ms) before sending new monitoring messages.
- private long period = 3000;
+ private volatile long period;
// Is the thread terminated ?
- private boolean done = false;
+ private volatile boolean done = false;
- private final Object sleeper = new Object();
+ private final Object shutdownLock = new Object();
/**
* Create a monitoring publisher.
@@ -104,9 +106,12 @@
{
try
{
- synchronized (sleeper)
+ synchronized (shutdownLock)
{
- sleeper.wait(period);
+ if (!shutdown)
+ {
+ shutdownLock.wait(period);
+ }
}
} catch (InterruptedException ex)
{
@@ -157,16 +162,17 @@
*/
public void shutdown()
{
- if (debugEnabled())
+ synchronized (shutdownLock)
{
- TRACER.debugInfo("Shutting down monitoring publisher for dn " +
- replicationServerDomain.getBaseDn().toString() + " in RS " +
- replicationServerDomain.getReplicationServer().getServerId());
- }
- shutdown = true;
- synchronized (sleeper)
- {
- sleeper.notify();
+ shutdown = true;
+ shutdownLock.notifyAll();
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Shutting down monitoring publisher for dn " +
+ replicationServerDomain.getBaseDn().toString() + " in RS " +
+ replicationServerDomain.getReplicationServer().getServerId());
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 86e27fb..8c5ffb3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index cc9d667..64976d2 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
import org.opends.messages.*;
@@ -255,13 +256,10 @@
status = cursor.getNext(key, data, LockMode.DEFAULT);
}
- cursor.close();
-
}
- catch (DatabaseException dbe)
+ finally
{
cursor.close();
- throw dbe;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
index b7ae03d..4f8447c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -38,7 +39,7 @@
/**
* The Replication Server that created this thread.
*/
- private ReplicationServer server;
+ private final ReplicationServer server;
/**
* Creates a new instance of this directory thread with the
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
index 8a3cf1e..e2e6ccf 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -38,7 +39,7 @@
/**
* The Replication Server that created this thread.
*/
- private ReplicationServer server;
+ private final ReplicationServer server;
/**
* Creates a new instance of this directory thread with the
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 0e0d14c..4cefcf4 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -58,8 +59,8 @@
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private ProtocolSession session;
- private ServerHandler handler;
+ private final ProtocolSession session;
+ private final ServerHandler handler;
/**
* Constructor for the LDAP server reader part of the replicationServer.
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 1bdce8c..8632baa 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -57,10 +58,10 @@
*/
private static final DebugTracer TRACER = getTracer();
- private ProtocolSession session;
- private ServerHandler handler;
- private ReplicationServerDomain replicationServerDomain;
- private short protocolVersion = -1;
+ private final ProtocolSession session;
+ private final ServerHandler handler;
+ private final ReplicationServerDomain replicationServerDomain;
+ private final short protocolVersion;
/**
* Create a ServerWriter.
diff --git a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index a48d9ff..f7a5705 100644
--- a/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -47,21 +48,22 @@
public class StatusAnalyzer extends DirectoryThread
{
- private boolean finished = false;
+ private volatile boolean shutdown = false;
+
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private ReplicationServerDomain replicationServerDomain;
- private int degradedStatusThreshold = -1;
+ private final ReplicationServerDomain replicationServerDomain;
+ private volatile int degradedStatusThreshold = -1;
// Sleep time for the thread, in ms.
- private int STATUS_ANALYZER_SLEEP_TIME = 5000;
+ private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
- private boolean done = false;
+ private volatile boolean done = false;
- private Object sleeper = new Object();
+ private final Object shutdownLock = new Object();
/**
* Create a StatusAnalyzer.
@@ -95,13 +97,16 @@
}
boolean interrupted = false;
- while (!finished && !interrupted)
+ while (!shutdown && !interrupted)
{
try
{
- synchronized (sleeper)
+ synchronized (shutdownLock)
{
- sleeper.wait(STATUS_ANALYZER_SLEEP_TIME);
+ if (!shutdown)
+ {
+ shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
+ }
}
} catch (InterruptedException ex)
{
@@ -192,16 +197,17 @@
*/
public void shutdown()
{
- if (debugEnabled())
+ synchronized (shutdownLock)
{
- TRACER.debugInfo("Shutting down status analyzer for dn " +
- replicationServerDomain.getBaseDn().toString() + " in RS " +
- replicationServerDomain.getReplicationServer().getServerId());
- }
- finished = true;
- synchronized (sleeper)
- {
- sleeper.notify();
+ shutdown = true;
+ shutdownLock.notifyAll();
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Shutting down status analyzer for dn "
+ + replicationServerDomain.getBaseDn().toString() + " in RS "
+ + replicationServerDomain.getReplicationServer().getServerId());
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index e301b53..597a6d3 100644
--- a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -51,26 +52,21 @@
private static final DebugTracer TRACER = getTracer();
/**
- * For test purposes only to simulate loss of heartbeats.
- */
- static private boolean heartbeatsDisabled = false;
-
- /**
* The session on which heartbeats are to be sent.
*/
- private ProtocolSession session;
+ private final ProtocolSession session;
/**
* The time in milliseconds between heartbeats.
*/
- private long heartbeatInterval;
- private int serverId;
+ private final long heartbeatInterval;
+ private final int serverId;
/**
* Set this to stop the thread.
*/
- private Boolean shutdown = false;
- private final Object shutdown_lock = new Object();
+ private volatile boolean shutdown = false;
+ private final Object shutdownLock = new Object();
/**
* Create a heartbeat thread.
@@ -112,10 +108,7 @@
if (now > session.getLastPublishTime() + heartbeatInterval)
{
- if (!heartbeatsDisabled)
- {
- session.publish(ctHeartbeatMsg);
- }
+ session.publish(ctHeartbeatMsg);
}
try
@@ -127,11 +120,11 @@
sleepTime = heartbeatInterval;
}
- synchronized (shutdown_lock)
+ synchronized (shutdownLock)
{
if (!shutdown)
{
- shutdown_lock.wait(sleepTime);
+ shutdownLock.wait(sleepTime);
}
}
}
@@ -166,20 +159,10 @@
*/
public void shutdown()
{
- synchronized (shutdown_lock)
+ synchronized (shutdownLock)
{
shutdown = true;
- shutdown_lock.notifyAll();
+ shutdownLock.notifyAll();
}
}
-
-
- /**
- * For testing purposes only to simulate loss of heartbeats.
- * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
- */
- public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
- {
- CTHeartbeatPublisherThread.heartbeatsDisabled = heartbeatsDisabled;
- }
}
diff --git a/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
index dd48f8a..dab9fcf 100644
--- a/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ b/opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.service;
import org.opends.messages.Message;
@@ -48,9 +49,9 @@
*/
private static final DebugTracer TRACER = getTracer();
- private ReplicationDomain repDomain;
- private boolean shutdown = false;
- private boolean done = false;
+ private final ReplicationDomain repDomain;
+ private volatile boolean shutdown = false;
+ private volatile boolean done = false;
/**
@@ -95,11 +96,17 @@
while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
{
if (repDomain.processUpdate(updateMsg) == true)
+ {
repDomain.processUpdateDoneSynchronous(updateMsg);
+ }
}
+
if (updateMsg == null)
+ {
shutdown = true;
- } catch (Exception e)
+ }
+ }
+ catch (Exception e)
{
/*
* catch all exceptions happening in repDomain.receive so that the
@@ -119,6 +126,8 @@
}
}
+
+
/**
* Wait for the completion of this thread.
*/
@@ -134,12 +143,13 @@
n++;
if (n >= FACTOR)
{
- TRACER.debugInfo("Interrupting listener thread for dn " +
- repDomain.getServiceID() + " in DS " + repDomain.getServerId());
+ TRACER.debugInfo("Interrupting listener thread for dn "
+ + repDomain.getServiceID() + " in DS " + repDomain.getServerId());
this.interrupt();
}
}
- } catch (InterruptedException e)
+ }
+ catch (InterruptedException e)
{
// exit the loop if this thread is interrupted.
}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d8798bf..cec9b3d 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -97,11 +98,11 @@
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private boolean shutdown = false;
- private Collection<String> servers;
- private boolean connected = false;
- private String replicationServer = "Not connected";
- private ProtocolSession session = null;
+ private volatile boolean shutdown = false;
+ private volatile Collection<String> servers;
+ private volatile boolean connected = false;
+ private volatile String replicationServer = "Not connected";
+ private volatile ProtocolSession session = null;
private final ServerState state;
private final String baseDn;
private final int serverId;
@@ -156,7 +157,7 @@
* and to know that it is necessary to print a new message when the broker
* finally succeed to connect.
*/
- private boolean connectionError = false;
+ private volatile boolean connectionError = false;
private final Object connectPhaseLock = new Object();
/**
* The thread that publishes messages to the RS containing the current
@@ -173,18 +174,20 @@
*/
// Info for other DSs.
// Warning: does not contain info for us (for our server id)
- private List<DSInfo> dsList = new ArrayList<DSInfo>();
- private long generationID;
- private int updateDoneCount = 0;
- private boolean connectRequiresRecovery = false;
+ private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
+ private volatile long generationID;
+ private volatile int updateDoneCount = 0;
+ private volatile boolean connectRequiresRecovery = false;
+
/**
* The map of replication server info initialized at connection time and
* regularly updated. This is used to decide to which best suitable
- * replication server one wants to connect.
- * Key: replication server id
- * Value: replication server info for the matching replication server id
+ * replication server one wants to connect. Key: replication server id Value:
+ * replication server info for the matching replication server id
*/
- private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
+ private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
+ = null;
+
/**
* This integer defines when the best replication server checking algorithm
* should be engaged.
@@ -769,7 +772,7 @@
{
Map<Integer, ReplicationServerInfo> rsInfos =
- new HashMap<Integer, ReplicationServerInfo>();
+ new ConcurrentHashMap<Integer, ReplicationServerInfo>();
for (String server : servers)
{
@@ -2535,8 +2538,6 @@
* called in a single thread or protected by a locking mechanism
* before being called.
*
- * @throws SocketTimeoutException if the timeout set by setSoTimeout
- * has expired
* @param reconnectToTheBestRS Whether broker will automatically switch
* to the best suitable RS.
* @param reconnectOnFailure Whether broker will automatically reconnect
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index d549cd7..2c0856a 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -124,8 +125,8 @@
* must use the {@link #publish(UpdateMsg)} method.
* <p>
* If the Full Initialization process is needed then implementation
- * for {@link #importBackend(InputStream)} and
- * {@link #exportBackend(OutputStream)} must be
+ * for {@code importBackend(InputStream)} and
+ * {@code exportBackend(OutputStream)} must be
* provided.
* <p>
* Full Initialization of a replica can be triggered by LDAP clients
@@ -1063,19 +1064,27 @@
private final int serverToInitialize;
private final int initWindow;
+
+
/**
* Constructor for the ExportThread.
*
- * @param serverToInitialize serverId of server that will receive entries
+ * @param serverToInitialize
+ * serverId of server that will receive entries
+ * @param initWindow
+ * The value of the initialization window for flow control between
+ * the importer and the exporter.
*/
public ExportThread(int serverToInitialize, int initWindow)
{
- super("Export thread from serverId=" + serverID
- + " to serverId=" + serverToInitialize);
+ super("Export thread from serverId=" + serverID + " to serverId="
+ + serverToInitialize);
this.serverToInitialize = serverToInitialize;
this.initWindow = initWindow;
}
+
+
/**
* Run method for this class.
*/
@@ -1342,11 +1351,8 @@
* @return The source as a integer value
* @throws DirectoryException if the string is not valid
*/
- public int decodeTarget(String targetString)
- throws DirectoryException
+ public int decodeTarget(String targetString) throws DirectoryException
{
- int target = 0;
- Throwable cause;
if (targetString.equalsIgnoreCase("all"))
{
return RoutableMsg.ALL_SERVERS;
@@ -1355,34 +1361,26 @@
// So should be a serverID
try
{
- target = Integer.decode(targetString);
+ int target = Integer.decode(targetString);
if (target >= 0)
{
// FIXME Could we check now that it is a know server in the domain ?
}
return target;
}
- catch(Exception e)
+ catch (Exception e)
{
- cause = e;
+ ResultCode resultCode = ResultCode.OTHER;
+ Message message = ERR_INVALID_EXPORT_TARGET.get();
+ throw new DirectoryException(resultCode, message, e);
}
- ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_INVALID_EXPORT_TARGET.get();
-
- if (cause != null)
- throw new DirectoryException(
- resultCode, message, cause);
- else
- throw new DirectoryException(
- resultCode, message);
-
}
/**
* Initializes a remote server from this server.
* <p>
- * The {@link #exportBackend(OutputStream)} will therefore be called
- * on this server, and the {@link #importBackend(InputStream)}
+ * The {@code exportBackend(OutputStream)} will therefore be called
+ * on this server, and the {@code importBackend(InputStream)}
* will be called on the remote server.
* <p>
* The InputStream and OutpuStream given as a parameter to those
@@ -2138,8 +2136,8 @@
* When this method is called, a request for initialization will
* be sent to the source server asking for initialization.
* <p>
- * The {@link #exportBackend(OutputStream)} will therefore be called
- * on the source server, and the {@link #importBackend(InputStream)}
+ * The {@code exportBackend(OutputStream)} will therefore be called
+ * on the source server, and the {@code importBackend(InputStream)}
* will be called on his server.
* <p>
* The InputStream and OutpuStream given as a parameter to those
@@ -2161,8 +2159,8 @@
/**
* Initializes a remote server from this server.
* <p>
- * The {@link #exportBackend(OutputStream)} will therefore be called
- * on this server, and the {@link #importBackend(InputStream)}
+ * The {@code exportBackend(OutputStream)} will therefore be called
+ * on this server, and the {@code importBackend(InputStream)}
* will be called on the remote server.
* <p>
* The InputStream and OutpuStream given as a parameter to those
--
Gitblit v1.10.0