From 473e06f2b7f0c9c57ce90c4ef5b8347bb0c13adf Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 09 Jan 2014 10:45:02 +0000
Subject: [PATCH] Front-port of r10103,10105,10113.
---
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 364 +++++++++++++++++++++++++--------------------------
1 files changed, 176 insertions(+), 188 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 7f41ec9..f80743c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -146,14 +147,15 @@
* The context related to an import or export being processed
* Null when none is being processed.
*/
- protected IEContext ieContext = null;
+ private final AtomicReference<IEContext> ieContext =
+ new AtomicReference<IEContext>();
/**
* The Thread waiting for incoming update messages for this domain and pushing
* them to the global incoming update message queue for later processing by
* replay threads.
*/
- private volatile DirectoryThread listenerThread = null;
+ private volatile DirectoryThread listenerThread;
/**
* A set of counters used for Monitoring.
@@ -732,7 +734,8 @@
else if (msg instanceof ErrorMsg)
{
ErrorMsg errorMsg = (ErrorMsg)msg;
- if (ieContext != null)
+ IEContext ieCtx = ieContext.get();
+ if (ieCtx != null)
{
/*
This is an error termination for the 2 following cases :
@@ -750,10 +753,10 @@
" baseDN: " + getBaseDN() +
" Error Msg received: " + errorMsg);
- if (errorMsg.getCreationTime() > ieContext.startTime)
+ if (errorMsg.getCreationTime() > ieCtx.startTime)
{
// consider only ErrorMsg that relate to the current import/export
- processErrorMsg(errorMsg);
+ processErrorMsg(errorMsg, ieCtx);
}
else
{
@@ -784,10 +787,11 @@
}
else if (msg instanceof InitializeRcvAckMsg)
{
- if (ieContext != null)
+ IEContext ieCtx = ieContext.get();
+ if (ieCtx != null)
{
InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
- ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
+ ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
}
// Trash this msg When no input/export is running/should never happen
}
@@ -1044,7 +1048,7 @@
private long entryLeftCount = 0;
/** Exception raised during the initialization. */
- private DirectoryException exception = null;
+ private DirectoryException exception;
/** Whether the context is related to an import or an export. */
private boolean importInProgress;
@@ -1061,7 +1065,7 @@
/**
* Request message sent when this server has the initializeFromRemote task.
*/
- private InitializeRequestMsg initReqMsgSent = null;
+ private InitializeRequestMsg initReqMsgSent;
/**
* Start time of the initialization process. ErrorMsg timestamped before
@@ -1116,12 +1120,47 @@
}
/**
+ * Returns a boolean indicating if a total update import is currently in
+ * Progress.
+ *
+ * @return A boolean indicating if a total update import is currently in
+ * Progress.
+ */
+ public boolean importInProgress()
+ {
+ return importInProgress;
+ }
+
+ /**
+ * Returns the total number of entries to be processed when a total update
+ * is in progress.
+ *
+ * @return The total number of entries to be processed when a total update
+ * is in progress.
+ */
+ long getTotalEntryCount()
+ {
+ return entryCount;
+ }
+
+ /**
+ * Returns the number of entries still to be processed when a total update
+ * is in progress.
+ *
+ * @return The number of entries still to be processed when a total update
+ * is in progress.
+ */
+ long getLeftEntryCount()
+ {
+ return entryLeftCount;
+ }
+
+ /**
* Initializes the import/export counters with the provider value.
* @param total Total number of entries to be processed.
* @throws DirectoryException if an error occurred.
*/
- private void initializeCounters(long total)
- throws DirectoryException
+ private void initializeCounters(long total) throws DirectoryException
{
entryCount = total;
entryLeftCount = total;
@@ -1150,8 +1189,7 @@
*
* @throws DirectoryException if an error occurred.
*/
- public void updateCounters(int entriesDone)
- throws DirectoryException
+ public void updateCounters(int entriesDone) throws DirectoryException
{
entryLeftCount -= entriesDone;
@@ -1241,7 +1279,7 @@
// Recompute the server with the minAck returned,means the slowest server.
slowestServerId = serverId;
- for (Integer sid : ieContext.ackVals.keySet())
+ for (Integer sid : ieContext.get().ackVals.keySet())
{
if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
{
@@ -1346,10 +1384,7 @@
int serverRunningTheTask, Task initTask, int initWindow)
throws DirectoryException
{
- DirectoryException exportRootException = null;
-
- // Acquire and initialize the export context
- acquireIEContext(false);
+ final IEContext ieCtx = acquireIEContext(false);
/*
We manage the list of servers to initialize in order :
@@ -1365,7 +1400,7 @@
for (DSInfo dsi : getReplicasList())
{
- ieContext.startList.add(dsi.getDsId());
+ ieCtx.startList.add(dsi.getDsId());
}
// We manage the list of servers with which a flow control can be enabled
@@ -1373,7 +1408,7 @@
{
if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- ieContext.setAckVal(dsi.getDsId(), 0);
+ ieCtx.setAckVal(dsi.getDsId(), 0);
}
}
}
@@ -1382,7 +1417,7 @@
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
getBaseDNString(), getServerId(), serverToInitialize));
- ieContext.startList.add(serverToInitialize);
+ ieCtx.startList.add(serverToInitialize);
// We manage the list of servers with which a flow control can be enabled
for (DSInfo dsi : getReplicasList())
@@ -1390,11 +1425,13 @@
if (dsi.getDsId() == serverToInitialize &&
dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- ieContext.setAckVal(dsi.getDsId(), 0);
+ ieCtx.setAckVal(dsi.getDsId(), 0);
}
}
}
+ DirectoryException exportRootException = null;
+
// loop for the case where the exporter is the initiator
int attempt = 0;
boolean done = false;
@@ -1402,34 +1439,34 @@
{
try
{
- ieContext.exportTarget = serverToInitialize;
+ ieCtx.exportTarget = serverToInitialize;
if (initTask != null)
{
- ieContext.initializeTask = initTask;
+ ieCtx.initializeTask = initTask;
}
- ieContext.initializeCounters(this.countEntries());
- ieContext.msgCnt = 0;
- ieContext.initNumLostConnections = broker.getNumLostConnections();
- ieContext.initWindow = initWindow;
+ ieCtx.initializeCounters(this.countEntries());
+ ieCtx.msgCnt = 0;
+ ieCtx.initNumLostConnections = broker.getNumLostConnections();
+ ieCtx.initWindow = initWindow;
// Send start message to the peer
InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
getBaseDN(), getServerId(), serverToInitialize,
- serverRunningTheTask, ieContext.entryCount, initWindow);
+ serverRunningTheTask, ieCtx.entryCount, initWindow);
broker.publish(initTargetMsg);
// Wait for all servers to be ok
- waitForRemoteStartOfInit();
+ waitForRemoteStartOfInit(ieCtx);
// Servers that left in the list are those for which we could not test
// that they have been successfully initialized.
- if (!ieContext.failureList.isEmpty())
+ if (!ieCtx.failureList.isEmpty())
{
throw new DirectoryException(
ResultCode.OTHER,
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
- ieContext.failureList.toString()));
+ ieCtx.failureList.toString()));
}
exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
@@ -1441,7 +1478,7 @@
catch(DirectoryException exportException)
{
// Give priority to the first exception raised - stored in the context
- final DirectoryException ieEx = ieContext.exception;
+ final DirectoryException ieEx = ieCtx.exception;
exportRootException = ieEx != null ? ieEx : exportException;
}
@@ -1500,10 +1537,8 @@
continue;
}
- ErrorMsg errorMsg =
- new ErrorMsg(serverToInitialize,
- exportRootException.getMessageObject());
- broker.publish(errorMsg);
+ broker.publish(new ErrorMsg(
+ serverToInitialize, exportRootException.getMessageObject()));
}
catch(Exception e)
{
@@ -1518,20 +1553,20 @@
} // attempt loop
// Wait for all servers to be ok, and build the failure list
- waitForRemoteEndOfInit();
+ waitForRemoteEndOfInit(ieCtx);
// Servers that left in the list are those for which we could not test
// that they have been successfully initialized.
- if (!ieContext.failureList.isEmpty() && exportRootException == null)
+ if (!ieCtx.failureList.isEmpty() && exportRootException == null)
{
exportRootException = new DirectoryException(ResultCode.OTHER,
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
Long.toString(getGenerationID()),
- ieContext.failureList.toString()));
+ ieCtx.failureList.toString()));
}
// Don't forget to release IEcontext acquired at beginning.
- releaseIEContext();
+ releaseIEContext(); // FIXME should not this be in a finally?
final String cause = exportRootException == null ? ""
: exportRootException.getLocalizedMessage();
@@ -1558,10 +1593,10 @@
* - wait it has finished the import and present the expected generationID,
* - build the failureList.
*/
- private void waitForRemoteStartOfInit()
+ private void waitForRemoteStartOfInit(IEContext ieCtx)
{
final Set<Integer> replicasWeAreWaitingFor =
- new HashSet<Integer>(ieContext.startList);
+ new HashSet<Integer>(ieCtx.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1580,7 +1615,7 @@
+ " " + dsi.getStatus()
+ " " + dsi.getGenerationId()
+ " " + getGenerationID());
- if (ieContext.startList.contains(dsi.getDsId()))
+ if (ieCtx.startList.contains(dsi.getDsId()))
{
if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
{
@@ -1604,11 +1639,11 @@
}
while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
- ieContext.failureList.addAll(replicasWeAreWaitingFor);
+ ieCtx.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
- "[IE] wait for start ends with " + ieContext.failureList);
+ "[IE] wait for start ends with " + ieCtx.failureList);
}
/**
@@ -1616,10 +1651,10 @@
* - wait it has finished the import and present the expected generationID,
* - build the failureList.
*/
- private void waitForRemoteEndOfInit()
+ private void waitForRemoteEndOfInit(IEContext ieCtx)
{
final Set<Integer> replicasWeAreWaitingFor =
- new HashSet<Integer>(ieContext.startList);
+ new HashSet<Integer>(ieCtx.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1645,7 +1680,7 @@
while (it.hasNext())
{
int serverId = it.next();
- if (ieContext.failureList.contains(serverId))
+ if (ieCtx.failureList.contains(serverId))
{
/*
this server has already been in error during initialization
@@ -1701,11 +1736,11 @@
}
while (!done && !broker.shuttingDown()); // infinite wait
- ieContext.failureList.addAll(replicasWeAreWaitingFor);
+ ieCtx.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
- "[IE] wait for end ends with " + ieContext.failureList);
+ "[IE] wait for end ends with " + ieCtx.failureList);
}
/**
@@ -1718,23 +1753,26 @@
return state;
}
-
- private synchronized void acquireIEContext(boolean importInProgress)
- throws DirectoryException
+ /**
+ * Acquire and initialize the import/export context, verifying no other
+ * import/export is in progress.
+ */
+ private IEContext acquireIEContext(boolean importInProgress)
+ throws DirectoryException
{
- if (ieContext != null)
+ final IEContext ieCtx = new IEContext(importInProgress);
+ if (!ieContext.compareAndSet(null, ieCtx))
{
// Rejects 2 simultaneous exports
Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
throw new DirectoryException(ResultCode.OTHER, message);
}
-
- ieContext = new IEContext(importInProgress);
+ return ieCtx;
}
- private synchronized void releaseIEContext()
+ private void releaseIEContext()
{
- ieContext = null;
+ ieContext.set(null);
}
/**
@@ -1743,13 +1781,13 @@
*
* @param errorMsg The error message received.
*/
- private void processErrorMsg(ErrorMsg errorMsg)
+ private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
{
//Exporting must not be stopped on the first error, if we run initialize-all
- if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
+ if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
{
// The ErrorMsg is received while we have started an initialization
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, errorMsg.getDetails()));
/*
@@ -1759,11 +1797,11 @@
* even after the nextInitAttemptDelay
* During the import, the ErrorMsg will be received by receiveEntryBytes
*/
- if (ieContext.initializeTask instanceof InitializeTask)
+ if (ieCtx.initializeTask instanceof InitializeTask)
{
// Update the task that initiated the import
- ((InitializeTask) ieContext.initializeTask)
- .updateTaskCompletionState(ieContext.getException());
+ ((InitializeTask) ieCtx.initializeTask)
+ .updateTaskCompletionState(ieCtx.getException());
releaseIEContext();
}
@@ -1781,6 +1819,7 @@
ReplicationMsg msg;
while (true)
{
+ IEContext ieCtx = ieContext.get();
try
{
// In the context of the total update, we don't want any automatic
@@ -1807,7 +1846,7 @@
else
{
// Handle connection issues
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
.get(broker.getReplicationServer())));
return null;
@@ -1819,26 +1858,26 @@
{
EntryMsg entryMsg = (EntryMsg)msg;
byte[] entryBytes = entryMsg.getEntryBytes();
- ieContext.updateCounters(countEntryLimits(entryBytes));
+ ieCtx.updateCounters(countEntryLimits(entryBytes));
- if (ieContext.exporterProtocolVersion >=
+ if (ieCtx.exporterProtocolVersion >=
ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// check the msgCnt of the msg received to check ordering
- if (++ieContext.msgCnt != entryMsg.getMsgId())
+ if (++ieCtx.msgCnt != entryMsg.getMsgId())
{
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
- String.valueOf(ieContext.msgCnt),
+ String.valueOf(ieCtx.msgCnt),
String.valueOf(entryMsg.getMsgId()))));
return null;
}
// send the ack of flow control mgmt
- if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
+ if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
{
final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
- getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
+ getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
broker.publish(amsg, false);
if (debugEnabled())
{
@@ -1864,12 +1903,12 @@
This is an error termination during the import
The error is stored and the import is ended by returning null
*/
- if (ieContext.getException() == null)
+ if (ieCtx.getException() == null)
{
ErrorMsg errMsg = (ErrorMsg)msg;
- if (errMsg.getCreationTime() > ieContext.startTime)
+ if (errMsg.getCreationTime() > ieCtx.startTime)
{
- ieContext.setException(
+ ieCtx.setException(
new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
return null;
}
@@ -1880,15 +1919,15 @@
// Other messages received during an import are trashed except
// the topologyMsg.
if (msg instanceof TopologyMsg
- && isRemoteDSConnected(ieContext.importSource) == null)
+ && isRemoteDSConnected(ieCtx.importSource) == null)
{
Message errMsg =
Message.raw(Category.SYNC, Severity.NOTICE,
ERR_INIT_EXPORTER_DISCONNECTION.get(
getBaseDNString(),
Integer.toString(getServerId()),
- Integer.toString(ieContext.importSource)));
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ Integer.toString(ieCtx.importSource)));
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, errMsg));
return null;
}
@@ -1896,7 +1935,7 @@
}
catch(Exception e)
{
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER,
ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
}
@@ -1957,9 +1996,10 @@
Arrays.toString(lDIFEntry));
// build the message
+ IEContext ieCtx = ieContext.get();
EntryMsg entryMessage = new EntryMsg(
- getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
- ++ieContext.msgCnt);
+ getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
+ ++ieCtx.msgCnt);
// Waiting the slowest loop
while (!broker.shuttingDown())
@@ -1969,30 +2009,30 @@
server that have been stored by the listener thread in the ieContext,
we just abandon the export by throwing an exception.
*/
- if (ieContext.getException() != null)
+ if (ieCtx.getException() != null)
{
- throw new IOException(ieContext.getException().getMessage());
+ throw new IOException(ieCtx.getException().getMessage());
}
- int slowestServerId = ieContext.getSlowestServer();
+ int slowestServerId = ieCtx.getSlowestServer();
if (isRemoteDSConnected(slowestServerId)==null)
{
- ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ ieCtx.setException(new DirectoryException(ResultCode.OTHER,
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
- Integer.toString(ieContext.getSlowestServer()))));
+ Integer.toString(ieCtx.getSlowestServer()))));
throw new IOException("IOException with nested DirectoryException",
- ieContext.getException());
+ ieCtx.getException());
}
- int ourLastExportedCnt = ieContext.msgCnt;
- int slowestCnt = ieContext.ackVals.get(slowestServerId);
+ int ourLastExportedCnt = ieCtx.msgCnt;
+ int slowestCnt = ieCtx.ackVals.get(slowestServerId);
if (debugEnabled())
TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
" our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
- if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
+ if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow)
{
if (debugEnabled())
TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
@@ -2003,13 +2043,13 @@
// process any connection error
if (broker.hasConnectionError()
- || broker.getNumLostConnections() != ieContext.initNumLostConnections)
+ || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
{
// publish failed - store the error in the ieContext ...
DirectoryException de = new DirectoryException(ResultCode.OTHER,
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
Integer.toString(broker.getRsServerId())));
- ieContext.setExceptionIfNoneSet(de);
+ ieCtx.setExceptionIfNoneSet(de);
// .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
@@ -2031,13 +2071,13 @@
// process any publish error
if (!sent
|| broker.hasConnectionError()
- || broker.getNumLostConnections() != ieContext.initNumLostConnections)
+ || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
{
// publish failed - store the error in the ieContext ...
DirectoryException de = new DirectoryException(ResultCode.OTHER,
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
Integer.toString(broker.getRsServerId())));
- ieContext.setExceptionIfNoneSet(de);
+ ieCtx.setExceptionIfNoneSet(de);
// .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
@@ -2045,11 +2085,11 @@
// publish succeeded
try
{
- ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
+ ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
}
catch (DirectoryException de)
{
- ieContext.setExceptionIfNoneSet(de);
+ ieCtx.setExceptionIfNoneSet(de);
// .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
@@ -2127,17 +2167,14 @@
public void initializeFromRemote(int source, Task initTask)
throws DirectoryException
{
- Message errMsg = null;
-
if (debugEnabled())
{
TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
}
- if (!broker.isConnected())
- {
- errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString());
- }
+ Message errMsg = !broker.isConnected()
+ ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDNString())
+ : null;
/*
We must not test here whether the remote source is connected to
@@ -2156,14 +2193,12 @@
update the task.
*/
- acquireIEContext(true); //test and set if no import already in progress
- ieContext.initializeTask = initTask;
- ieContext.attemptCnt = 0;
- ieContext.initReqMsgSent = new InitializeRequestMsg(
+ final IEContext ieCtx = acquireIEContext(true);
+ ieCtx.initializeTask = initTask;
+ ieCtx.attemptCnt = 0;
+ ieCtx.initReqMsgSent = new InitializeRequestMsg(
getBaseDN(), getServerId(), source, getInitWindow());
-
- // Publish Init request msg
- broker.publish(ieContext.initReqMsgSent);
+ broker.publish(ieCtx.initReqMsgSent);
/*
The normal success processing is now to receive InitTargetMsg then
@@ -2219,6 +2254,7 @@
int source = initTargetMsgReceived.getSenderID();
+ IEContext ieCtx = ieContext.get();
try
{
// Log starting
@@ -2236,16 +2272,16 @@
server.
Test and set if no import already in progress
*/
- acquireIEContext(true);
+ ieCtx = acquireIEContext(true);
}
// Initialize stuff
- ieContext.importSource = source;
- ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
- ieContext.initWindow = initTargetMsgReceived.getInitWindow();
+ ieCtx.importSource = source;
+ ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
+ ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
// Protocol version is -1 when not known.
- ieContext.exporterProtocolVersion = getProtocolVersion(source);
- initFromTask = (InitializeTask)ieContext.initializeTask;
+ ieCtx.exporterProtocolVersion = getProtocolVersion(source);
+ initFromTask = (InitializeTask) ieCtx.initializeTask;
// Launch the import
importBackend(new ReplInputStream(this));
@@ -2257,14 +2293,14 @@
Store the exception raised. It will be considered if no other exception
has been previously stored in the context
*/
- ieContext.setExceptionIfNoneSet(e);
+ ieCtx.setExceptionIfNoneSet(e);
}
finally
{
if (debugEnabled())
{
TRACER.debugInfo("[IE] Domain=" + this
- + " ends import with exception=" + ieContext.getException()
+ + " ends import with exception=" + ieCtx.getException()
+ " connected=" + broker.isConnected());
}
@@ -2278,10 +2314,10 @@
*/
broker.reStart(false);
- if (ieContext.getException() != null
+ if (ieCtx.getException() != null
&& broker.isConnected()
&& initFromTask != null
- && ++ieContext.attemptCnt < 2)
+ && ++ieCtx.attemptCnt < 2)
{
/*
Worth a new attempt
@@ -2300,13 +2336,13 @@
the request
*/
logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
- ieContext.getException().getLocalizedMessage()));
+ ieCtx.getException().getLocalizedMessage()));
- broker.publish(ieContext.initReqMsgSent);
+ broker.publish(ieCtx.initReqMsgSent);
- ieContext.initializeCounters(0);
- ieContext.exception = null;
- ieContext.msgCnt = 0;
+ ieCtx.initializeCounters(0);
+ ieCtx.exception = null;
+ ieCtx.msgCnt = 0;
// Processing of the received initTargetMsgReceived is done
// let's wait for the next one
@@ -2320,7 +2356,7 @@
*/
logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
e.getLocalizedMessage(),
- ieContext.getException().getLocalizedMessage()));
+ ieCtx.getException().getLocalizedMessage()));
}
}
@@ -2330,19 +2366,19 @@
if (debugEnabled())
{
TRACER.debugInfo("[IE] Domain=" + this
- + " ends initialization with exception=" + ieContext.getException()
+ + " ends initialization with exception=" + ieCtx.getException()
+ " connected=" + broker.isConnected()
+ " task=" + initFromTask
- + " attempt=" + ieContext.attemptCnt);
+ + " attempt=" + ieCtx.attemptCnt);
}
try
{
- if (broker.isConnected() && ieContext.getException() != null)
+ if (broker.isConnected() && ieCtx.getException() != null)
{
// Let's notify the exporter
ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
- ieContext.getException().getMessageObject());
+ ieCtx.getException().getMessageObject());
broker.publish(errorMsg);
}
/*
@@ -2353,7 +2389,7 @@
*/
if (initFromTask != null)
{
- initFromTask.updateTaskCompletionState(ieContext.getException());
+ initFromTask.updateTaskCompletionState(ieCtx.getException());
}
}
finally
@@ -2361,8 +2397,8 @@
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
getBaseDNString(), initTargetMsgReceived.getSenderID(),
getServerId(),
- (ieContext.getException() == null ? ""
- : ieContext.getException().getLocalizedMessage()));
+ (ieCtx.getException() == null ? ""
+ : ieCtx.getException().getLocalizedMessage()));
logError(msg);
releaseIEContext();
} // finally
@@ -2435,7 +2471,7 @@
*/
public boolean ieRunning()
{
- return ieContext != null;
+ return ieContext.get() != null;
}
/**
@@ -3449,43 +3485,13 @@
}
/**
- * Returns a boolean indicating if a total update import is currently
- * in Progress.
+ * Returns the Import/Export context associated to this ReplicationDomain.
*
- * @return A boolean indicating if a total update import is currently
- * in Progress.
+ * @return the Import/Export context associated to this ReplicationDomain
*/
- public boolean importInProgress()
+ protected IEContext getImportExportContext()
{
- return ieContext != null && ieContext.importInProgress;
- }
-
- /**
- * Returns a boolean indicating if a total update export is currently
- * in Progress.
- *
- * @return A boolean indicating if a total update export is currently
- * in Progress.
- */
- public boolean exportInProgress()
- {
- return ieContext != null && !ieContext.importInProgress;
- }
-
- /**
- * Returns the number of entries still to be processed when a total update
- * is in progress.
- *
- * @return The number of entries still to be processed when a total update
- * is in progress.
- */
- long getLeftEntryCount()
- {
- if (ieContext != null)
- {
- return ieContext.entryLeftCount;
- }
- return 0;
+ return ieContext.get();
}
/**
@@ -3501,24 +3507,6 @@
}
/**
- * Returns the total number of entries to be processed when a total update
- * is in progress.
- *
- * @return The total number of entries to be processed when a total update
- * is in progress.
- */
- long getTotalEntryCount()
- {
- if (ieContext != null)
- {
- return ieContext.entryCount;
- }
- return 0;
- }
-
-
-
- /**
* Set the attributes configured on a server to be included in the ECL.
*
* @param serverId
@@ -3617,7 +3605,7 @@
* The serverId for which we want the include attributes.
* @return The attributes.
*/
- public Set<String> getEclIncludes(int serverId)
+ Set<String> getEclIncludes(int serverId)
{
synchronized (eclIncludesLock)
{
@@ -3635,7 +3623,7 @@
* The serverId for which we want the include attributes.
* @return The attributes.
*/
- public Set<String> getEclIncludesForDeletes(int serverId)
+ Set<String> getEclIncludesForDeletes(int serverId)
{
synchronized (eclIncludesLock)
{
--
Gitblit v1.10.0