From 48312eb62361cc16c74cd7c68346c23db63a2161 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:36:12 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR
---
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 51 ++++++++++++-------------
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 62 ++++++++++++++++---------------
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 4 +-
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java | 2
4 files changed, 60 insertions(+), 59 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 3b70e8d..a671f55 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -1488,7 +1488,7 @@
// process:
// This is an error termination during the import
// The error is stored and the import is ended by returning null
- final IEContext ieCtx = getImportExportContext();
+ final ImportExportContext ieCtx = getImportExportContext();
LocalizableMessage msg = null;
switch (importErrorMessageId)
{
@@ -3689,41 +3689,40 @@
Backend backend = getBackend();
- IEContext ieCtx = getImportExportContext();
+ ImportExportContext ieCtx = getImportExportContext();
try
{
if (!backend.supportsLDIFImport())
{
ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
+ return;
}
- else
- {
- importConfig = new LDIFImportConfig(input);
- List<DN> includeBranches = new ArrayList<DN>();
- includeBranches.add(getBaseDN());
- importConfig.setIncludeBranches(includeBranches);
- importConfig.setAppendToExistingData(false);
- importConfig.setSkipDNValidation(true);
- // We should not validate schema for replication
- importConfig.setValidateSchema(false);
- // Allow fractional replication ldif import plugin to be called
- importConfig.setInvokeImportPlugins(true);
- // Reset the follow import flag and message before starting the import
- importErrorMessageId = -1;
- // TODO How to deal with rejected entries during the import
- importConfig.writeRejectedEntries(
- getFileForPath("logs" + File.separator +
- "replInitRejectedEntries").getAbsolutePath(),
- ExistingFileBehavior.OVERWRITE);
+ importConfig = new LDIFImportConfig(input);
+ List<DN> includeBranches = new ArrayList<DN>();
+ includeBranches.add(getBaseDN());
+ importConfig.setIncludeBranches(includeBranches);
+ importConfig.setAppendToExistingData(false);
+ importConfig.setSkipDNValidation(true);
+ // We should not validate schema for replication
+ importConfig.setValidateSchema(false);
+ // Allow fractional replication ldif import plugin to be called
+ importConfig.setInvokeImportPlugins(true);
+ // Reset the follow import flag and message before starting the import
+ importErrorMessageId = -1;
- // Process import
- preBackendImport(backend);
- backend.importLDIF(importConfig);
+ // TODO How to deal with rejected entries during the import
+ File rejectsFile =
+ getFileForPath("logs" + File.separator + "replInitRejectedEntries");
+ importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(),
+ ExistingFileBehavior.OVERWRITE);
- stateSavingDisabled = false;
- }
+ // Process import
+ preBackendImport(backend);
+ backend.importLDIF(importConfig);
+
+ stateSavingDisabled = false;
}
catch(Exception e)
{
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 1323777..5ef2bfb 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -242,8 +242,8 @@
* The context related to an import or export being processed
* Null when none is being processed.
*/
- private final AtomicReference<IEContext> ieContext =
- new AtomicReference<IEContext>();
+ private final AtomicReference<ImportExportContext> importExportContext =
+ new AtomicReference<ImportExportContext>();
/**
* The Thread waiting for incoming update messages for this domain and pushing
@@ -663,7 +663,7 @@
* @return the info related to this remote server if it is connected,
* null is the server is NOT connected.
*/
- private DSInfo isRemoteDSConnected(int dsId)
+ private DSInfo getConnectedRemoteDS(int dsId)
{
return getReplicaInfos().get(dsId);
}
@@ -814,7 +814,7 @@
else if (msg instanceof ErrorMsg)
{
ErrorMsg errorMsg = (ErrorMsg)msg;
- IEContext ieCtx = ieContext.get();
+ ImportExportContext ieCtx = importExportContext.get();
if (ieCtx != null)
{
/*
@@ -867,7 +867,7 @@
}
else if (msg instanceof InitializeRcvAckMsg)
{
- IEContext ieCtx = ieContext.get();
+ ImportExportContext ieCtx = importExportContext.get();
if (ieCtx != null)
{
InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
@@ -1108,10 +1108,10 @@
}
/**
- * This class contain the context related to an import or export
- * launched on the domain.
+ * This class contains the context related to an import or export launched on
+ * the domain.
*/
- protected class IEContext
+ protected class ImportExportContext
{
/** The private task that initiated the operation. */
private Task initializeTask;
@@ -1190,7 +1190,7 @@
* for and import, false if the IEContext
* will be used for and export.
*/
- private IEContext(boolean importInProgress)
+ private ImportExportContext(boolean importInProgress)
{
this.importInProgress = importInProgress;
this.startTime = System.currentTimeMillis();
@@ -1356,7 +1356,7 @@
// Recompute the server with the minAck returned,means the slowest server.
slowestServerId = serverId;
- for (Integer sid : ieContext.get().ackVals.keySet())
+ for (Integer sid : importExportContext.get().ackVals.keySet())
{
if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
{
@@ -1462,7 +1462,7 @@
int serverRunningTheTask, Task initTask, int initWindow)
throws DirectoryException
{
- final IEContext ieCtx = acquireIEContext(false);
+ final ImportExportContext ieCtx = acquireIEContext(false);
/*
We manage the list of servers to initialize in order :
@@ -1583,7 +1583,8 @@
logger.trace(
"[IE] Exporter wait for reconnection by the listener thread");
int att=0;
- while (!broker.shuttingDown() && !broker.isConnected()
+ while (!broker.shuttingDown()
+ && !broker.isConnected()
&& ++att < 100)
{
try { Thread.sleep(100); }
@@ -1591,7 +1592,8 @@
}
}
- if (initTask != null && broker.isConnected()
+ if (initTask != null
+ && broker.isConnected()
&& serverToInitialize != RoutableMsg.ALL_SERVERS)
{
/*
@@ -1665,7 +1667,7 @@
* - wait it has finished the import and present the expected generationID,
* - build the failureList.
*/
- private void waitForRemoteStartOfInit(IEContext ieCtx)
+ private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
{
final Set<Integer> replicasWeAreWaitingFor =
new HashSet<Integer>(ieCtx.startList);
@@ -1723,7 +1725,7 @@
* - wait it has finished the import and present the expected generationID,
* - build the failureList.
*/
- private void waitForRemoteEndOfInit(IEContext ieCtx)
+ private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
{
final Set<Integer> replicasWeAreWaitingFor =
new HashSet<Integer>(ieCtx.startList);
@@ -1758,7 +1760,7 @@
continue;
}
- DSInfo dsInfo = isRemoteDSConnected(serverId);
+ DSInfo dsInfo = getConnectedRemoteDS(serverId);
if (dsInfo == null)
{
/*
@@ -1823,11 +1825,11 @@
* Acquire and initialize the import/export context, verifying no other
* import/export is in progress.
*/
- private IEContext acquireIEContext(boolean importInProgress)
+ private ImportExportContext acquireIEContext(boolean importInProgress)
throws DirectoryException
{
- final IEContext ieCtx = new IEContext(importInProgress);
- if (!ieContext.compareAndSet(null, ieCtx))
+ final ImportExportContext ieCtx = new ImportExportContext(importInProgress);
+ if (!importExportContext.compareAndSet(null, ieCtx))
{
// Rejects 2 simultaneous exports
LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
@@ -1838,7 +1840,7 @@
private void releaseIEContext()
{
- ieContext.set(null);
+ importExportContext.set(null);
}
/**
@@ -1847,7 +1849,7 @@
*
* @param errorMsg The error message received.
*/
- private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
+ private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx)
{
//Exporting must not be stopped on the first error, if we run initialize-all
if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
@@ -1885,7 +1887,7 @@
ReplicationMsg msg;
while (true)
{
- IEContext ieCtx = ieContext.get();
+ ImportExportContext ieCtx = importExportContext.get();
try
{
// In the context of the total update, we don't want any automatic
@@ -1983,7 +1985,7 @@
// Other messages received during an import are trashed except
// the topologyMsg.
if (msg instanceof TopologyMsg
- && isRemoteDSConnected(ieCtx.importSource) == null)
+ && getConnectedRemoteDS(ieCtx.importSource) == null)
{
LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get(
getBaseDNString(), getServerId(), ieCtx.importSource);
@@ -2056,7 +2058,7 @@
Arrays.toString(lDIFEntry));
// build the message
- IEContext ieCtx = ieContext.get();
+ ImportExportContext ieCtx = importExportContext.get();
EntryMsg entryMessage = new EntryMsg(
getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
++ieCtx.msgCnt);
@@ -2075,7 +2077,7 @@
}
int slowestServerId = ieCtx.getSlowestServer();
- if (isRemoteDSConnected(slowestServerId)==null)
+ if (getConnectedRemoteDS(slowestServerId) == null)
{
ieCtx.setException(new DirectoryException(ResultCode.OTHER,
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer())));
@@ -2203,7 +2205,7 @@
update the task.
*/
- final IEContext ieCtx = acquireIEContext(true);
+ final ImportExportContext ieCtx = acquireIEContext(true);
ieCtx.initializeTask = initTask;
ieCtx.attemptCnt = 0;
ieCtx.initReqMsgSent = new InitializeRequestMsg(
@@ -2263,7 +2265,7 @@
int source = initTargetMsgReceived.getSenderID();
- IEContext ieCtx = ieContext.get();
+ ImportExportContext ieCtx = importExportContext.get();
try
{
// Log starting
@@ -2470,7 +2472,7 @@
*/
public boolean ieRunning()
{
- return ieContext.get() != null;
+ return importExportContext.get() != null;
}
/**
@@ -3484,9 +3486,9 @@
*
* @return the Import/Export context associated to this ReplicationDomain
*/
- protected IEContext getImportExportContext()
+ protected ImportExportContext getImportExportContext()
{
- return ieContext.get();
+ return importExportContext.get();
}
/**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index b3ca6b7..ec307b8 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -131,7 +131,7 @@
String.valueOf(domain.getGenerationID())));
// Add import/export monitoring attributes
- final IEContext ieContext = domain.getImportExportContext();
+ final ImportExportContext ieContext = domain.getImportExportContext();
if (ieContext != null)
{
addMonitorData(attributes, "total-update",
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index 1f612fe..e0dd4c8 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -44,7 +44,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.service.ReplicationDomain.IEContext;
+import org.opends.server.replication.service.ReplicationDomain.*;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.testng.annotations.DataProvider;
@@ -444,7 +444,7 @@
private long getLeftEntryCount(ReplicationDomain domain)
{
- final IEContext ieContext = domain.getImportExportContext();
+ final ImportExportContext ieContext = domain.getImportExportContext();
if (ieContext != null)
{
return ieContext.getLeftEntryCount();
--
Gitblit v1.10.0