From 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:08:01 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR
---
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 355 ++++++++++++++++++++++++++++------------------------------
1 files changed, 171 insertions(+), 184 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 60a2b4a..1323777 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -95,15 +95,118 @@
* <p>
* Full Initialization of a replica can be triggered by LDAP clients
* by creating InitializeTasks or InitializeTargetTask.
- * Full initialization can also by triggered from the ReplicationDomain
- * implementation using methods {@link #initializeRemote(int)}
- * or {@link #initializeFromRemote(int)}.
+ * Full initialization can also be triggered from the ReplicationDomain
+ * implementation using methods {@link #initializeRemote(int, Task)}
+ * or {@link #initializeFromRemote(int, Task)}.
* <p>
* At shutdown time, the {@link #disableService()} method should be called to
* cleanly stop the replication service.
*/
public abstract class ReplicationDomain
{
+
+ /**
+ * Contains all the attributes included for the ECL (External Changelog).
+ */
+ // @Immutable
+ private final static class ECLIncludes
+ {
+
+ final Map<Integer, Set<String>> includedAttrsByServer;
+ final Set<String> includedAttrsAllServers;
+
+ final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
+ final Set<String> includedAttrsForDeletesAllServers;
+
+ private ECLIncludes(
+ Map<Integer, Set<String>> includedAttrsByServer,
+ Set<String> includedAttrsAllServers,
+ Map<Integer, Set<String>> includedAttrsForDeletesByServer,
+ Set<String> includedAttrsForDeletesAllServers)
+ {
+ this.includedAttrsByServer = includedAttrsByServer;
+ this.includedAttrsAllServers = includedAttrsAllServers;
+
+ this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
+ this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
+ }
+
+ @SuppressWarnings("unchecked")
+ public ECLIncludes()
+ {
+ this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
+ Collections.EMPTY_SET);
+ }
+
+ /**
+ * Add attributes to be included in the ECL.
+ *
+ * @param serverId
+ * Server where these attributes are configured.
+ * @param includeAttributes
+ * Attributes to be included with all change records, may include
+ * wild-cards.
+ * @param includeAttributesForDeletes
+ * Additional attributes to be included with delete change records,
+ * may include wild-cards.
+ * @return a new {@link ECLIncludes} object if included attributes have
+ * changed, or the current object otherwise.
+ */
+ public ECLIncludes addIncludedAttributes(int serverId,
+ Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
+ {
+ boolean configurationChanged = false;
+
+ Set<String> s1 = new HashSet<String>(includeAttributes);
+
+ // Combine all+delete attributes.
+ Set<String> s2 = new HashSet<String>(s1);
+ s2.addAll(includeAttributesForDeletes);
+
+ Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
+ if (!s1.equals(this.includedAttrsByServer.get(serverId)))
+ {
+ configurationChanged = true;
+ eclIncludesByServer = new HashMap<Integer, Set<String>>(
+ this.includedAttrsByServer);
+ eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
+ }
+
+ Map<Integer, Set<String>> eclIncludesForDeletesByServer =
+ this.includedAttrsForDeletesByServer;
+ if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
+ {
+ configurationChanged = true;
+ eclIncludesForDeletesByServer = new HashMap<Integer, Set<String>>(
+ this.includedAttrsForDeletesByServer);
+ eclIncludesForDeletesByServer.put(
+ serverId, Collections.unmodifiableSet(s2));
+ }
+
+ if (!configurationChanged)
+ {
+ return this;
+ }
+
+ // and rebuild the global list to be ready for usage
+ Set<String> eclIncludesAllServer = new HashSet<String>();
+ for (Set<String> attributes : eclIncludesByServer.values())
+ {
+ eclIncludesAllServer.addAll(attributes);
+ }
+
+ Set<String> eclIncludesForDeletesAllServer = new HashSet<String>();
+ for (Set<String> attributes : eclIncludesForDeletesByServer.values())
+ {
+ eclIncludesForDeletesAllServer.addAll(attributes);
+ }
+ return new ECLIncludes(eclIncludesByServer,
+ Collections.unmodifiableSet(eclIncludesAllServer),
+ eclIncludesForDeletesByServer,
+ Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
+ }
+ }
+
/**
* Current status for this replicated domain.
*/
@@ -251,14 +354,8 @@
*/
private final CSNGenerator generator;
- private final Object eclIncludesLock = new Object();
- private final Map<Integer, Set<String>> eclIncludesByServer =
- new HashMap<Integer, Set<String>>();
- private Set<String> eclIncludesAllServers = Collections.emptySet();
-
- private final Map<Integer, Set<String>> eclIncludesForDeletesByServer =
- new HashMap<Integer, Set<String>>();
- private Set<String> eclIncludesForDeletesAllServers = Collections.emptySet();
+ private final AtomicReference<ECLIncludes> eclIncludes =
+ new AtomicReference<ECLIncludes>(new ECLIncludes());
/**
* An object used to protect the initialization of the underlying broker
@@ -551,9 +648,9 @@
* Gets the info for Replicas in the topology (except us).
* @return The info for Replicas in the topology (except us)
*/
- public List<DSInfo> getReplicasList()
+ public Map<Integer, DSInfo> getReplicaInfos()
{
- return broker.getDsList();
+ return broker.getReplicaInfos();
}
/**
@@ -562,20 +659,13 @@
* disconnected. Return null when no server with the provided serverId is
* connected.
*
- * @param serverId The provided serverId of the remote replica
+ * @param dsId The provided serverId of the remote replica
* @return the info related to this remote server if it is connected,
* null is the server is NOT connected.
*/
- public DSInfo isRemoteDSConnected(int serverId)
+ private DSInfo isRemoteDSConnected(int dsId)
{
- for (DSInfo remoteDS : getReplicasList())
- {
- if (remoteDS.getDsId() == serverId)
- {
- return remoteDS;
- }
- }
- return null;
+ return getReplicaInfos().get(dsId);
}
/**
@@ -601,9 +691,9 @@
* @return The info for RSs in the topology (except the one we are connected
* to)
*/
- public List<RSInfo> getRsList()
+ public List<RSInfo> getRsInfos()
{
- return broker.getRsList();
+ return broker.getRsInfos();
}
@@ -1100,7 +1190,7 @@
* for and import, false if the IEContext
* will be used for and export.
*/
- public IEContext(boolean importInProgress)
+ private IEContext(boolean importInProgress)
{
this.importInProgress = importInProgress;
this.startTime = System.currentTimeMillis();
@@ -1114,7 +1204,7 @@
* @return A boolean indicating if a total update import is currently in
* Progress.
*/
- public boolean importInProgress()
+ boolean importInProgress()
{
return importInProgress;
}
@@ -1153,18 +1243,17 @@
entryCount = total;
entryLeftCount = total;
- if (initializeTask != null)
+ if (initializeTask instanceof InitializeTask)
{
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setTotal(entryCount);
- ((InitializeTask)initializeTask).setLeft(entryCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setTotal(entryCount);
- ((InitializeTargetTask)initializeTask).setLeft(entryCount);
- }
+ final InitializeTask task = (InitializeTask) initializeTask;
+ task.setTotal(entryCount);
+ task.setLeft(entryCount);
+ }
+ else if (initializeTask instanceof InitializeTargetTask)
+ {
+ final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
+ task.setTotal(entryCount);
+ task.setLeft(entryCount);
}
}
@@ -1177,7 +1266,7 @@
*
* @throws DirectoryException if an error occurred.
*/
- public void updateCounters(int entriesDone) throws DirectoryException
+ private void updateCounters(int entriesDone) throws DirectoryException
{
entryLeftCount -= entriesDone;
@@ -1198,7 +1287,7 @@
@Override
public String toString()
{
- return "[ Entry count=" + this.entryCount +
+ return "[Entry count=" + this.entryCount +
", Entry left count=" + this.entryLeftCount + "]";
}
@@ -1258,7 +1347,7 @@
* @param serverId serverId of the acknowledger/receiver/importer server.
* @param numAck id of the message received.
*/
- public void setAckVal(int serverId, int numAck)
+ private void setAckVal(int serverId, int numAck)
{
if (logger.isTraceEnabled())
logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
@@ -1315,6 +1404,7 @@
if (target >= 0)
{
// FIXME Could we check now that it is a know server in the domain ?
+ // JNR: Yes please
}
return target;
}
@@ -1338,7 +1428,7 @@
*
* @param target The server-id of the server that should be initialized.
* The target can be discovered using the
- * {@link #getReplicasList()} method.
+ * {@link #getReplicaInfos()} method.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
*
@@ -1386,13 +1476,10 @@
logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
countEntries(), getBaseDNString(), getServerId());
- for (DSInfo dsi : getReplicasList())
- {
- ieCtx.startList.add(dsi.getDsId());
- }
+ ieCtx.startList.addAll(getReplicaInfos().keySet());
// We manage the list of servers with which a flow control can be enabled
- for (DSInfo dsi : getReplicasList())
+ for (DSInfo dsi : getReplicaInfos().values())
{
if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
@@ -1408,7 +1495,7 @@
ieCtx.startList.add(serverToInitialize);
// We manage the list of servers with which a flow control can be enabled
- for (DSInfo dsi : getReplicasList())
+ for (DSInfo dsi : getReplicaInfos().values())
{
if (dsi.getDsId() == serverToInitialize &&
dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
@@ -1453,7 +1540,7 @@
{
throw new DirectoryException(
ResultCode.OTHER,
- ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
+ ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDNString(),
ieCtx.failureList));
}
@@ -1472,7 +1559,7 @@
if (logger.isTraceEnabled())
logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
- + " export ends with " + " connected=" + broker.isConnected()
+ + " export ends with connected=" + broker.isConnected()
+ " exportRootException=" + exportRootException);
if (exportRootException != null)
@@ -1592,7 +1679,7 @@
do
{
done = true;
- for (DSInfo dsi : getReplicasList())
+ for (DSInfo dsi : getReplicaInfos().values())
{
if (logger.isTraceEnabled())
logger.trace(
@@ -1650,10 +1737,7 @@
considered in the processing of sorting the successfully initialized
and the others
*/
- for (DSInfo dsi : getReplicasList())
- {
- replicasWeAreWaitingFor.add(dsi.getDsId());
- }
+ replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
boolean done;
do
@@ -1698,13 +1782,11 @@
done = false;
break;
}
- else
- {
- if (dsInfo.getGenerationId() == getGenerationID())
- { // and with the expected generationId
- // We're done with this server
- it.remove();
- }
+
+ if (dsInfo.getGenerationId() == getGenerationID())
+ { // and with the expected generationId
+ // We're done with this server
+ it.remove();
}
}
}
@@ -1717,7 +1799,6 @@
Thread.currentThread().interrupt();
} // 1sec
}
-
}
while (!done && !broker.shuttingDown()); // infinite wait
@@ -1967,8 +2048,8 @@
*
* @throws IOException when an error occurred.
*/
- public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
- throws IOException
+ void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
+ throws IOException
{
if (logger.isTraceEnabled())
logger.trace("[IE] Entering exportLDIFEntry entry=" +
@@ -2072,53 +2153,6 @@
}
/**
- * Initializes this domain from another source server.
- * <p>
- * When this method is called, a request for initialization will
- * be sent to the source server asking for initialization.
- * <p>
- * The {@code exportBackend(OutputStream)} will therefore be called
- * on the source server, and the {@code importBackend(InputStream)}
- * will be called on his server.
- * <p>
- * The InputStream and OutpuStream given as a parameter to those
- * methods will be connected through the replication protocol.
- *
- * @param source The server-id of the source from which to initialize.
- * The source can be discovered using the
- * {@link #getReplicasList()} method.
- *
- * @throws DirectoryException If it was not possible to publish the
- * Initialization message to the Topology.
- */
- public void initializeFromRemote(int source) throws DirectoryException
- {
- initializeFromRemote(source, null);
- }
-
- /**
- * Initializes a remote server from this server.
- * <p>
- * The {@code exportBackend(OutputStream)} will therefore be called
- * on this server, and the {@code importBackend(InputStream)}
- * will be called on the remote server.
- * <p>
- * The InputStream and OutpuStream given as a parameter to those
- * methods will be connected through the replication protocol.
- *
- * @param target The server-id of the server that should be initialized.
- * The target can be discovered using the
- * {@link #getReplicasList()} method.
- *
- * @throws DirectoryException If it was not possible to publish the
- * Initialization message to the Topology.
- */
- public void initializeRemote(int target) throws DirectoryException
- {
- initializeRemote(target, null);
- }
-
- /**
* Initializes asynchronously this domain from a remote source server.
* Before returning from this call, for the provided task :
* - the progressing counters are updated during the initialization using
@@ -2131,7 +2165,7 @@
*
* @param source The server-id of the source from which to initialize.
* The source can be discovered using the
- * {@link #getReplicasList()} method.
+ * {@link #getReplicaInfos()} method.
*
* @param initTask The task that launched the initialization
* and should be updated of its progress.
@@ -2217,7 +2251,7 @@
* task has initially been created (this server,
* or the remote server).
*/
- void initialize(InitializeTargetMsg initTargetMsgReceived,
+ private void initialize(InitializeTargetMsg initTargetMsgReceived,
int requesterServerId)
{
InitializeTask initFromTask = null;
@@ -2254,7 +2288,6 @@
ieCtx.importSource = source;
ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
- // Protocol version is -1 when not known.
ieCtx.exporterProtocolVersion = getProtocolVersion(source);
initFromTask = (InitializeTask) ieCtx.initializeTask;
@@ -2382,18 +2415,14 @@
* @param dsServerId The provided serverId.
* @return The protocol version.
*/
- short getProtocolVersion(int dsServerId)
+ private short getProtocolVersion(int dsServerId)
{
- short protocolVersion = -1;
- for (DSInfo dsi : getReplicasList())
+ final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
+ if (dsInfo != null)
{
- if (dsi.getDsId() == dsServerId)
- {
- protocolVersion = dsi.getProtocolVersion();
- break;
- }
+ return dsInfo.getProtocolVersion();
}
- return protocolVersion;
+ return -1;
}
/**
@@ -2459,7 +2488,7 @@
for (int i = 0; i< 50; i++)
{
allSet = true;
- for (RSInfo rsInfo : getRsList())
+ for (RSInfo rsInfo : getRsInfos())
{
// the 'empty' RSes (generationId==-1) are considered as good citizens
if (rsInfo.getGenerationId() != -1 &&
@@ -2498,7 +2527,7 @@
* connected to a Replication Server or it
* was not possible to contact it.
*/
- public void resetReplicationLog() throws DirectoryException
+ void resetReplicationLog() throws DirectoryException
{
// Reset the Generation ID to -1 to clean the ReplicationServers.
resetGenerationId(-1L);
@@ -2913,7 +2942,7 @@
{
// create the broker object used to publish and receive changes
broker = new ReplicationBroker(
- this, state, config, getGenerationID(), new ReplSessionSecurity());
+ this, state, config, new ReplSessionSecurity());
broker.start();
}
}
@@ -3067,8 +3096,8 @@
}
/**
- * Applies a configuration change to the attributes which should be be
- * included in the ECL.
+ * Applies a configuration change to the attributes which should be included
+ * in the ECL.
*
* @param includeAttributes
* attributes to be included with all change records.
@@ -3385,7 +3414,7 @@
* @param msg The byte array containing the information that should
* be sent to the remote entities.
*/
- public void publish(byte[] msg)
+ void publish(byte[] msg)
{
UpdateMsg update;
synchronized (this)
@@ -3489,46 +3518,16 @@
Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
{
- boolean configurationChanged = false;
-
- synchronized (eclIncludesLock)
+ ECLIncludes current;
+ ECLIncludes updated;
+ do
{
- Set<String> s1 = new HashSet<String>(includeAttributes);
-
- // Combine all+delete attributes.
- Set<String> s2 = new HashSet<String>(s1);
- s2.addAll(includeAttributesForDeletes);
-
- if (!s1.equals(eclIncludesByServer.get(serverId)))
- {
- configurationChanged = true;
- eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
- }
-
- if (!s2.equals(eclIncludesForDeletesByServer.get(serverId)))
- {
- configurationChanged = true;
- eclIncludesForDeletesByServer.put(serverId,
- Collections.unmodifiableSet(s2));
- }
-
- // and rebuild the global list to be ready for usage
- Set<String> s = new HashSet<String>();
- for (Set<String> attributes : eclIncludesByServer.values())
- {
- s.addAll(attributes);
- }
- eclIncludesAllServers = Collections.unmodifiableSet(s);
-
- s = new HashSet<String>();
- for (Set<String> attributes : eclIncludesForDeletesByServer.values())
- {
- s.addAll(attributes);
- }
- eclIncludesForDeletesAllServers = Collections.unmodifiableSet(s);
+ current = this.eclIncludes.get();
+ updated = current.addIncludedAttributes(
+ serverId, includeAttributes, includeAttributesForDeletes);
}
-
- return configurationChanged;
+ while (!this.eclIncludes.compareAndSet(current, updated));
+ return current != updated;
}
@@ -3540,10 +3539,7 @@
*/
public Set<String> getEclIncludes()
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesAllServers;
- }
+ return eclIncludes.get().includedAttrsAllServers;
}
@@ -3555,10 +3551,7 @@
*/
public Set<String> getEclIncludesForDeletes()
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesForDeletesAllServers;
- }
+ return eclIncludes.get().includedAttrsForDeletesAllServers;
}
@@ -3573,10 +3566,7 @@
*/
Set<String> getEclIncludes(int serverId)
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesByServer.get(serverId);
- }
+ return eclIncludes.get().includedAttrsByServer.get(serverId);
}
@@ -3591,10 +3581,7 @@
*/
Set<String> getEclIncludesForDeletes(int serverId)
{
- synchronized (eclIncludesLock)
- {
- return eclIncludesForDeletesByServer.get(serverId);
- }
+ return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
}
/**
--
Gitblit v1.10.0