opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.common; opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -22,7 +22,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.plugin; @@ -42,6 +42,7 @@ import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.replication.plugin.LDAPReplicationDomain.*; import static org.opends.server.util.StaticUtils.*; /** * This class implements a Directory Server plugin that is used in fractional @@ -61,7 +62,7 @@ { /** * Holds the fractional configuration and if available the replication domain * matching this import session (they form the importfractional context). * matching this import session (they form the import fractional context). * Domain is available if the server is online (import-ldif, online full * update..) otherwise, this is an import-ldif with server off. The key is the * ImportConfig object of the session which acts as a cookie for the whole @@ -126,9 +127,7 @@ super(); } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override() public final void initializePlugin(Set<PluginType> pluginTypes, FractionalLDIFImportPluginCfg configuration) @@ -153,9 +152,7 @@ } } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override() public final void finalizePlugin() { @@ -217,9 +214,7 @@ return FractionalConfig.toFractionalConfig(matchingReplicatedDomainCfg); } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override() public final void doLDIFImportEnd(LDIFImportConfig importConfig) { @@ -343,26 +338,20 @@ } // Compare backend and local fractional configuration boolean sameConfig = isFractionalConfigConsistent(localFractionalConfig, exclIt, inclIt); if (isFractionalConfigConsistent(localFractionalConfig, exclIt, inclIt)) { // local and remote non/fractional config are equivalent : // follow import, no need to go with filtering as remote backend // should be ok // let import finish return PluginResult.ImportLDIF.continueEntryProcessing(); } if (localFractionalConfig.isFractional()) { // Local domain is fractional if (sameConfig) { // Both local and remote fractional configuration are equivalent : // follow import, no need to go with filtering as remote backend // should be ok return PluginResult.ImportLDIF.continueEntryProcessing(); } // Local domain is fractional, remote domain has not same config boolean remoteDomainHasSomeConfig = false; if ((exclAttr != null && (exclAttr.size() > 0)) || (inclAttr != null && (inclAttr.size() > 0))) { remoteDomainHasSomeConfig = true; } boolean remoteDomainHasSomeConfig = isNotEmpty(exclAttr) || isNotEmpty(inclAttr); if (remoteDomainHasSomeConfig) { LDAPReplicationDomain domain = importFractionalContext.getDomain(); @@ -372,7 +361,6 @@ // is different : stop import (error will be logged when import is // stopped) domain.setImportErrorMessageId(IMPORT_ERROR_MESSAGE_BAD_REMOTE); domain.setFollowImport(false); return PluginResult.ImportLDIF.stopEntryProcessing(null); } @@ -384,16 +372,10 @@ // Local domain is fractional but remote domain has no config : // flush local config into root entry and follow import with filtering flushFractionalConfigIntoEntry(localFractionalConfig, entry); } else } else { // Local domain is not fractional if (sameConfig) { // None of the local or remote domain has fractional config : nothing // more to do : let import finish return PluginResult.ImportLDIF.continueEntryProcessing(); } LDAPReplicationDomain domain = importFractionalContext.getDomain(); if (domain != null) { @@ -401,7 +383,6 @@ //local domain should be configured with the same config as remote one domain.setImportErrorMessageId( IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL); domain.setFollowImport(false); return PluginResult.ImportLDIF.stopEntryProcessing(null); } @@ -420,6 +401,11 @@ return PluginResult.ImportLDIF.continueEntryProcessing(); } private boolean isNotEmpty(Attribute attr) { return attr != null && attr.size() > 0; } private Attribute getAttribute(String attributeName, Entry entry) { AttributeType attrType = DirectoryServer.getAttributeType(attributeName); @@ -459,51 +445,19 @@ String fractAttribute = fractionalExclusive ? REPLICATION_FRACTIONAL_EXCLUDE : REPLICATION_FRACTIONAL_INCLUDE; AttributeBuilder attrBuilder = new AttributeBuilder(fractAttribute); boolean somethingToFlush = false; // Add attribute values for all classes int size = fractionalAllClassesAttributes.size(); if (size > 0) { String fracValue = "*:"; int i = 1; for (String attrName : fractionalAllClassesAttributes) { fracValue += attrName; if (i < size) { fracValue += ","; } i++; } somethingToFlush = true; attrBuilder.add(fracValue); } boolean somethingToFlush = add(attrBuilder, "*", fractionalAllClassesAttributes); // Add attribute values for specific classes size = fractionalSpecificClassesAttributes.size(); if (size > 0) if (fractionalSpecificClassesAttributes.size() > 0) { for (String className : fractionalSpecificClassesAttributes.keySet()) for (Map.Entry<String, Set<String>> specific : fractionalSpecificClassesAttributes.entrySet()) { int valuesSize = fractionalSpecificClassesAttributes.get(className).size(); if (valuesSize > 0) if (add(attrBuilder, specific.getKey(), specific.getValue())) { String fracValue = className + ":"; int i = 1; for (String attrName : fractionalSpecificClassesAttributes.get( className)) { fracValue += attrName; if (i < valuesSize) { fracValue += ","; } i++; } somethingToFlush = true; attrBuilder.add(fracValue); } } } @@ -517,9 +471,18 @@ } } /** * {@inheritDoc} */ private static boolean add(AttributeBuilder attrBuilder, String className, Set<String> values) { if (values.size() > 0) { attrBuilder.add(className + ":" + collectionToString(values, ",")); return true; } return false; } /** {@inheritDoc} */ @Override() public boolean isConfigurationAcceptable(PluginCfg configuration, List<Message> unacceptableReasons) @@ -527,9 +490,7 @@ return true; } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public boolean isConfigurationChangeAcceptable( FractionalLDIFImportPluginCfg configuration, @@ -538,9 +499,7 @@ return true; } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationChange( FractionalLDIFImportPluginCfg configuration) opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.plugin; @@ -41,7 +41,7 @@ import org.opends.messages.MessageBuilder; import org.opends.messages.Severity; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; import org.opends.server.admin.std.server.ReplicationDomainCfg; import org.opends.server.api.AlertGenerator; @@ -71,7 +71,7 @@ import org.opends.server.util.LDIFReader; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import org.opends.server.workflowelement.localbackend.*; import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation; import static org.opends.messages.ReplicationMessages.*; import static org.opends.messages.ToolMessages.*; @@ -285,13 +285,6 @@ private boolean forceBadDataSet = false; /** * This flag is used by the fractional replication ldif import plugin to * stop the (online) import process if a fractional configuration * inconsistency is detected by it. */ private boolean followImport = true; /** * The message id to be used when an import is stopped with error by * the fractional replication ldif import plugin. */ @@ -481,7 +474,7 @@ storeECLConfiguration(configuration); solveConflictFlag = isSolveConflict(configuration); Backend backend = retrievesBackend(getBaseDN()); Backend backend = getBackend(); if (backend == null) { throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( @@ -544,20 +537,22 @@ * error by the fractional replication ldif import plugin. * @param importErrorMessageId The message to use. */ public void setImportErrorMessageId(int importErrorMessageId) void setImportErrorMessageId(int importErrorMessageId) { this.importErrorMessageId = importErrorMessageId; } /** * Sets the boolean telling if the online import currently in progress should * continue. * @param followImport The boolean telling if the online import currently in * progress should continue. * This flag is used by the fractional replication ldif import plugin to stop * the (online) import process if a fractional configuration inconsistency is * detected by it. * * @return true if the online import currently in progress should continue, * false otherwise. */ public void setFollowImport(boolean followImport) private boolean isFollowImport() { this.followImport = followImport; return importErrorMessageId == -1; } /** @@ -1067,7 +1062,7 @@ * @return true if the operation contains some attributes subject to filtering * by the fractional configuration */ public boolean fractionalFilterOperation( private boolean fractionalFilterOperation( PreOperationAddOperation addOperation, boolean performFiltering) { return fractionalRemoveAttributesFromEntry(fractionalConfig, @@ -1088,7 +1083,7 @@ * @return true if the operation is inconsistent with fractional * configuration */ public boolean fractionalFilterOperation( private boolean fractionalFilterOperation( PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering) { // Quick exit if not called for analyze and @@ -1406,7 +1401,7 @@ * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES, * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP */ public int fractionalFilterOperation(PreOperationModifyOperation private int fractionalFilterOperation(PreOperationModifyOperation modifyOperation, boolean performFiltering) { /* @@ -1494,7 +1489,7 @@ @Override protected byte[] receiveEntryBytes() { if (followImport) if (isFollowImport()) { // Ok, next entry is allowed to be received return super.receiveEntryBytes(); @@ -1505,19 +1500,20 @@ // process: // This is an error termination during the import // The error is stored and the import is ended by returning null final IEContext ieCtx = getImportExportContext(); Message msg = null; switch (importErrorMessageId) { case IMPORT_ERROR_MESSAGE_BAD_REMOTE: msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get( getBaseDNString(), Integer.toString(ieContext.getImportSource())); getBaseDNString(), Integer.toString(ieCtx.getImportSource())); break; case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL: msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get( getBaseDNString(), Integer.toString(ieContext.getImportSource())); getBaseDNString(), Integer.toString(ieCtx.getImportSource())); break; } ieContext.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg)); ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg)); return null; } @@ -1552,7 +1548,7 @@ * @return A SynchronizationProviderResult indicating if the operation * can continue. */ public SynchronizationProviderResult handleConflictResolution( SynchronizationProviderResult handleConflictResolution( PreOperationDeleteOperation deleteOperation) { if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected()) @@ -1624,7 +1620,7 @@ * @return A SynchronizationProviderResult indicating if the operation * can continue. */ public SynchronizationProviderResult handleConflictResolution( SynchronizationProviderResult handleConflictResolution( PreOperationAddOperation addOperation) { if (!addOperation.isSynchronizationOperation() && !brokerIsConnected()) @@ -1754,7 +1750,7 @@ * @return A SynchronizationProviderResult indicating if the operation * can continue. */ public SynchronizationProviderResult handleConflictResolution( SynchronizationProviderResult handleConflictResolution( PreOperationModifyDNOperation modifyDNOperation) { if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected()) @@ -1872,7 +1868,7 @@ * @param modifyOperation the operation * @return code indicating is operation must proceed */ public SynchronizationProviderResult handleConflictResolution( SynchronizationProviderResult handleConflictResolution( PreOperationModifyOperation modifyOperation) { if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected()) @@ -2243,7 +2239,7 @@ * * @return The number of updates in the pending list */ public int getPendingUpdatesCount() private int getPendingUpdatesCount() { if (pendingChanges != null) return pendingChanges.size(); @@ -2255,7 +2251,7 @@ * * @return The number of updates replayed successfully */ public int getNumReplayedPostOpCalled() private int getNumReplayedPostOpCalled() { return numReplayedPostOpCalled; } @@ -2518,7 +2514,7 @@ * * @param csn the CSN of the operation with error. */ public void updateError(CSN csn) private void updateError(CSN csn) { try { @@ -3217,7 +3213,7 @@ * Get the number of modify conflicts successfully resolved. * @return The number of modify conflicts successfully resolved. */ public int getNumResolvedModifyConflicts() private int getNumResolvedModifyConflicts() { return numResolvedModifyConflicts.get(); } @@ -3226,7 +3222,7 @@ * Get the number of naming conflicts successfully resolved. * @return The number of naming conflicts successfully resolved. */ public int getNumResolvedNamingConflicts() private int getNumResolvedNamingConflicts() { return numResolvedNamingConflicts.get(); } @@ -3235,7 +3231,7 @@ * Get the number of unresolved conflicts. * @return The number of unresolved conflicts. */ public int getNumUnresolvedNamingConflicts() private int getNumUnresolvedNamingConflicts() { return numUnresolvedNamingConflicts.get(); } @@ -3322,7 +3318,7 @@ * @return The computed generationId. * @throws DirectoryException When an error occurs. */ public long computeGenerationId() throws DirectoryException private long computeGenerationId() throws DirectoryException { long genId = exportBackend(null, true); @@ -3503,7 +3499,7 @@ * Do whatever is needed when a backup is started. * We need to make sure that the serverState is correctly save. */ public void backupStart() void backupStart() { state.save(); } @@ -3511,7 +3507,7 @@ /** * Do whatever is needed when a backup is finished. */ public void backupEnd() void backupEnd() { // Nothing is needed at the moment } @@ -3549,7 +3545,7 @@ private long exportBackend(OutputStream output, boolean checksumOutput) throws DirectoryException { Backend backend = retrievesBackend(getBaseDN()); Backend backend = getBackend(); // Acquire a shared lock for the backend. try @@ -3678,20 +3674,6 @@ } /** * Retrieves the backend related to the domain. * * @return The backend of that domain. * @param baseDN The baseDN to retrieve the backend */ protected static Backend retrievesBackend(DN baseDN) { // Retrieves the backend related to this domain return DirectoryServer.getBackend(baseDN); } /** * Process backend before import. * * @param backend @@ -3731,13 +3713,14 @@ { LDIFImportConfig importConfig = null; Backend backend = retrievesBackend(getBaseDN()); Backend backend = getBackend(); IEContext ieCtx = getImportExportContext(); try { if (!backend.supportsLDIFImport()) { ieContext.setExceptionIfNoneSet(new DirectoryException(OTHER, ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER, ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID()))); } else @@ -3754,7 +3737,6 @@ importConfig.setInvokeImportPlugins(true); // Reset the follow import flag and message before starting the import importErrorMessageId = -1; followImport = true; // TODO How to deal with rejected entries during the import importConfig.writeRejectedEntries( @@ -3771,7 +3753,7 @@ } catch(Exception e) { ieContext.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e)))); } finally @@ -3783,12 +3765,12 @@ { importConfig.close(); closeBackendImport(backend); // Re-enable backend backend = retrievesBackend(getBaseDN()); backend = getBackend(); } loadDataState(); if (ieContext.getException() != null) if (ieCtx.getException() != null) { // When an error occurred during an import, most of times // the generationId coming in the root entry of the imported data, @@ -3804,14 +3786,14 @@ // so we don't bother about the new Exception. // However if there was no Exception before we want // to return this Exception to the task creator. ieContext.setExceptionIfNoneSet(new DirectoryException( ieCtx.setExceptionIfNoneSet(new DirectoryException( ResultCode.OTHER, ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe)))); } } if (ieContext.getException() != null) throw ieContext.getException(); if (ieCtx.getException() != null) throw ieCtx.getException(); } /** @@ -3887,9 +3869,9 @@ * Returns the backend associated to this domain. * @return The associated backend. */ public Backend getBackend() private Backend getBackend() { return retrievesBackend(getBaseDN()); return DirectoryServer.getBackend(getBaseDN()); } /* @@ -3946,7 +3928,7 @@ } // Check that the base DN is configured as a base-dn of the directory server if (retrievesBackend(dn) == null) if (DirectoryServer.getBackend(dn) == null) { unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn.toString())); return false; @@ -3997,7 +3979,7 @@ ReplicationDomainCfg configuration, List<Message> unacceptableReasons) { // Check that a import/export is not in progress if (importInProgress() || exportInProgress()) if (ieRunning()) { unacceptableReasons.add( NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get()); @@ -4059,7 +4041,7 @@ * Remove from this domain configuration, the configuration of the * external change log. */ public void removeECLDomainCfg() private void removeECLDomainCfg() { try { @@ -4082,8 +4064,8 @@ * @param domCfg The provided configuration. * @throws ConfigException When an error occurred. */ public void storeECLConfiguration(ReplicationDomainCfg domCfg) throws ConfigException private void storeECLConfiguration(ReplicationDomainCfg domCfg) throws ConfigException { ExternalChangelogDomainCfg eclDomCfg = null; // create the ecl config if it does not exist @@ -4434,7 +4416,7 @@ @Override public long countEntries() throws DirectoryException { Backend backend = retrievesBackend(getBaseDN()); Backend backend = getBackend(); if (!backend.supportsLDIFExport()) { Message msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID()); @@ -5105,7 +5087,7 @@ * Specifies whether this domain is enabled/disabled regarding the ECL. * @return enabled/disabled for the ECL. */ public boolean isECLEnabled() boolean isECLEnabled() { return this.eclDomain.isEnabled(); } @@ -5116,7 +5098,7 @@ * * @return the purge delay. */ public long getHistoricalPurgeDelay() long getHistoricalPurgeDelay() { return config.getConflictsHistoricalPurgeDelay() * 60 * 1000; } opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2009 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.protocol; opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.service; @@ -146,14 +146,14 @@ * The context related to an import or export being processed * Null when none is being processed. */ protected IEContext ieContext = null; volatile IEContext ieContext; /** * The Thread waiting for incoming update messages for this domain and pushing * them to the global incoming update message queue for later processing by * replay threads. */ private volatile DirectoryThread listenerThread = null; private volatile DirectoryThread listenerThread; /** * A set of counters used for Monitoring. @@ -732,7 +732,8 @@ else if (msg instanceof ErrorMsg) { ErrorMsg errorMsg = (ErrorMsg)msg; if (ieContext != null) IEContext ieCtx = ieContext; if (ieCtx != null) { /* This is an error termination for the 2 following cases : @@ -750,10 +751,10 @@ " baseDN: " + getBaseDN() + " Error Msg received: " + errorMsg); if (errorMsg.getCreationTime() > ieContext.startTime) if (errorMsg.getCreationTime() > ieCtx.startTime) { // consider only ErrorMsg that relate to the current import/export processErrorMsg(errorMsg); processErrorMsg(errorMsg, ieCtx); } else { @@ -784,10 +785,11 @@ } else if (msg instanceof InitializeRcvAckMsg) { if (ieContext != null) IEContext ieCtx = ieContext; if (ieCtx != null) { InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg; ieContext.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck()); ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck()); } // Trash this msg When no input/export is running/should never happen } @@ -1044,7 +1046,7 @@ private long entryLeftCount = 0; /** Exception raised during the initialization. */ private DirectoryException exception = null; private DirectoryException exception; /** Whether the context is related to an import or an export. */ private boolean importInProgress; @@ -1061,7 +1063,7 @@ /** * Request message sent when this server has the initializeFromRemote task. */ private InitializeRequestMsg initReqMsgSent = null; private InitializeRequestMsg initReqMsgSent; /** * Start time of the initialization process. ErrorMsg timestamped before @@ -1116,12 +1118,47 @@ } /** * Returns a boolean indicating if a total update import is currently in * Progress. * * @return A boolean indicating if a total update import is currently in * Progress. */ public boolean importInProgress() { return importInProgress; } /** * Returns the total number of entries to be processed when a total update * is in progress. * * @return The total number of entries to be processed when a total update * is in progress. */ long getTotalEntryCount() { return entryCount; } /** * Returns the number of entries still to be processed when a total update * is in progress. * * @return The number of entries still to be processed when a total update * is in progress. */ long getLeftEntryCount() { return entryLeftCount; } /** * Initializes the import/export counters with the provider value. * @param total Total number of entries to be processed. * @throws DirectoryException if an error occurred. */ private void initializeCounters(long total) throws DirectoryException private void initializeCounters(long total) throws DirectoryException { entryCount = total; entryLeftCount = total; @@ -1150,8 +1187,7 @@ * * @throws DirectoryException if an error occurred. */ public void updateCounters(int entriesDone) throws DirectoryException public void updateCounters(int entriesDone) throws DirectoryException { entryLeftCount -= entriesDone; @@ -1358,6 +1394,7 @@ - to update the task with the server(s) where this test failed */ IEContext ieCtx = ieContext; if (serverToInitialize == RoutableMsg.ALL_SERVERS) { logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( @@ -1365,7 +1402,7 @@ for (DSInfo dsi : getReplicasList()) { ieContext.startList.add(dsi.getDsId()); ieCtx.startList.add(dsi.getDsId()); } // We manage the list of servers with which a flow control can be enabled @@ -1373,7 +1410,7 @@ { if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) { ieContext.setAckVal(dsi.getDsId(), 0); ieCtx.setAckVal(dsi.getDsId(), 0); } } } @@ -1382,7 +1419,7 @@ logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(), getBaseDNString(), getServerId(), serverToInitialize)); ieContext.startList.add(serverToInitialize); ieCtx.startList.add(serverToInitialize); // We manage the list of servers with which a flow control can be enabled for (DSInfo dsi : getReplicasList()) @@ -1390,7 +1427,7 @@ if (dsi.getDsId() == serverToInitialize && dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4) { ieContext.setAckVal(dsi.getDsId(), 0); ieCtx.setAckVal(dsi.getDsId(), 0); } } } @@ -1402,34 +1439,34 @@ { try { ieContext.exportTarget = serverToInitialize; ieCtx.exportTarget = serverToInitialize; if (initTask != null) { ieContext.initializeTask = initTask; ieCtx.initializeTask = initTask; } ieContext.initializeCounters(this.countEntries()); ieContext.msgCnt = 0; ieContext.initNumLostConnections = broker.getNumLostConnections(); ieContext.initWindow = initWindow; ieCtx.initializeCounters(this.countEntries()); ieCtx.msgCnt = 0; ieCtx.initNumLostConnections = broker.getNumLostConnections(); ieCtx.initWindow = initWindow; // Send start message to the peer InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( getBaseDN(), getServerId(), serverToInitialize, serverRunningTheTask, ieContext.entryCount, initWindow); serverRunningTheTask, ieCtx.entryCount, initWindow); broker.publish(initTargetMsg); // Wait for all servers to be ok waitForRemoteStartOfInit(); waitForRemoteStartOfInit(ieCtx); // Servers that left in the list are those for which we could not test // that they have been successfully initialized. if (!ieContext.failureList.isEmpty()) if (!ieCtx.failureList.isEmpty()) { throw new DirectoryException( ResultCode.OTHER, ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get( ieContext.failureList.toString())); ieCtx.failureList.toString())); } exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); @@ -1441,7 +1478,7 @@ catch(DirectoryException exportException) { // Give priority to the first exception raised - stored in the context final DirectoryException ieEx = ieContext.exception; final DirectoryException ieEx = ieCtx.exception; exportRootException = ieEx != null ? ieEx : exportException; } @@ -1500,10 +1537,8 @@ continue; } ErrorMsg errorMsg = new ErrorMsg(serverToInitialize, exportRootException.getMessageObject()); broker.publish(errorMsg); broker.publish(new ErrorMsg( serverToInitialize, exportRootException.getMessageObject())); } catch(Exception e) { @@ -1518,16 +1553,16 @@ } // attempt loop // Wait for all servers to be ok, and build the failure list waitForRemoteEndOfInit(); waitForRemoteEndOfInit(ieCtx); // Servers that left in the list are those for which we could not test // that they have been successfully initialized. if (!ieContext.failureList.isEmpty() && exportRootException == null) if (!ieCtx.failureList.isEmpty() && exportRootException == null) { exportRootException = new DirectoryException(ResultCode.OTHER, ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get( Long.toString(getGenerationID()), ieContext.failureList.toString())); ieCtx.failureList.toString())); } // Don't forget to release IEcontext acquired at beginning. @@ -1558,10 +1593,10 @@ * - wait it has finished the import and present the expected generationID, * - build the failureList. */ private void waitForRemoteStartOfInit() private void waitForRemoteStartOfInit(IEContext ieCtx) { final Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(ieContext.startList); new HashSet<Integer>(ieCtx.startList); if (debugEnabled()) TRACER.debugInfo( @@ -1580,7 +1615,7 @@ + " " + dsi.getStatus() + " " + dsi.getGenerationId() + " " + getGenerationID()); if (ieContext.startList.contains(dsi.getDsId())) if (ieCtx.startList.contains(dsi.getDsId())) { if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS) { @@ -1604,11 +1639,11 @@ } while (!done && waitResultAttempt < 1200 && !broker.shuttingDown()); ieContext.failureList.addAll(replicasWeAreWaitingFor); ieCtx.failureList.addAll(replicasWeAreWaitingFor); if (debugEnabled()) TRACER.debugInfo( "[IE] wait for start ends with " + ieContext.failureList); "[IE] wait for start ends with " + ieCtx.failureList); } /** @@ -1616,10 +1651,10 @@ * - wait it has finished the import and present the expected generationID, * - build the failureList. */ private void waitForRemoteEndOfInit() private void waitForRemoteEndOfInit(IEContext ieCtx) { final Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(ieContext.startList); new HashSet<Integer>(ieCtx.startList); if (debugEnabled()) TRACER.debugInfo( @@ -1645,7 +1680,7 @@ while (it.hasNext()) { int serverId = it.next(); if (ieContext.failureList.contains(serverId)) if (ieCtx.failureList.contains(serverId)) { /* this server has already been in error during initialization @@ -1701,11 +1736,11 @@ } while (!done && !broker.shuttingDown()); // infinite wait ieContext.failureList.addAll(replicasWeAreWaitingFor); ieCtx.failureList.addAll(replicasWeAreWaitingFor); if (debugEnabled()) TRACER.debugInfo( "[IE] wait for end ends with " + ieContext.failureList); "[IE] wait for end ends with " + ieCtx.failureList); } /** @@ -1743,13 +1778,13 @@ * * @param errorMsg The error message received. */ private void processErrorMsg(ErrorMsg errorMsg) private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx) { //Exporting must not be stopped on the first error, if we run initialize-all if (ieContext != null && ieContext.exportTarget != RoutableMsg.ALL_SERVERS) if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS) { // The ErrorMsg is received while we have started an initialization ieContext.setExceptionIfNoneSet(new DirectoryException( ieCtx.setExceptionIfNoneSet(new DirectoryException( ResultCode.OTHER, errorMsg.getDetails())); /* @@ -1759,11 +1794,11 @@ * even after the nextInitAttemptDelay * During the import, the ErrorMsg will be received by receiveEntryBytes */ if (ieContext.initializeTask instanceof InitializeTask) if (ieCtx.initializeTask instanceof InitializeTask) { // Update the task that initiated the import ((InitializeTask) ieContext.initializeTask) .updateTaskCompletionState(ieContext.getException()); ((InitializeTask) ieCtx.initializeTask) .updateTaskCompletionState(ieCtx.getException()); releaseIEContext(); } @@ -1781,6 +1816,7 @@ ReplicationMsg msg; while (true) { IEContext ieCtx = ieContext; try { // In the context of the total update, we don't want any automatic @@ -1807,7 +1843,7 @@ else { // Handle connection issues ieContext.setExceptionIfNoneSet(new DirectoryException( ieCtx.setExceptionIfNoneSet(new DirectoryException( ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT .get(broker.getReplicationServer()))); return null; @@ -1819,26 +1855,26 @@ { EntryMsg entryMsg = (EntryMsg)msg; byte[] entryBytes = entryMsg.getEntryBytes(); ieContext.updateCounters(countEntryLimits(entryBytes)); ieCtx.updateCounters(countEntryLimits(entryBytes)); if (ieContext.exporterProtocolVersion >= if (ieCtx.exporterProtocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // check the msgCnt of the msg received to check ordering if (++ieContext.msgCnt != entryMsg.getMsgId()) if (++ieCtx.msgCnt != entryMsg.getMsgId()) { ieContext.setExceptionIfNoneSet(new DirectoryException( ieCtx.setExceptionIfNoneSet(new DirectoryException( ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get( String.valueOf(ieContext.msgCnt), String.valueOf(ieCtx.msgCnt), String.valueOf(entryMsg.getMsgId())))); return null; } // send the ack of flow control mgmt if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0) if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0) { final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( getServerId(), entryMsg.getSenderID(), ieContext.msgCnt); getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt); broker.publish(amsg, false); if (debugEnabled()) { @@ -1864,12 +1900,12 @@ This is an error termination during the import The error is stored and the import is ended by returning null */ if (ieContext.getException() == null) if (ieCtx.getException() == null) { ErrorMsg errMsg = (ErrorMsg)msg; if (errMsg.getCreationTime() > ieContext.startTime) if (errMsg.getCreationTime() > ieCtx.startTime) { ieContext.setException( ieCtx.setException( new DirectoryException(ResultCode.OTHER,errMsg.getDetails())); return null; } @@ -1880,15 +1916,15 @@ // Other messages received during an import are trashed except // the topologyMsg. if (msg instanceof TopologyMsg && isRemoteDSConnected(ieContext.importSource) == null) && isRemoteDSConnected(ieCtx.importSource) == null) { Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE, ERR_INIT_EXPORTER_DISCONNECTION.get( getBaseDNString(), Integer.toString(getServerId()), Integer.toString(ieContext.importSource))); ieContext.setExceptionIfNoneSet(new DirectoryException( Integer.toString(ieCtx.importSource))); ieCtx.setExceptionIfNoneSet(new DirectoryException( ResultCode.OTHER, errMsg)); return null; } @@ -1896,7 +1932,7 @@ } catch(Exception e) { ieContext.setExceptionIfNoneSet(new DirectoryException( ieCtx.setExceptionIfNoneSet(new DirectoryException( ResultCode.OTHER, ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); } @@ -1957,9 +1993,10 @@ Arrays.toString(lDIFEntry)); // build the message IEContext ieCtx = ieContext; EntryMsg entryMessage = new EntryMsg( getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length, ++ieContext.msgCnt); getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length, ++ieCtx.msgCnt); // Waiting the slowest loop while (!broker.shuttingDown()) @@ -1969,30 +2006,30 @@ server that have been stored by the listener thread in the ieContext, we just abandon the export by throwing an exception. */ if (ieContext.getException() != null) if (ieCtx.getException() != null) { throw new IOException(ieContext.getException().getMessage()); throw new IOException(ieCtx.getException().getMessage()); } int slowestServerId = ieContext.getSlowestServer(); int slowestServerId = ieCtx.getSlowestServer(); if (isRemoteDSConnected(slowestServerId)==null) { ieContext.setException(new DirectoryException(ResultCode.OTHER, ieCtx.setException(new DirectoryException(ResultCode.OTHER, ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get( Integer.toString(ieContext.getSlowestServer())))); Integer.toString(ieCtx.getSlowestServer())))); throw new IOException("IOException with nested DirectoryException", ieContext.getException()); ieCtx.getException()); } int ourLastExportedCnt = ieContext.msgCnt; int slowestCnt = ieContext.ackVals.get(slowestServerId); int ourLastExportedCnt = ieCtx.msgCnt; int slowestCnt = ieCtx.ackVals.get(slowestServerId); if (debugEnabled()) TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " + " our=" + ourLastExportedCnt + " slowest=" + slowestCnt); if ((ourLastExportedCnt - slowestCnt) > ieContext.initWindow) if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow) { if (debugEnabled()) TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting"); @@ -2003,13 +2040,13 @@ // process any connection error if (broker.hasConnectionError() || broker.getNumLostConnections() != ieContext.initNumLostConnections) || broker.getNumLostConnections() != ieCtx.initNumLostConnections) { // publish failed - store the error in the ieContext ... DirectoryException de = new DirectoryException(ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( Integer.toString(broker.getRsServerId()))); ieContext.setExceptionIfNoneSet(de); ieCtx.setExceptionIfNoneSet(de); // .. and abandon the export by throwing an exception. throw new IOException(de.getMessage()); } @@ -2031,13 +2068,13 @@ // process any publish error if (!sent || broker.hasConnectionError() || broker.getNumLostConnections() != ieContext.initNumLostConnections) || broker.getNumLostConnections() != ieCtx.initNumLostConnections) { // publish failed - store the error in the ieContext ... DirectoryException de = new DirectoryException(ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get( Integer.toString(broker.getRsServerId()))); ieContext.setExceptionIfNoneSet(de); ieCtx.setExceptionIfNoneSet(de); // .. and abandon the export by throwing an exception. throw new IOException(de.getMessage()); } @@ -2045,11 +2082,11 @@ // publish succeeded try { ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length)); ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length)); } catch (DirectoryException de) { ieContext.setExceptionIfNoneSet(de); ieCtx.setExceptionIfNoneSet(de); // .. and abandon the export by throwing an exception. throw new IOException(de.getMessage()); } @@ -2157,13 +2194,12 @@ */ acquireIEContext(true); //test and set if no import already in progress ieContext.initializeTask = initTask; ieContext.attemptCnt = 0; ieContext.initReqMsgSent = new InitializeRequestMsg( IEContext ieCtx = ieContext; ieCtx.initializeTask = initTask; ieCtx.attemptCnt = 0; ieCtx.initReqMsgSent = new InitializeRequestMsg( getBaseDN(), getServerId(), source, getInitWindow()); // Publish Init request msg broker.publish(ieContext.initReqMsgSent); broker.publish(ieCtx.initReqMsgSent); /* The normal success processing is now to receive InitTargetMsg then @@ -2219,6 +2255,7 @@ int source = initTargetMsgReceived.getSenderID(); IEContext ieCtx = ieContext; try { // Log starting @@ -2240,12 +2277,12 @@ } // Initialize stuff ieContext.importSource = source; ieContext.initializeCounters(initTargetMsgReceived.getEntryCount()); ieContext.initWindow = initTargetMsgReceived.getInitWindow(); ieCtx.importSource = source; ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount()); ieCtx.initWindow = initTargetMsgReceived.getInitWindow(); // Protocol version is -1 when not known. ieContext.exporterProtocolVersion = getProtocolVersion(source); initFromTask = (InitializeTask)ieContext.initializeTask; ieCtx.exporterProtocolVersion = getProtocolVersion(source); initFromTask = (InitializeTask) ieCtx.initializeTask; // Launch the import importBackend(new ReplInputStream(this)); @@ -2257,14 +2294,14 @@ Store the exception raised. It will be considered if no other exception has been previously stored in the context */ ieContext.setExceptionIfNoneSet(e); ieCtx.setExceptionIfNoneSet(e); } finally { if (debugEnabled()) { TRACER.debugInfo("[IE] Domain=" + this + " ends import with exception=" + ieContext.getException() + " ends import with exception=" + ieCtx.getException() + " connected=" + broker.isConnected()); } @@ -2278,10 +2315,10 @@ */ broker.reStart(false); if (ieContext.getException() != null if (ieCtx.getException() != null && broker.isConnected() && initFromTask != null && ++ieContext.attemptCnt < 2) && ++ieCtx.attemptCnt < 2) { /* Worth a new attempt @@ -2300,13 +2337,13 @@ the request */ logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get( ieContext.getException().getLocalizedMessage())); ieCtx.getException().getLocalizedMessage())); broker.publish(ieContext.initReqMsgSent); broker.publish(ieCtx.initReqMsgSent); ieContext.initializeCounters(0); ieContext.exception = null; ieContext.msgCnt = 0; ieCtx.initializeCounters(0); ieCtx.exception = null; ieCtx.msgCnt = 0; // Processing of the received initTargetMsgReceived is done // let's wait for the next one @@ -2320,7 +2357,7 @@ */ logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get( e.getLocalizedMessage(), ieContext.getException().getLocalizedMessage())); ieCtx.getException().getLocalizedMessage())); } } @@ -2330,19 +2367,19 @@ if (debugEnabled()) { TRACER.debugInfo("[IE] Domain=" + this + " ends initialization with exception=" + ieContext.getException() + " ends initialization with exception=" + ieCtx.getException() + " connected=" + broker.isConnected() + " task=" + initFromTask + " attempt=" + ieContext.attemptCnt); + " attempt=" + ieCtx.attemptCnt); } try { if (broker.isConnected() && ieContext.getException() != null) if (broker.isConnected() && ieCtx.getException() != null) { // Let's notify the exporter ErrorMsg errorMsg = new ErrorMsg(requesterServerId, ieContext.getException().getMessageObject()); ieCtx.getException().getMessageObject()); broker.publish(errorMsg); } /* @@ -2353,7 +2390,7 @@ */ if (initFromTask != null) { initFromTask.updateTaskCompletionState(ieContext.getException()); initFromTask.updateTaskCompletionState(ieCtx.getException()); } } finally @@ -2361,8 +2398,8 @@ Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( getBaseDNString(), initTargetMsgReceived.getSenderID(), getServerId(), (ieContext.getException() == null ? "" : ieContext.getException().getLocalizedMessage())); (ieCtx.getException() == null ? "" : ieCtx.getException().getLocalizedMessage())); logError(msg); releaseIEContext(); } // finally @@ -3449,43 +3486,13 @@ } /** * Returns a boolean indicating if a total update import is currently * in Progress. * Returns the Import/Export context associated to this ReplicationDomain. * * @return A boolean indicating if a total update import is currently * in Progress. * @return the Import/Export context associated to this ReplicationDomain */ public boolean importInProgress() protected IEContext getImportExportContext() { return ieContext != null && ieContext.importInProgress; } /** * Returns a boolean indicating if a total update export is currently * in Progress. * * @return A boolean indicating if a total update export is currently * in Progress. */ public boolean exportInProgress() { return ieContext != null && !ieContext.importInProgress; } /** * Returns the number of entries still to be processed when a total update * is in progress. * * @return The number of entries still to be processed when a total update * is in progress. */ long getLeftEntryCount() { if (ieContext != null) { return ieContext.entryLeftCount; } return 0; return ieContext; } /** @@ -3501,24 +3508,6 @@ } /** * Returns the total number of entries to be processed when a total update * is in progress. * * @return The total number of entries to be processed when a total update * is in progress. */ long getTotalEntryCount() { if (ieContext != null) { return ieContext.entryCount; } return 0; } /** * Set the attributes configured on a server to be included in the ECL. * * @param serverId @@ -3617,7 +3606,7 @@ * The serverId for which we want the include attributes. * @return The attributes. */ public Set<String> getEclIncludes(int serverId) Set<String> getEclIncludes(int serverId) { synchronized (eclIncludesLock) { @@ -3635,7 +3624,7 @@ * The serverId for which we want the include attributes. * @return The attributes. */ public Set<String> getEclIncludesForDeletes(int serverId) Set<String> getEclIncludesForDeletes(int serverId) { synchronized (eclIncludesLock) { opends/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS. * Portions copyright 2013-2014 ForgeRock AS. */ package org.opends.server.replication.service; @@ -34,6 +34,7 @@ import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.MonitorProvider; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.service.ReplicationDomain.IEContext; import org.opends.server.types.*; /** @@ -130,21 +131,15 @@ String.valueOf(domain.getGenerationID()))); // Add import/export monitoring attributes if (domain.importInProgress()) final IEContext ieContext = domain.getImportExportContext(); if (ieContext != null) { addMonitorData(attributes, "total-update", "import"); addMonitorData( attributes, "total-update-entry-count", domain.getTotalEntryCount()); addMonitorData( attributes, "total-update-entry-left", domain.getLeftEntryCount()); } if (domain.exportInProgress()) { addMonitorData(attributes, "total-update", "export"); addMonitorData( attributes, "total-update-entry-count", domain.getTotalEntryCount()); addMonitorData( attributes, "total-update-entry-left", domain.getLeftEntryCount()); addMonitorData(attributes, "total-update", ieContext.importInProgress() ? "import" : "export"); addMonitorData(attributes, "total-update-entry-count", ieContext.getTotalEntryCount()); addMonitorData(attributes, "total-update-entry-left", ieContext.getLeftEntryCount()); } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -20,7 +20,7 @@ * * CDDL HEADER END * * Copyright 2013 ForgeRock AS * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.plugin; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.plugin; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.plugin; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.server; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS * Portions Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.service; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS * Portions Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.service; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -22,7 +22,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.service; @@ -317,12 +317,7 @@ "exportAndImportData", 100); SortedSet<String> servers = newSortedSet("localhost:" + replServerPort); StringBuilder exportedDataBuilder = new StringBuilder(); for (int i =0; i<ENTRYCOUNT; i++) { exportedDataBuilder.append("key : value"+i+"\n\n"); } String exportedData=exportedDataBuilder.toString(); String exportedData = buildExportedData(ENTRYCOUNT); domain1 = new FakeReplicationDomain( testService, serverId1, servers, 0, exportedData, null, ENTRYCOUNT); @@ -343,18 +338,8 @@ } } int count = 0; while ((importedData.length() < exportedData.length()) && (count < 500)) { count ++; Thread.sleep(100); } assertEquals(domain2.getLeftEntryCount(), 0, "LeftEntryCount for export is " + domain2.getLeftEntryCount()); assertEquals(domain1.getLeftEntryCount(), 0, "LeftEntryCount for import is " + domain1.getLeftEntryCount()); assertEquals(importedData.length(), exportedData.length()); assertEquals(importedData.toString(), exportedData); waitEndExport(exportedData, importedData); assertExportSucessful(domain1, domain2, exportedData, importedData); } finally { @@ -393,12 +378,7 @@ SortedSet<String> servers1 = newSortedSet("localhost:" + replServerPort1); SortedSet<String> servers2 = newSortedSet("localhost:" + replServerPort2); StringBuilder exportedDataBuilder = new StringBuilder(); for (int i =0; i<ENTRYCOUNT; i++) { exportedDataBuilder.append("key : value"+i+"\n\n"); } String exportedData=exportedDataBuilder.toString(); String exportedData = buildExportedData(ENTRYCOUNT); domain1 = new FakeReplicationDomain( testService, 1, servers1, 0, exportedData, null, ENTRYCOUNT); @@ -408,18 +388,8 @@ domain2.initializeFromRemote(1); int count = 0; while ((importedData.length() < exportedData.length()) && (count < 500)) { count ++; Thread.sleep(100); } assertEquals(domain2.getLeftEntryCount(), 0, "LeftEntryCount for export is " + domain2.getLeftEntryCount()); assertEquals(domain1.getLeftEntryCount(), 0, "LeftEntryCount for import is " + domain1.getLeftEntryCount()); assertEquals(importedData.length(), exportedData.length()); assertEquals(importedData.toString(), exportedData); waitEndExport(exportedData, importedData); assertExportSucessful(domain1, domain2, exportedData, importedData); } finally { @@ -428,6 +398,35 @@ } } private String buildExportedData(final int ENTRYCOUNT) { final StringBuilder sb = new StringBuilder(); for (int i = 0; i < ENTRYCOUNT; i++) { sb.append("key : value" + i + "\n\n"); } return sb.toString(); } private void waitEndExport(String exportedData, StringBuilder importedData) throws Exception { int count = 0; while (importedData.length() < exportedData.length() && count < 500) { count ++; Thread.sleep(100); } } private void assertExportSucessful(ReplicationDomain domain1, ReplicationDomain domain2, String exportedData, StringBuilder importedData) { assertEquals(domain2.ieContext.getLeftEntryCount(), 0, "Wrong LeftEntryCount for export"); assertEquals(domain1.ieContext.getLeftEntryCount(), 0, "Wrong LeftEntryCount for import"); assertEquals(importedData.length(), exportedData.length()); assertEquals(importedData.toString(), exportedData); } /** * Sender side of the Total Update Perf test. * The goal of this test is to measure the performance