From e33ecc4dd66d4c514cd7ad52d348a0be63a7f1eb Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 08 Jan 2014 14:30:15 +0000
Subject: [PATCH] Increased encapsulation of ReplicationDomain.IEContext + moved more behaviour to it. Reduced members' visibilities in LDAPReplicationDomain + removed several methods.
---
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java | 73 +++---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 2
opends/src/server/org/opends/server/replication/common/DSInfo.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 2
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 321 ++++++++++++-------------
opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java | 129 +++-------
opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java | 25 -
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 126 ++++------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java | 2
13 files changed, 307 insertions(+), 383 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/DSInfo.java b/opends/src/server/org/opends/server/replication/common/DSInfo.java
index a07ae6b..fb42a43 100644
--- a/opends/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
diff --git a/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java b/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
index ace6d55..55564ab 100644
--- a/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
+++ b/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -42,6 +42,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.plugin.LDAPReplicationDomain.*;
+import static org.opends.server.util.StaticUtils.*;
/**
* This class implements a Directory Server plugin that is used in fractional
@@ -61,7 +62,7 @@
{
/**
* Holds the fractional configuration and if available the replication domain
- * matching this import session (they form the importfractional context).
+ * matching this import session (they form the import fractional context).
* Domain is available if the server is online (import-ldif, online full
* update..) otherwise, this is an import-ldif with server off. The key is the
* ImportConfig object of the session which acts as a cookie for the whole
@@ -126,9 +127,7 @@
super();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override()
public final void initializePlugin(Set<PluginType> pluginTypes,
FractionalLDIFImportPluginCfg configuration)
@@ -153,9 +152,7 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override()
public final void finalizePlugin()
{
@@ -217,9 +214,7 @@
return FractionalConfig.toFractionalConfig(matchingReplicatedDomainCfg);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override()
public final void doLDIFImportEnd(LDIFImportConfig importConfig)
{
@@ -343,26 +338,20 @@
}
// Compare backend and local fractional configuration
- boolean sameConfig =
- isFractionalConfigConsistent(localFractionalConfig, exclIt, inclIt);
+ if (isFractionalConfigConsistent(localFractionalConfig, exclIt, inclIt))
+ {
+ // local and remote non/fractional config are equivalent :
+ // follow import, no need to go with filtering as remote backend
+ // should be ok
+ // let import finish
+ return PluginResult.ImportLDIF.continueEntryProcessing();
+ }
+
if (localFractionalConfig.isFractional())
{
- // Local domain is fractional
- if (sameConfig)
- {
- // Both local and remote fractional configuration are equivalent :
- // follow import, no need to go with filtering as remote backend
- // should be ok
- return PluginResult.ImportLDIF.continueEntryProcessing();
- }
-
// Local domain is fractional, remote domain has not same config
- boolean remoteDomainHasSomeConfig = false;
- if ((exclAttr != null && (exclAttr.size() > 0))
- || (inclAttr != null && (inclAttr.size() > 0)))
- {
- remoteDomainHasSomeConfig = true;
- }
+ boolean remoteDomainHasSomeConfig =
+ isNotEmpty(exclAttr) || isNotEmpty(inclAttr);
if (remoteDomainHasSomeConfig)
{
LDAPReplicationDomain domain = importFractionalContext.getDomain();
@@ -372,7 +361,6 @@
// is different : stop import (error will be logged when import is
// stopped)
domain.setImportErrorMessageId(IMPORT_ERROR_MESSAGE_BAD_REMOTE);
- domain.setFollowImport(false);
return PluginResult.ImportLDIF.stopEntryProcessing(null);
}
@@ -384,16 +372,10 @@
// Local domain is fractional but remote domain has no config :
// flush local config into root entry and follow import with filtering
flushFractionalConfigIntoEntry(localFractionalConfig, entry);
- } else
+ }
+ else
{
// Local domain is not fractional
- if (sameConfig)
- {
- // None of the local or remote domain has fractional config : nothing
- // more to do : let import finish
- return PluginResult.ImportLDIF.continueEntryProcessing();
- }
-
LDAPReplicationDomain domain = importFractionalContext.getDomain();
if (domain != null)
{
@@ -401,7 +383,6 @@
//local domain should be configured with the same config as remote one
domain.setImportErrorMessageId(
IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL);
- domain.setFollowImport(false);
return PluginResult.ImportLDIF.stopEntryProcessing(null);
}
@@ -420,6 +401,11 @@
return PluginResult.ImportLDIF.continueEntryProcessing();
}
+ private boolean isNotEmpty(Attribute attr)
+ {
+ return attr != null && attr.size() > 0;
+ }
+
private Attribute getAttribute(String attributeName, Entry entry)
{
AttributeType attrType = DirectoryServer.getAttributeType(attributeName);
@@ -459,51 +445,19 @@
String fractAttribute = fractionalExclusive ?
REPLICATION_FRACTIONAL_EXCLUDE : REPLICATION_FRACTIONAL_INCLUDE;
AttributeBuilder attrBuilder = new AttributeBuilder(fractAttribute);
- boolean somethingToFlush = false;
-
// Add attribute values for all classes
- int size = fractionalAllClassesAttributes.size();
- if (size > 0)
- {
- String fracValue = "*:";
- int i = 1;
- for (String attrName : fractionalAllClassesAttributes)
- {
- fracValue += attrName;
- if (i < size)
- {
- fracValue += ",";
- }
- i++;
- }
- somethingToFlush = true;
- attrBuilder.add(fracValue);
- }
+ boolean somethingToFlush =
+ add(attrBuilder, "*", fractionalAllClassesAttributes);
// Add attribute values for specific classes
- size = fractionalSpecificClassesAttributes.size();
- if (size > 0)
+ if (fractionalSpecificClassesAttributes.size() > 0)
{
- for (String className : fractionalSpecificClassesAttributes.keySet())
+ for (Map.Entry<String, Set<String>> specific
+ : fractionalSpecificClassesAttributes.entrySet())
{
- int valuesSize =
- fractionalSpecificClassesAttributes.get(className).size();
- if (valuesSize > 0)
+ if (add(attrBuilder, specific.getKey(), specific.getValue()))
{
- String fracValue = className + ":";
- int i = 1;
- for (String attrName : fractionalSpecificClassesAttributes.get(
- className))
- {
- fracValue += attrName;
- if (i < valuesSize)
- {
- fracValue += ",";
- }
- i++;
- }
somethingToFlush = true;
- attrBuilder.add(fracValue);
}
}
}
@@ -517,9 +471,18 @@
}
}
- /**
- * {@inheritDoc}
- */
+ private static boolean add(AttributeBuilder attrBuilder, String className,
+ Set<String> values)
+ {
+ if (values.size() > 0)
+ {
+ attrBuilder.add(className + ":" + collectionToString(values, ","));
+ return true;
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override()
public boolean isConfigurationAcceptable(PluginCfg configuration,
List<Message> unacceptableReasons)
@@ -527,9 +490,7 @@
return true;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean isConfigurationChangeAcceptable(
FractionalLDIFImportPluginCfg configuration,
@@ -538,9 +499,7 @@
return true;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public ConfigChangeResult applyConfigurationChange(
FractionalLDIFImportPluginCfg configuration)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 7b9a5b4..d9b3481 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -41,7 +41,7 @@
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.AlertGenerator;
@@ -71,7 +71,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
-import org.opends.server.workflowelement.localbackend.*;
+import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -285,13 +285,6 @@
private boolean forceBadDataSet = false;
/**
- * This flag is used by the fractional replication ldif import plugin to
- * stop the (online) import process if a fractional configuration
- * inconsistency is detected by it.
- */
- private boolean followImport = true;
-
- /**
* The message id to be used when an import is stopped with error by
* the fractional replication ldif import plugin.
*/
@@ -481,7 +474,7 @@
storeECLConfiguration(configuration);
solveConflictFlag = isSolveConflict(configuration);
- Backend backend = retrievesBackend(getBaseDN());
+ Backend backend = getBackend();
if (backend == null)
{
throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
@@ -544,20 +537,22 @@
* error by the fractional replication ldif import plugin.
* @param importErrorMessageId The message to use.
*/
- public void setImportErrorMessageId(int importErrorMessageId)
+ void setImportErrorMessageId(int importErrorMessageId)
{
this.importErrorMessageId = importErrorMessageId;
}
/**
- * Sets the boolean telling if the online import currently in progress should
- * continue.
- * @param followImport The boolean telling if the online import currently in
- * progress should continue.
+ * This flag is used by the fractional replication ldif import plugin to stop
+ * the (online) import process if a fractional configuration inconsistency is
+ * detected by it.
+ *
+ * @return true if the online import currently in progress should continue,
+ * false otherwise.
*/
- public void setFollowImport(boolean followImport)
+ private boolean isFollowImport()
{
- this.followImport = followImport;
+ return importErrorMessageId == -1;
}
/**
@@ -1067,7 +1062,7 @@
* @return true if the operation contains some attributes subject to filtering
* by the fractional configuration
*/
- public boolean fractionalFilterOperation(
+ private boolean fractionalFilterOperation(
PreOperationAddOperation addOperation, boolean performFiltering)
{
return fractionalRemoveAttributesFromEntry(fractionalConfig,
@@ -1088,7 +1083,7 @@
* @return true if the operation is inconsistent with fractional
* configuration
*/
- public boolean fractionalFilterOperation(
+ private boolean fractionalFilterOperation(
PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
{
// Quick exit if not called for analyze and
@@ -1406,7 +1401,7 @@
* @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES,
* FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP
*/
- public int fractionalFilterOperation(PreOperationModifyOperation
+ private int fractionalFilterOperation(PreOperationModifyOperation
modifyOperation, boolean performFiltering)
{
/*
@@ -1494,7 +1489,7 @@
@Override
protected byte[] receiveEntryBytes()
{
- if (followImport)
+ if (isFollowImport())
{
// Ok, next entry is allowed to be received
return super.receiveEntryBytes();
@@ -1505,19 +1500,20 @@
// process:
// This is an error termination during the import
// The error is stored and the import is ended by returning null
+ final IEContext ieCtx = getImportExportContext();
Message msg = null;
switch (importErrorMessageId)
{
case IMPORT_ERROR_MESSAGE_BAD_REMOTE:
msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(
- getBaseDNString(), Integer.toString(ieContext.getImportSource()));
+ getBaseDNString(), Integer.toString(ieCtx.getImportSource()));
break;
case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL:
msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(
- getBaseDNString(), Integer.toString(ieContext.getImportSource()));
+ getBaseDNString(), Integer.toString(ieCtx.getImportSource()));
break;
}
- ieContext.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
+ ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
return null;
}
@@ -1552,7 +1548,7 @@
* @return A SynchronizationProviderResult indicating if the operation
* can continue.
*/
- public SynchronizationProviderResult handleConflictResolution(
+ SynchronizationProviderResult handleConflictResolution(
PreOperationDeleteOperation deleteOperation)
{
if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -1624,7 +1620,7 @@
* @return A SynchronizationProviderResult indicating if the operation
* can continue.
*/
- public SynchronizationProviderResult handleConflictResolution(
+ SynchronizationProviderResult handleConflictResolution(
PreOperationAddOperation addOperation)
{
if (!addOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -1754,7 +1750,7 @@
* @return A SynchronizationProviderResult indicating if the operation
* can continue.
*/
- public SynchronizationProviderResult handleConflictResolution(
+ SynchronizationProviderResult handleConflictResolution(
PreOperationModifyDNOperation modifyDNOperation)
{
if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -1872,7 +1868,7 @@
* @param modifyOperation the operation
* @return code indicating is operation must proceed
*/
- public SynchronizationProviderResult handleConflictResolution(
+ SynchronizationProviderResult handleConflictResolution(
PreOperationModifyOperation modifyOperation)
{
if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected())
@@ -2243,7 +2239,7 @@
*
* @return The number of updates in the pending list
*/
- public int getPendingUpdatesCount()
+ private int getPendingUpdatesCount()
{
if (pendingChanges != null)
return pendingChanges.size();
@@ -2255,7 +2251,7 @@
*
* @return The number of updates replayed successfully
*/
- public int getNumReplayedPostOpCalled()
+ private int getNumReplayedPostOpCalled()
{
return numReplayedPostOpCalled;
}
@@ -2518,7 +2514,7 @@
*
* @param csn the CSN of the operation with error.
*/
- public void updateError(CSN csn)
+ private void updateError(CSN csn)
{
try
{
@@ -3217,7 +3213,7 @@
* Get the number of modify conflicts successfully resolved.
* @return The number of modify conflicts successfully resolved.
*/
- public int getNumResolvedModifyConflicts()
+ private int getNumResolvedModifyConflicts()
{
return numResolvedModifyConflicts.get();
}
@@ -3226,7 +3222,7 @@
* Get the number of naming conflicts successfully resolved.
* @return The number of naming conflicts successfully resolved.
*/
- public int getNumResolvedNamingConflicts()
+ private int getNumResolvedNamingConflicts()
{
return numResolvedNamingConflicts.get();
}
@@ -3235,7 +3231,7 @@
* Get the number of unresolved conflicts.
* @return The number of unresolved conflicts.
*/
- public int getNumUnresolvedNamingConflicts()
+ private int getNumUnresolvedNamingConflicts()
{
return numUnresolvedNamingConflicts.get();
}
@@ -3322,7 +3318,7 @@
* @return The computed generationId.
* @throws DirectoryException When an error occurs.
*/
- public long computeGenerationId() throws DirectoryException
+ private long computeGenerationId() throws DirectoryException
{
long genId = exportBackend(null, true);
@@ -3503,7 +3499,7 @@
* Do whatever is needed when a backup is started.
* We need to make sure that the serverState is correctly save.
*/
- public void backupStart()
+ void backupStart()
{
state.save();
}
@@ -3511,7 +3507,7 @@
/**
* Do whatever is needed when a backup is finished.
*/
- public void backupEnd()
+ void backupEnd()
{
// Nothing is needed at the moment
}
@@ -3549,7 +3545,7 @@
private long exportBackend(OutputStream output, boolean checksumOutput)
throws DirectoryException
{
- Backend backend = retrievesBackend(getBaseDN());
+ Backend backend = getBackend();
// Acquire a shared lock for the backend.
try
@@ -3678,20 +3674,6 @@
}
/**
- * Retrieves the backend related to the domain.
- *
- * @return The backend of that domain.
- * @param baseDN The baseDN to retrieve the backend
- */
- protected static Backend retrievesBackend(DN baseDN)
- {
- // Retrieves the backend related to this domain
- return DirectoryServer.getBackend(baseDN);
- }
-
-
-
- /**
* Process backend before import.
*
* @param backend
@@ -3731,13 +3713,14 @@
{
LDIFImportConfig importConfig = null;
- Backend backend = retrievesBackend(getBaseDN());
+ Backend backend = getBackend();
+ IEContext ieCtx = getImportExportContext();
try
{
if (!backend.supportsLDIFImport())
{
- ieContext.setExceptionIfNoneSet(new DirectoryException(OTHER,
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
}
else
@@ -3754,7 +3737,6 @@
importConfig.setInvokeImportPlugins(true);
// Reset the follow import flag and message before starting the import
importErrorMessageId = -1;
- followImport = true;
// TODO How to deal with rejected entries during the import
importConfig.writeRejectedEntries(
@@ -3771,7 +3753,7 @@
}
catch(Exception e)
{
- ieContext.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e))));
}
finally
@@ -3783,12 +3765,12 @@
{
importConfig.close();
closeBackendImport(backend); // Re-enable backend
- backend = retrievesBackend(getBaseDN());
+ backend = getBackend();
}
loadDataState();
- if (ieContext.getException() != null)
+ if (ieCtx.getException() != null)
{
// When an error occurred during an import, most of times
// the generationId coming in the root entry of the imported data,
@@ -3804,14 +3786,14 @@
// so we don't bother about the new Exception.
// However if there was no Exception before we want
// to return this Exception to the task creator.
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER,
ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe))));
}
}
- if (ieContext.getException() != null)
- throw ieContext.getException();
+ if (ieCtx.getException() != null)
+ throw ieCtx.getException();
}
/**
@@ -3887,9 +3869,9 @@
* Returns the backend associated to this domain.
* @return The associated backend.
*/
- public Backend getBackend()
+ private Backend getBackend()
{
- return retrievesBackend(getBaseDN());
+ return DirectoryServer.getBackend(getBaseDN());
}
/*
@@ -3946,7 +3928,7 @@
}
// Check that the base DN is configured as a base-dn of the directory server
- if (retrievesBackend(dn) == null)
+ if (DirectoryServer.getBackend(dn) == null)
{
unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn.toString()));
return false;
@@ -3997,7 +3979,7 @@
ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
{
// Check that a import/export is not in progress
- if (importInProgress() || exportInProgress())
+ if (ieRunning())
{
unacceptableReasons.add(
NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get());
@@ -4059,7 +4041,7 @@
* Remove from this domain configuration, the configuration of the
* external change log.
*/
- public void removeECLDomainCfg()
+ private void removeECLDomainCfg()
{
try
{
@@ -4082,8 +4064,8 @@
* @param domCfg The provided configuration.
* @throws ConfigException When an error occurred.
*/
- public void storeECLConfiguration(ReplicationDomainCfg domCfg)
- throws ConfigException
+ private void storeECLConfiguration(ReplicationDomainCfg domCfg)
+ throws ConfigException
{
ExternalChangelogDomainCfg eclDomCfg = null;
// create the ecl config if it does not exist
@@ -4434,7 +4416,7 @@
@Override
public long countEntries() throws DirectoryException
{
- Backend backend = retrievesBackend(getBaseDN());
+ Backend backend = getBackend();
if (!backend.supportsLDIFExport())
{
Message msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID());
@@ -5105,7 +5087,7 @@
* Specifies whether this domain is enabled/disabled regarding the ECL.
* @return enabled/disabled for the ECL.
*/
- public boolean isECLEnabled()
+ boolean isECLEnabled()
{
return this.eclDomain.isEnabled();
}
@@ -5116,7 +5098,7 @@
*
* @return the purge delay.
*/
- public long getHistoricalPurgeDelay()
+ long getHistoricalPurgeDelay()
{
return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
index 52a16cc..87913ce 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.protocol;
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index f0338cc..cfbd5dd 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -146,14 +146,14 @@
* The context related to an import or export being processed
* Null when none is being processed.
*/
- protected IEContext ieContext = null;
+ volatile IEContext ieContext;
/**
* The Thread waiting for incoming update messages for this domain and pushing
* them to the global incoming update message queue for later processing by
* replay threads.
*/
- private volatile DirectoryThread listenerThread = null;
+ private volatile DirectoryThread listenerThread;
/**
* A set of counters used for Monitoring.
@@ -732,7 +732,8 @@
else if (msg instanceof ErrorMsg)
{
ErrorMsg errorMsg = (ErrorMsg)msg;
- if (ieContext != null)
+ IEContext ieCtx = ieContext;
+ if (ieCtx != null)
{
/*
This is an error termination for the 2 following cases :
@@ -750,10 +751,10 @@
" baseDN: " + getBaseDN() +
" Error Msg received: " + errorMsg);
- if (errorMsg.getCreationTime() > ieContext.startTime)
+ if (errorMsg.getCreationTime() > ieCtx.startTime)
{
// consider only ErrorMsg that relate to the current import/export
- processErrorMsg(errorMsg);
+ processErrorMsg(errorMsg, ieCtx);
}
else
{
@@ -784,10 +785,11 @@
}
else if (msg instanceof InitializeRcvAckMsg)
{
- if (ieContext != null)
+ IEContext ieCtx = ieContext;
+ if (ieCtx != null)
{
InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
- ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
+ ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
}
// Trash this msg When no input/export is running/should never happen
}
@@ -1044,7 +1046,7 @@
private long entryLeftCount = 0;
/** Exception raised during the initialization. */
- private DirectoryException exception = null;
+ private DirectoryException exception;
/** Whether the context is related to an import or an export. */
private boolean importInProgress;
@@ -1061,7 +1063,7 @@
/**
* Request message sent when this server has the initializeFromRemote task.
*/
- private InitializeRequestMsg initReqMsgSent = null;
+ private InitializeRequestMsg initReqMsgSent;
/**
* Start time of the initialization process. ErrorMsg timestamped before
@@ -1116,12 +1118,47 @@
}
/**
+ * Returns a boolean indicating if a total update import is currently in
+ * Progress.
+ *
+ * @return A boolean indicating if a total update import is currently in
+ * Progress.
+ */
+ public boolean importInProgress()
+ {
+ return importInProgress;
+ }
+
+ /**
+ * Returns the total number of entries to be processed when a total update
+ * is in progress.
+ *
+ * @return The total number of entries to be processed when a total update
+ * is in progress.
+ */
+ long getTotalEntryCount()
+ {
+ return entryCount;
+ }
+
+ /**
+ * Returns the number of entries still to be processed when a total update
+ * is in progress.
+ *
+ * @return The number of entries still to be processed when a total update
+ * is in progress.
+ */
+ long getLeftEntryCount()
+ {
+ return entryLeftCount;
+ }
+
+ /**
* Initializes the import/export counters with the provider value.
* @param total Total number of entries to be processed.
* @throws DirectoryException if an error occurred.
*/
- private void initializeCounters(long total)
- throws DirectoryException
+ private void initializeCounters(long total) throws DirectoryException
{
entryCount = total;
entryLeftCount = total;
@@ -1150,8 +1187,7 @@
*
* @throws DirectoryException if an error occurred.
*/
- public void updateCounters(int entriesDone)
- throws DirectoryException
+ public void updateCounters(int entriesDone) throws DirectoryException
{
entryLeftCount -= entriesDone;
@@ -1358,6 +1394,7 @@
- to update the task with the server(s) where this test failed
*/
+ IEContext ieCtx = ieContext;
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
@@ -1365,7 +1402,7 @@
for (DSInfo dsi : getReplicasList())
{
- ieContext.startList.add(dsi.getDsId());
+ ieCtx.startList.add(dsi.getDsId());
}
// We manage the list of servers with which a flow control can be enabled
@@ -1373,7 +1410,7 @@
{
if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- ieContext.setAckVal(dsi.getDsId(), 0);
+ ieCtx.setAckVal(dsi.getDsId(), 0);
}
}
}
@@ -1382,7 +1419,7 @@
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
getBaseDNString(), getServerId(), serverToInitialize));
- ieContext.startList.add(serverToInitialize);
+ ieCtx.startList.add(serverToInitialize);
// We manage the list of servers with which a flow control can be enabled
for (DSInfo dsi : getReplicasList())
@@ -1390,7 +1427,7 @@
if (dsi.getDsId() == serverToInitialize &&
dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- ieContext.setAckVal(dsi.getDsId(), 0);
+ ieCtx.setAckVal(dsi.getDsId(), 0);
}
}
}
@@ -1402,34 +1439,34 @@
{
try
{
- ieContext.exportTarget = serverToInitialize;
+ ieCtx.exportTarget = serverToInitialize;
if (initTask != null)
{
- ieContext.initializeTask = initTask;
+ ieCtx.initializeTask = initTask;
}
- ieContext.initializeCounters(this.countEntries());
- ieContext.msgCnt = 0;
- ieContext.initNumLostConnections = broker.getNumLostConnections();
- ieContext.initWindow = initWindow;
+ ieCtx.initializeCounters(this.countEntries());
+ ieCtx.msgCnt = 0;
+ ieCtx.initNumLostConnections = broker.getNumLostConnections();
+ ieCtx.initWindow = initWindow;
// Send start message to the peer
InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
getBaseDN(), getServerId(), serverToInitialize,
- serverRunningTheTask, ieContext.entryCount, initWindow);
+ serverRunningTheTask, ieCtx.entryCount, initWindow);
broker.publish(initTargetMsg);
// Wait for all servers to be ok
- waitForRemoteStartOfInit();
+ waitForRemoteStartOfInit(ieCtx);
// Servers that left in the list are those for which we could not test
// that they have been successfully initialized.
- if (!ieContext.failureList.isEmpty())
+ if (!ieCtx.failureList.isEmpty())
{
throw new DirectoryException(
ResultCode.OTHER,
ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(
- ieContext.failureList.toString()));
+ ieCtx.failureList.toString()));
}
exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
@@ -1441,7 +1478,7 @@
catch(DirectoryException exportException)
{
// Give priority to the first exception raised - stored in the context
- final DirectoryException ieEx = ieContext.exception;
+ final DirectoryException ieEx = ieCtx.exception;
exportRootException = ieEx != null ? ieEx : exportException;
}
@@ -1500,10 +1537,8 @@
continue;
}
- ErrorMsg errorMsg =
- new ErrorMsg(serverToInitialize,
- exportRootException.getMessageObject());
- broker.publish(errorMsg);
+ broker.publish(new ErrorMsg(
+ serverToInitialize, exportRootException.getMessageObject()));
}
catch(Exception e)
{
@@ -1518,16 +1553,16 @@
} // attempt loop
// Wait for all servers to be ok, and build the failure list
- waitForRemoteEndOfInit();
+ waitForRemoteEndOfInit(ieCtx);
// Servers that left in the list are those for which we could not test
// that they have been successfully initialized.
- if (!ieContext.failureList.isEmpty() && exportRootException == null)
+ if (!ieCtx.failureList.isEmpty() && exportRootException == null)
{
exportRootException = new DirectoryException(ResultCode.OTHER,
ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(
Long.toString(getGenerationID()),
- ieContext.failureList.toString()));
+ ieCtx.failureList.toString()));
}
// Don't forget to release IEcontext acquired at beginning.
@@ -1558,10 +1593,10 @@
* - wait it has finished the import and present the expected generationID,
* - build the failureList.
*/
- private void waitForRemoteStartOfInit()
+ private void waitForRemoteStartOfInit(IEContext ieCtx)
{
final Set<Integer> replicasWeAreWaitingFor =
- new HashSet<Integer>(ieContext.startList);
+ new HashSet<Integer>(ieCtx.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1580,7 +1615,7 @@
+ " " + dsi.getStatus()
+ " " + dsi.getGenerationId()
+ " " + getGenerationID());
- if (ieContext.startList.contains(dsi.getDsId()))
+ if (ieCtx.startList.contains(dsi.getDsId()))
{
if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
{
@@ -1604,11 +1639,11 @@
}
while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
- ieContext.failureList.addAll(replicasWeAreWaitingFor);
+ ieCtx.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
- "[IE] wait for start ends with " + ieContext.failureList);
+ "[IE] wait for start ends with " + ieCtx.failureList);
}
/**
@@ -1616,10 +1651,10 @@
* - wait it has finished the import and present the expected generationID,
* - build the failureList.
*/
- private void waitForRemoteEndOfInit()
+ private void waitForRemoteEndOfInit(IEContext ieCtx)
{
final Set<Integer> replicasWeAreWaitingFor =
- new HashSet<Integer>(ieContext.startList);
+ new HashSet<Integer>(ieCtx.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1645,7 +1680,7 @@
while (it.hasNext())
{
int serverId = it.next();
- if (ieContext.failureList.contains(serverId))
+ if (ieCtx.failureList.contains(serverId))
{
/*
this server has already been in error during initialization
@@ -1701,11 +1736,11 @@
}
while (!done && !broker.shuttingDown()); // infinite wait
- ieContext.failureList.addAll(replicasWeAreWaitingFor);
+ ieCtx.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
- "[IE] wait for end ends with " + ieContext.failureList);
+ "[IE] wait for end ends with " + ieCtx.failureList);
}
/**
@@ -1743,13 +1778,13 @@
*
* @param errorMsg The error message received.
*/
- private void processErrorMsg(ErrorMsg errorMsg)
+ private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
{
//Exporting must not be stopped on the first error, if we run initialize-all
- if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS)
+ if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
{
// The ErrorMsg is received while we have started an initialization
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, errorMsg.getDetails()));
/*
@@ -1759,11 +1794,11 @@
* even after the nextInitAttemptDelay
* During the import, the ErrorMsg will be received by receiveEntryBytes
*/
- if (ieContext.initializeTask instanceof InitializeTask)
+ if (ieCtx.initializeTask instanceof InitializeTask)
{
// Update the task that initiated the import
- ((InitializeTask) ieContext.initializeTask)
- .updateTaskCompletionState(ieContext.getException());
+ ((InitializeTask) ieCtx.initializeTask)
+ .updateTaskCompletionState(ieCtx.getException());
releaseIEContext();
}
@@ -1781,6 +1816,7 @@
ReplicationMsg msg;
while (true)
{
+ IEContext ieCtx = ieContext;
try
{
// In the context of the total update, we don't want any automatic
@@ -1807,7 +1843,7 @@
else
{
// Handle connection issues
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
.get(broker.getReplicationServer())));
return null;
@@ -1819,26 +1855,26 @@
{
EntryMsg entryMsg = (EntryMsg)msg;
byte[] entryBytes = entryMsg.getEntryBytes();
- ieContext.updateCounters(countEntryLimits(entryBytes));
+ ieCtx.updateCounters(countEntryLimits(entryBytes));
- if (ieContext.exporterProtocolVersion >=
+ if (ieCtx.exporterProtocolVersion >=
ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// check the msgCnt of the msg received to check ordering
- if (++ieContext.msgCnt != entryMsg.getMsgId())
+ if (++ieCtx.msgCnt != entryMsg.getMsgId())
{
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(
- String.valueOf(ieContext.msgCnt),
+ String.valueOf(ieCtx.msgCnt),
String.valueOf(entryMsg.getMsgId()))));
return null;
}
// send the ack of flow control mgmt
- if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
+ if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
{
final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
- getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
+ getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
broker.publish(amsg, false);
if (debugEnabled())
{
@@ -1864,12 +1900,12 @@
This is an error termination during the import
The error is stored and the import is ended by returning null
*/
- if (ieContext.getException() == null)
+ if (ieCtx.getException() == null)
{
ErrorMsg errMsg = (ErrorMsg)msg;
- if (errMsg.getCreationTime() > ieContext.startTime)
+ if (errMsg.getCreationTime() > ieCtx.startTime)
{
- ieContext.setException(
+ ieCtx.setException(
new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
return null;
}
@@ -1880,15 +1916,15 @@
// Other messages received during an import are trashed except
// the topologyMsg.
if (msg instanceof TopologyMsg
- && isRemoteDSConnected(ieContext.importSource) == null)
+ && isRemoteDSConnected(ieCtx.importSource) == null)
{
Message errMsg =
Message.raw(Category.SYNC, Severity.NOTICE,
ERR_INIT_EXPORTER_DISCONNECTION.get(
getBaseDNString(),
Integer.toString(getServerId()),
- Integer.toString(ieContext.importSource)));
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ Integer.toString(ieCtx.importSource)));
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, errMsg));
return null;
}
@@ -1896,7 +1932,7 @@
}
catch(Exception e)
{
- ieContext.setExceptionIfNoneSet(new DirectoryException(
+ ieCtx.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER,
ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
}
@@ -1957,9 +1993,10 @@
Arrays.toString(lDIFEntry));
// build the message
+ IEContext ieCtx = ieContext;
EntryMsg entryMessage = new EntryMsg(
- getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
- ++ieContext.msgCnt);
+ getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
+ ++ieCtx.msgCnt);
// Waiting the slowest loop
while (!broker.shuttingDown())
@@ -1969,30 +2006,30 @@
server that have been stored by the listener thread in the ieContext,
we just abandon the export by throwing an exception.
*/
- if (ieContext.getException() != null)
+ if (ieCtx.getException() != null)
{
- throw new IOException(ieContext.getException().getMessage());
+ throw new IOException(ieCtx.getException().getMessage());
}
- int slowestServerId = ieContext.getSlowestServer();
+ int slowestServerId = ieCtx.getSlowestServer();
if (isRemoteDSConnected(slowestServerId)==null)
{
- ieContext.setException(new DirectoryException(ResultCode.OTHER,
+ ieCtx.setException(new DirectoryException(ResultCode.OTHER,
ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(
- Integer.toString(ieContext.getSlowestServer()))));
+ Integer.toString(ieCtx.getSlowestServer()))));
throw new IOException("IOException with nested DirectoryException",
- ieContext.getException());
+ ieCtx.getException());
}
- int ourLastExportedCnt = ieContext.msgCnt;
- int slowestCnt = ieContext.ackVals.get(slowestServerId);
+ int ourLastExportedCnt = ieCtx.msgCnt;
+ int slowestCnt = ieCtx.ackVals.get(slowestServerId);
if (debugEnabled())
TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
" our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
- if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow)
+ if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow)
{
if (debugEnabled())
TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
@@ -2003,13 +2040,13 @@
// process any connection error
if (broker.hasConnectionError()
- || broker.getNumLostConnections() != ieContext.initNumLostConnections)
+ || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
{
// publish failed - store the error in the ieContext ...
DirectoryException de = new DirectoryException(ResultCode.OTHER,
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
Integer.toString(broker.getRsServerId())));
- ieContext.setExceptionIfNoneSet(de);
+ ieCtx.setExceptionIfNoneSet(de);
// .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
@@ -2031,13 +2068,13 @@
// process any publish error
if (!sent
|| broker.hasConnectionError()
- || broker.getNumLostConnections() != ieContext.initNumLostConnections)
+ || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
{
// publish failed - store the error in the ieContext ...
DirectoryException de = new DirectoryException(ResultCode.OTHER,
ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(
Integer.toString(broker.getRsServerId())));
- ieContext.setExceptionIfNoneSet(de);
+ ieCtx.setExceptionIfNoneSet(de);
// .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
@@ -2045,11 +2082,11 @@
// publish succeeded
try
{
- ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
+ ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
}
catch (DirectoryException de)
{
- ieContext.setExceptionIfNoneSet(de);
+ ieCtx.setExceptionIfNoneSet(de);
// .. and abandon the export by throwing an exception.
throw new IOException(de.getMessage());
}
@@ -2157,13 +2194,12 @@
*/
acquireIEContext(true); //test and set if no import already in progress
- ieContext.initializeTask = initTask;
- ieContext.attemptCnt = 0;
- ieContext.initReqMsgSent = new InitializeRequestMsg(
+ IEContext ieCtx = ieContext;
+ ieCtx.initializeTask = initTask;
+ ieCtx.attemptCnt = 0;
+ ieCtx.initReqMsgSent = new InitializeRequestMsg(
getBaseDN(), getServerId(), source, getInitWindow());
-
- // Publish Init request msg
- broker.publish(ieContext.initReqMsgSent);
+ broker.publish(ieCtx.initReqMsgSent);
/*
The normal success processing is now to receive InitTargetMsg then
@@ -2219,6 +2255,7 @@
int source = initTargetMsgReceived.getSenderID();
+ IEContext ieCtx = ieContext;
try
{
// Log starting
@@ -2240,12 +2277,12 @@
}
// Initialize stuff
- ieContext.importSource = source;
- ieContext.initializeCounters(initTargetMsgReceived.getEntryCount());
- ieContext.initWindow = initTargetMsgReceived.getInitWindow();
+ ieCtx.importSource = source;
+ ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
+ ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
// Protocol version is -1 when not known.
- ieContext.exporterProtocolVersion = getProtocolVersion(source);
- initFromTask = (InitializeTask)ieContext.initializeTask;
+ ieCtx.exporterProtocolVersion = getProtocolVersion(source);
+ initFromTask = (InitializeTask) ieCtx.initializeTask;
// Launch the import
importBackend(new ReplInputStream(this));
@@ -2257,14 +2294,14 @@
Store the exception raised. It will be considered if no other exception
has been previously stored in the context
*/
- ieContext.setExceptionIfNoneSet(e);
+ ieCtx.setExceptionIfNoneSet(e);
}
finally
{
if (debugEnabled())
{
TRACER.debugInfo("[IE] Domain=" + this
- + " ends import with exception=" + ieContext.getException()
+ + " ends import with exception=" + ieCtx.getException()
+ " connected=" + broker.isConnected());
}
@@ -2278,10 +2315,10 @@
*/
broker.reStart(false);
- if (ieContext.getException() != null
+ if (ieCtx.getException() != null
&& broker.isConnected()
&& initFromTask != null
- && ++ieContext.attemptCnt < 2)
+ && ++ieCtx.attemptCnt < 2)
{
/*
Worth a new attempt
@@ -2300,13 +2337,13 @@
the request
*/
logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
- ieContext.getException().getLocalizedMessage()));
+ ieCtx.getException().getLocalizedMessage()));
- broker.publish(ieContext.initReqMsgSent);
+ broker.publish(ieCtx.initReqMsgSent);
- ieContext.initializeCounters(0);
- ieContext.exception = null;
- ieContext.msgCnt = 0;
+ ieCtx.initializeCounters(0);
+ ieCtx.exception = null;
+ ieCtx.msgCnt = 0;
// Processing of the received initTargetMsgReceived is done
// let's wait for the next one
@@ -2320,7 +2357,7 @@
*/
logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
e.getLocalizedMessage(),
- ieContext.getException().getLocalizedMessage()));
+ ieCtx.getException().getLocalizedMessage()));
}
}
@@ -2330,19 +2367,19 @@
if (debugEnabled())
{
TRACER.debugInfo("[IE] Domain=" + this
- + " ends initialization with exception=" + ieContext.getException()
+ + " ends initialization with exception=" + ieCtx.getException()
+ " connected=" + broker.isConnected()
+ " task=" + initFromTask
- + " attempt=" + ieContext.attemptCnt);
+ + " attempt=" + ieCtx.attemptCnt);
}
try
{
- if (broker.isConnected() && ieContext.getException() != null)
+ if (broker.isConnected() && ieCtx.getException() != null)
{
// Let's notify the exporter
ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
- ieContext.getException().getMessageObject());
+ ieCtx.getException().getMessageObject());
broker.publish(errorMsg);
}
/*
@@ -2353,7 +2390,7 @@
*/
if (initFromTask != null)
{
- initFromTask.updateTaskCompletionState(ieContext.getException());
+ initFromTask.updateTaskCompletionState(ieCtx.getException());
}
}
finally
@@ -2361,8 +2398,8 @@
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
getBaseDNString(), initTargetMsgReceived.getSenderID(),
getServerId(),
- (ieContext.getException() == null ? ""
- : ieContext.getException().getLocalizedMessage()));
+ (ieCtx.getException() == null ? ""
+ : ieCtx.getException().getLocalizedMessage()));
logError(msg);
releaseIEContext();
} // finally
@@ -3449,43 +3486,13 @@
}
/**
- * Returns a boolean indicating if a total update import is currently
- * in Progress.
+ * Returns the Import/Export context associated to this ReplicationDomain.
*
- * @return A boolean indicating if a total update import is currently
- * in Progress.
+ * @return the Import/Export context associated to this ReplicationDomain
*/
- public boolean importInProgress()
+ protected IEContext getImportExportContext()
{
- return ieContext != null && ieContext.importInProgress;
- }
-
- /**
- * Returns a boolean indicating if a total update export is currently
- * in Progress.
- *
- * @return A boolean indicating if a total update export is currently
- * in Progress.
- */
- public boolean exportInProgress()
- {
- return ieContext != null && !ieContext.importInProgress;
- }
-
- /**
- * Returns the number of entries still to be processed when a total update
- * is in progress.
- *
- * @return The number of entries still to be processed when a total update
- * is in progress.
- */
- long getLeftEntryCount()
- {
- if (ieContext != null)
- {
- return ieContext.entryLeftCount;
- }
- return 0;
+ return ieContext;
}
/**
@@ -3501,24 +3508,6 @@
}
/**
- * Returns the total number of entries to be processed when a total update
- * is in progress.
- *
- * @return The total number of entries to be processed when a total update
- * is in progress.
- */
- long getTotalEntryCount()
- {
- if (ieContext != null)
- {
- return ieContext.entryCount;
- }
- return 0;
- }
-
-
-
- /**
* Set the attributes configured on a server to be included in the ECL.
*
* @param serverId
@@ -3617,7 +3606,7 @@
* The serverId for which we want the include attributes.
* @return The attributes.
*/
- public Set<String> getEclIncludes(int serverId)
+ Set<String> getEclIncludes(int serverId)
{
synchronized (eclIncludesLock)
{
@@ -3635,7 +3624,7 @@
* The serverId for which we want the include attributes.
* @return The attributes.
*/
- public Set<String> getEclIncludesForDeletes(int serverId)
+ Set<String> getEclIncludesForDeletes(int serverId)
{
synchronized (eclIncludesLock)
{
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java b/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
index ab272c0..589f539 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.service;
@@ -34,6 +34,7 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.ReplicationDomain.IEContext;
import org.opends.server.types.*;
/**
@@ -130,21 +131,15 @@
String.valueOf(domain.getGenerationID())));
// Add import/export monitoring attributes
- if (domain.importInProgress())
+ final IEContext ieContext = domain.getImportExportContext();
+ if (ieContext != null)
{
- addMonitorData(attributes, "total-update", "import");
- addMonitorData(
- attributes, "total-update-entry-count", domain.getTotalEntryCount());
- addMonitorData(
- attributes, "total-update-entry-left", domain.getLeftEntryCount());
- }
- if (domain.exportInProgress())
- {
- addMonitorData(attributes, "total-update", "export");
- addMonitorData(
- attributes, "total-update-entry-count", domain.getTotalEntryCount());
- addMonitorData(
- attributes, "total-update-entry-left", domain.getLeftEntryCount());
+ addMonitorData(attributes, "total-update",
+ ieContext.importInProgress() ? "import" : "export");
+ addMonitorData(attributes, "total-update-entry-count",
+ ieContext.getTotalEntryCount());
+ addMonitorData(attributes, "total-update-entry-left",
+ ieContext.getLeftEntryCount());
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index 668622a..cae6348 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -20,7 +20,7 @@
*
* CDDL HEADER END
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 9341c30..26405f8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 5f30b31..4ce3844 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index fe1c98b..0e2057a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 52b6ba9..2108816 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index d2a0621..d6b5ca1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
index a9c222c..8945c4b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -317,12 +317,7 @@
"exportAndImportData", 100);
SortedSet<String> servers = newSortedSet("localhost:" + replServerPort);
- StringBuilder exportedDataBuilder = new StringBuilder();
- for (int i =0; i<ENTRYCOUNT; i++)
- {
- exportedDataBuilder.append("key : value"+i+"\n\n");
- }
- String exportedData=exportedDataBuilder.toString();
+ String exportedData = buildExportedData(ENTRYCOUNT);
domain1 = new FakeReplicationDomain(
testService, serverId1, servers, 0, exportedData, null, ENTRYCOUNT);
@@ -343,18 +338,8 @@
}
}
- int count = 0;
- while ((importedData.length() < exportedData.length()) && (count < 500))
- {
- count ++;
- Thread.sleep(100);
- }
- assertEquals(domain2.getLeftEntryCount(), 0,
- "LeftEntryCount for export is " + domain2.getLeftEntryCount());
- assertEquals(domain1.getLeftEntryCount(), 0,
- "LeftEntryCount for import is " + domain1.getLeftEntryCount());
- assertEquals(importedData.length(), exportedData.length());
- assertEquals(importedData.toString(), exportedData);
+ waitEndExport(exportedData, importedData);
+ assertExportSucessful(domain1, domain2, exportedData, importedData);
}
finally
{
@@ -393,12 +378,7 @@
SortedSet<String> servers1 = newSortedSet("localhost:" + replServerPort1);
SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2);
- StringBuilder exportedDataBuilder = new StringBuilder();
- for (int i =0; i<ENTRYCOUNT; i++)
- {
- exportedDataBuilder.append("key : value"+i+"\n\n");
- }
- String exportedData=exportedDataBuilder.toString();
+ String exportedData = buildExportedData(ENTRYCOUNT);
domain1 = new FakeReplicationDomain(
testService, 1, servers1, 0, exportedData, null, ENTRYCOUNT);
@@ -408,18 +388,8 @@
domain2.initializeFromRemote(1);
- int count = 0;
- while ((importedData.length() < exportedData.length()) && (count < 500))
- {
- count ++;
- Thread.sleep(100);
- }
- assertEquals(domain2.getLeftEntryCount(), 0,
- "LeftEntryCount for export is " + domain2.getLeftEntryCount());
- assertEquals(domain1.getLeftEntryCount(), 0,
- "LeftEntryCount for import is " + domain1.getLeftEntryCount());
- assertEquals(importedData.length(), exportedData.length());
- assertEquals(importedData.toString(), exportedData);
+ waitEndExport(exportedData, importedData);
+ assertExportSucessful(domain1, domain2, exportedData, importedData);
}
finally
{
@@ -428,6 +398,35 @@
}
}
+ private String buildExportedData(final int ENTRYCOUNT)
+ {
+ final StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < ENTRYCOUNT; i++)
+ {
+ sb.append("key : value" + i + "\n\n");
+ }
+ return sb.toString();
+ }
+
+ private void waitEndExport(String exportedData, StringBuilder importedData) throws Exception
+ {
+ int count = 0;
+ while (importedData.length() < exportedData.length() && count < 500)
+ {
+ count ++;
+ Thread.sleep(100);
+ }
+ }
+
+ private void assertExportSucessful(ReplicationDomain domain1,
+ ReplicationDomain domain2, String exportedData, StringBuilder importedData)
+ {
+ assertEquals(domain2.ieContext.getLeftEntryCount(), 0, "Wrong LeftEntryCount for export");
+ assertEquals(domain1.ieContext.getLeftEntryCount(), 0, "Wrong LeftEntryCount for import");
+ assertEquals(importedData.length(), exportedData.length());
+ assertEquals(importedData.toString(), exportedData);
+ }
+
/**
* Sender side of the Total Update Perf test.
* The goal of this test is to measure the performance
--
Gitblit v1.10.0