opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@ * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.plugin; import static org.opends.messages.ReplicationMessages.*; @@ -37,14 +36,12 @@ import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; import java.io.File; import java.io.InputStream; import java.io.OutputStream; import java.io.StringReader; import java.io.UnsupportedEncodingException; import java.io.*; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; @@ -63,14 +60,7 @@ import org.opends.server.backends.jeb.BackendImpl; import org.opends.server.backends.task.Task; import org.opends.server.config.ConfigException; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.core.LockFileManager; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyDNOperationBasis; import org.opends.server.core.ModifyOperation; import org.opends.server.core.ModifyOperationBasis; import org.opends.server.core.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.asn1.ASN1Exception; import org.opends.server.protocols.internal.InternalClientConnection; @@ -80,12 +70,7 @@ import org.opends.server.protocols.ldap.LDAPControl; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.protocols.ldap.LDAPModification; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; @@ -93,16 +78,7 @@ import org.opends.server.tasks.PurgeConflictsHistoricalTask; import org.opends.server.tasks.TaskUtils; import org.opends.server.types.*; import org.opends.server.types.operation.PluginOperation; import org.opends.server.types.operation.PostOperationAddOperation; import org.opends.server.types.operation.PostOperationDeleteOperation; import org.opends.server.types.operation.PostOperationModifyDNOperation; import org.opends.server.types.operation.PostOperationModifyOperation; import org.opends.server.types.operation.PostOperationOperation; import org.opends.server.types.operation.PreOperationAddOperation; import org.opends.server.types.operation.PreOperationDeleteOperation; import org.opends.server.types.operation.PreOperationModifyDNOperation; import org.opends.server.types.operation.PreOperationModifyOperation; import org.opends.server.types.operation.*; import org.opends.server.util.LDIFReader; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; @@ -191,8 +167,10 @@ */ private static final DebugTracer TRACER = getTracer(); // The update to replay message queue where the listener thread is going to // push incoming update messages. /** * The update to replay message queue where the listener thread is going to * push incoming update messages. */ private final BlockingQueue<UpdateToReplay> updateToReplayQueue; private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger(); private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger(); @@ -238,9 +216,11 @@ private volatile boolean disabled = false; private volatile boolean stateSavingDisabled = false; // This list is used to temporary store operations that needs // to be replayed at session establishment time. private final TreeMap<ChangeNumber, FakeOperation> replayOperations = /** * This list is used to temporary store operations that needs to be replayed * at session establishment time. */ private final SortedMap<ChangeNumber, FakeOperation> replayOperations = new TreeMap<ChangeNumber, FakeOperation>(); /** @@ -288,7 +268,7 @@ * Fractional replication variables. */ // Holds the fractional configuration for this domain, if any. /** Holds the fractional configuration for this domain, if any. */ private FractionalConfig fractionalConfig = null; /** @@ -341,29 +321,39 @@ * fractionalFilterOperation(PreOperationModifyOperation * modifyOperation, boolean performFiltering) method */ // The operation contains attributes subject to fractional filtering according // to the fractional configuration /** * The operation contains attributes subject to fractional filtering according * to the fractional configuration. */ private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1; // The operation contains no attributes subject to fractional filtering // according to the fractional configuration /** * The operation contains no attributes subject to fractional filtering * according to the fractional configuration. */ private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2; // The operation should become a no-op /** The operation should become a no-op. */ private static final int FRACTIONAL_BECOME_NO_OP = 3; // This configuration boolean indicates if this ReplicationDomain should log // ChangeNumbers. /** * This configuration boolean indicates if this ReplicationDomain should log * ChangeNumbers. */ private boolean logChangeNumber = false; // This configuration integer indicates the time the domain keeps the // historical information necessary to solve conflicts. // When a change stored in the historical part of the user entry has a date // (from its replication ChangeNumber) older than this delay, it is candidate // to be purged. /** * This configuration integer indicates the time the domain keeps the * historical information necessary to solve conflicts.<br> * When a change stored in the historical part of the user entry has a date * (from its replication ChangeNumber) older than this delay, it is candidate * to be purged. */ private long histPurgeDelayInMilliSec = 0; // The last change number purged in this domain. Allows to have a continuous // purging process from one purge processing (task run) to the next one. // Values 0 when the server starts. /** * The last change number purged in this domain. Allows to have a continuous * purging process from one purge processing (task run) to the next one. * Values 0 when the server starts. */ private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0); /** @@ -752,10 +742,8 @@ if (fractionalConfig.isFractional()) { // Set new fractional configuration values if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL) fractionalConfig.setFractionalExclusive(true); else fractionalConfig.setFractionalExclusive(false); fractionalConfig.setFractionalExclusive( newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); fractionalConfig.setFractionalSpecificClassesAttributes( newFractionalConfig.getFractionalSpecificClassesAttributes()); fractionalConfig.setFractionalAllClassesAttributes( @@ -950,10 +938,8 @@ // Set stored fractional configuration values if (storedFractionalConfig.isFractional()) { if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL) storedFractionalConfig.setFractionalExclusive(true); else storedFractionalConfig.setFractionalExclusive(false); storedFractionalConfig.setFractionalExclusive( storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); } storedFractionalConfig.setFractionalSpecificClassesAttributes( storedFractionalSpecificClassesAttributes); @@ -1434,12 +1420,10 @@ AttributeValue rdnAttributeValue = entryRdn.getAttributeValue(attributeType); List<Attribute> attrList = attributesMap.get(attributeType); Iterator<Attribute> attrIt = attrList.iterator(); AttributeValue sameAttrValue = null; // Locate the attribute value identical to the one in the RDN while(attrIt.hasNext()) for (Attribute attr : attrList) { Attribute attr = attrIt.next(); if (attr.contains(rdnAttributeValue)) { for (AttributeValue attrValue : attr) { @@ -2577,9 +2561,12 @@ /** * Create and replay a synchronized Operation from an UpdateMsg. * * @param msg The UpdateMsg to be replayed. * @param msg * The UpdateMsg to be replayed. * @param shutdown * whether the server initiated shutdown */ public void replay(LDAPUpdateMsg msg) public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown) { Operation op = null; boolean replayDone = false; @@ -2599,6 +2586,11 @@ while ((!dependency) && (!replayDone) && (retryCount-- > 0)) { if (shutdown.get()) { // shutdown initiated, let's leave return; } // Try replay the operation op.setInternalOperation(true); op.setSynchronizationOperation(true); @@ -2622,6 +2614,25 @@ // renamed by a more recent modify DN. replayDone = true; } else if (result == ResultCode.BUSY) { /* * We probably could not get a lock (OPENDJ-885). Give the server * another chance to process this operation immediately. */ Thread.yield(); continue; } else if (result == ResultCode.UNAVAILABLE) { /* * It can happen when a rebuild is performed or the backend is * offline (OPENDJ-49). Give the server another chance to process * this operation after some time. */ Thread.sleep(50); continue; } else if (op instanceof ModifyOperation) { ModifyOperation newOp = (ModifyOperation) op; @@ -2693,22 +2704,13 @@ } } catch (ASN1Exception e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); replayErrorMsg = message.toString(); replayErrorMsg = logDecodingOperationError(msg, e); } catch (LDAPException e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); replayErrorMsg = message.toString(); replayErrorMsg = logDecodingOperationError(msg, e); } catch (DataFormatException e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); replayErrorMsg = message.toString(); replayErrorMsg = logDecodingOperationError(msg, e); } catch (Exception e) { if (changeNumber != null) @@ -2726,10 +2728,7 @@ updateError(changeNumber); } else { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); replayErrorMsg = message.toString(); replayErrorMsg = logDecodingOperationError(msg, e); } } finally { @@ -2753,6 +2752,14 @@ } while (msg != null); } private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); return message.toString(); } /** * This method is called when an error happens while replaying * an operation. @@ -4862,7 +4869,7 @@ * {@inheritDoc} */ @Override public boolean processUpdate(UpdateMsg updateMsg) public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown) { // Ignore message if fractional configuration is inconsistent and // we have been passed into bad data set status @@ -4880,8 +4887,23 @@ // Put update message into the replay queue // (block until some place in the queue is available) UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); updateToReplayQueue.offer(updateToReplay); final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); while (!shutdown.get()) { // loop until we can offer to the queue or shutdown was initiated try { if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS)) { // successful offer to the queue, let's exit the loop break; } } catch (InterruptedException e) { // Thread interrupted: check for shutdown. } } return false; } @@ -5321,10 +5343,8 @@ case EXCLUSIVE_FRACTIONAL: case INCLUSIVE_FRACTIONAL: result.setFractional(true); if (newFractionalMode == EXCLUSIVE_FRACTIONAL) result.setFractionalExclusive(true); else result.setFractionalExclusive(false); result.setFractionalExclusive( newFractionalMode == EXCLUSIVE_FRACTIONAL); break; } result.setFractionalSpecificClassesAttributes( opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@ */ package org.opends.server.replication.plugin; import java.util.ArrayList; import static org.opends.server.replication.plugin. ReplicationRepairRequestControl.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; import static org.opends.server.util.StaticUtils.*; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationAddListener; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.server.ConfigurationDeleteListener; import org.opends.server.admin.std.server.ReplicationDomainCfg; import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; import org.opends.server.api.Backend; import org.opends.server.api.BackupTaskListener; import org.opends.server.api.ExportTaskListener; import org.opends.server.api.ImportTaskListener; import org.opends.server.api.RestoreTaskListener; import org.opends.server.api.SynchronizationProvider; import org.opends.server.api.*; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.types.BackupConfig; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.Control; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.LDIFExportConfig; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.Modification; import org.opends.server.types.Operation; import org.opends.server.types.RestoreConfig; import org.opends.server.types.ResultCode; import org.opends.server.types.SynchronizationProviderResult; import org.opends.server.types.operation.PluginOperation; import org.opends.server.types.operation.PostOperationAddOperation; import org.opends.server.types.operation.PostOperationDeleteOperation; import org.opends.server.types.operation.PostOperationModifyDNOperation; import org.opends.server.types.operation.PostOperationModifyOperation; import org.opends.server.types.operation.PostOperationOperation; import org.opends.server.types.operation.PreOperationAddOperation; import org.opends.server.types.operation.PreOperationDeleteOperation; import org.opends.server.types.operation.PreOperationModifyDNOperation; import org.opends.server.types.operation.PreOperationModifyOperation; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import org.opends.server.types.*; import org.opends.server.types.operation.*; /** * This class is used to load the Replication code inside the JVM @@ -104,8 +76,8 @@ * The queue of received update messages, to be treated by the ReplayThread * threads. */ private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<UpdateToReplay>(); private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<UpdateToReplay>(10000); /** * The list of ReplayThread threads. @@ -228,9 +200,9 @@ } /** * Creates a new domain from its configEntry, do the * necessary initialization and starts it so that it is * fully operational when this method returns. * Creates a new domain from its configEntry, do the necessary initialization * and starts it so that it is fully operational when this method returns. It * is only used for tests so far. * * @param configuration The entry with the configuration of this domain. * @param queue The BlockingQueue that this domain will use. @@ -239,13 +211,13 @@ * * @throws ConfigException When the configuration is not valid. */ public static LDAPReplicationDomain createNewDomain( static LDAPReplicationDomain createNewDomain( ReplicationDomainCfg configuration, BlockingQueue<UpdateToReplay> queue) throws ConfigException { LDAPReplicationDomain domain; domain = new LDAPReplicationDomain(configuration, queue); LDAPReplicationDomain domain = new LDAPReplicationDomain(configuration, queue); domains.put(domain.getBaseDN(), domain); return domain; @@ -362,6 +334,7 @@ /** * {@inheritDoc} */ @Override public boolean isConfigurationAddAcceptable( ReplicationDomainCfg configuration, List<Message> unacceptableReasons) { @@ -372,6 +345,7 @@ /** * {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationAdd( ReplicationDomainCfg configuration) { @@ -652,6 +626,7 @@ /** * {@inheritDoc} */ @Override public void processBackupBegin(Backend backend, BackupConfig config) { for (DN dn : backend.getBaseDNs()) @@ -665,6 +640,7 @@ /** * {@inheritDoc} */ @Override public void processBackupEnd(Backend backend, BackupConfig config, boolean successful) { @@ -679,6 +655,7 @@ /** * {@inheritDoc} */ @Override public void processRestoreBegin(Backend backend, RestoreConfig config) { for (DN dn : backend.getBaseDNs()) @@ -692,6 +669,7 @@ /** * {@inheritDoc} */ @Override public void processRestoreEnd(Backend backend, RestoreConfig config, boolean successful) { @@ -706,6 +684,7 @@ /** * {@inheritDoc} */ @Override public void processImportBegin(Backend backend, LDIFImportConfig config) { for (DN dn : backend.getBaseDNs()) @@ -719,6 +698,7 @@ /** * {@inheritDoc} */ @Override public void processImportEnd(Backend backend, LDIFImportConfig config, boolean successful) { @@ -733,6 +713,7 @@ /** * {@inheritDoc} */ @Override public void processExportBegin(Backend backend, LDIFExportConfig config) { for (DN dn : backend.getBaseDNs()) @@ -746,6 +727,7 @@ /** * {@inheritDoc} */ @Override public void processExportEnd(Backend backend, LDIFExportConfig config, boolean successful) { @@ -760,6 +742,7 @@ /** * {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationDelete( ReplicationDomainCfg configuration) { @@ -771,6 +754,7 @@ /** * {@inheritDoc} */ @Override public boolean isConfigurationDeleteAcceptable( ReplicationDomainCfg configuration, List<Message> unacceptableReasons) { @@ -804,10 +788,10 @@ /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg configuration, List<Message> unacceptableReasons) @Override public boolean isConfigurationChangeAcceptable( ReplicationSynchronizationProviderCfg configuration, List<Message> unacceptableReasons) { return true; } @@ -815,9 +799,9 @@ /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationChange (ReplicationSynchronizationProviderCfg configuration) @Override public ConfigChangeResult applyConfigurationChange( ReplicationSynchronizationProviderCfg configuration) { int numUpdateRepayThread = configuration.getNumUpdateReplayThreads(); @@ -838,6 +822,7 @@ /** * {@inheritDoc} */ @Override public void completeSynchronizationProvider() { isRegistered = true; opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -26,20 +26,20 @@ * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.plugin; import org.opends.server.replication.protocol.LDAPUpdateMsg; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.LDAPUpdateMsg; /** * Thread that is used to get message from the replication servers (stored @@ -56,7 +56,7 @@ private static final DebugTracer TRACER = getTracer(); private final BlockingQueue<UpdateToReplay> updateToReplayQueue; private volatile boolean shutdown = false; private AtomicBoolean shutdown = new AtomicBoolean(false); private static int count = 0; /** @@ -75,7 +75,7 @@ */ public void shutdown() { shutdown = true; shutdown.set(true); } /** @@ -84,27 +84,26 @@ @Override public void run() { if (debugEnabled()) { TRACER.debugInfo("Replication Replay thread starting."); } while (!shutdown) while (!shutdown.get()) { try { UpdateToReplay updateToreplay; // Loop getting an updateToReplayQueue from the update message queue and // replaying matching changes while ( (!shutdown) && while (!shutdown.get() && ((updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS)) != null)) { // Find replication domain for that update message LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage(); LDAPReplicationDomain domain = updateToreplay.getReplicationDomain(); domain.replay(updateMsg); domain.replay(updateMsg, shutdown); } } catch (Exception e) { opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -26,14 +26,15 @@ * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.service; import org.opends.messages.Message; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.UpdateMsg; @@ -50,7 +51,7 @@ private static final DebugTracer TRACER = getTracer(); private final ReplicationDomain repDomain; private volatile boolean shutdown = false; private AtomicBoolean shutdown = new AtomicBoolean(false); private volatile boolean done = false; @@ -72,7 +73,7 @@ */ public void shutdown() { shutdown = true; shutdown.set(true); } /** @@ -81,22 +82,21 @@ @Override public void run() { UpdateMsg updateMsg = null; if (debugEnabled()) { TRACER.debugInfo("Replication Listener thread starting."); } while (!shutdown) while (!shutdown.get()) { UpdateMsg updateMsg = null; try { // Loop receiving update messages and puting them in the update message // Loop receiving update messages and putting them in the update message // queue while ((!shutdown) && ((updateMsg = repDomain.receive()) != null)) while (!shutdown.get() && ((updateMsg = repDomain.receive()) != null)) { if (repDomain.processUpdate(updateMsg)) if (repDomain.processUpdate(updateMsg, shutdown)) { repDomain.processUpdateDoneSynchronous(updateMsg); } @@ -104,7 +104,7 @@ if (updateMsg == null) { shutdown = true; shutdown.set(true); } } catch (Exception e) opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -28,9 +28,8 @@ package org.opends.server.replication.service; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.common.StatusMachine.*; import java.io.BufferedOutputStream; @@ -41,6 +40,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.opends.messages.Category; @@ -50,32 +50,8 @@ import org.opends.server.backends.task.Task; import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachine; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.ChangeStatusMsg; import org.opends.server.replication.protocol.DoneMsg; import org.opends.server.replication.protocol.EntryMsg; import org.opends.server.replication.protocol.ErrorMsg; import org.opends.server.replication.protocol.HeartbeatMsg; import org.opends.server.replication.protocol.InitializeRcvAckMsg; import org.opends.server.replication.protocol.InitializeRequestMsg; import org.opends.server.replication.protocol.InitializeTargetMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ResetGenerationIdMsg; import org.opends.server.replication.protocol.RoutableMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.tasks.InitializeTargetTask; import org.opends.server.tasks.InitializeTask; import org.opends.server.types.Attribute; @@ -3184,41 +3160,40 @@ public abstract long countEntries() throws DirectoryException; /** * This method should handle the processing of {@link UpdateMsg} receive * from remote replication entities. * This method should handle the processing of {@link UpdateMsg} receive from * remote replication entities. * <p> * This method will be called by a single thread and should therefore * should not be blocking. * This method will be called by a single thread and should therefore should * not be blocking. * * @param updateMsg The {@link UpdateMsg} that was received. * * @return A boolean indicating if the processing is completed at return * time. * If <code> true </code> is returned, no further * processing is necessary. * * If <code> false </code> is returned, the subclass should * call the method * {@link #processUpdateDone(UpdateMsg, String)} * and update the ServerState * When this processing is complete. * * @param updateMsg * The {@link UpdateMsg} that was received. * @param shutdown * whether the server initiated shutdown * @return A boolean indicating if the processing is completed at return time. * If <code> true </code> is returned, no further processing is * necessary. If <code> false </code> is returned, the subclass should * call the method {@link #processUpdateDone(UpdateMsg, String)} and * update the ServerState When this processing is complete. */ public abstract boolean processUpdate(UpdateMsg updateMsg); public abstract boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown); /** * This method must be called after each call to * {@link #processUpdate(UpdateMsg)} when the processing of the update is * completed. * {@link #processUpdate(UpdateMsg, AtomicBoolean)} when the processing of the * update is completed. * <p> * It is useful for implementation needing to process the update in an * asynchronous way or using several threads, but must be called even * by implementation doing it in a synchronous, single-threaded way. * asynchronous way or using several threads, but must be called even by * implementation doing it in a synchronous, single-threaded way. * * @param msg The UpdateMsg whose processing was completed. * @param replayErrorMsg if not null, this means an error occurred during the * replay of this update, and this is the matching human readable message * describing the problem. * @param msg * The UpdateMsg whose processing was completed. * @param replayErrorMsg * if not null, this means an error occurred during the replay of * this update, and this is the matching human readable message * describing the problem. */ public void processUpdateDone(UpdateMsg msg, String replayErrorMsg) { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -23,24 +23,20 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.plugin; import org.opends.server.util.StaticUtils; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; @@ -60,84 +56,80 @@ import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.Attributes; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ObjectClass; import org.opends.server.types.ResultCode; import org.opends.server.types.*; import org.opends.server.util.StaticUtils; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.testng.Assert.*; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; import static org.testng.Assert.*; /** * Various tests around fractional replication */ @SuppressWarnings("javadoc") public class FractionalReplicationTest extends ReplicationTestCase { // The RS /** The RS */ private ReplicationServer replicationServer = null; // RS port /** RS port */ private int replServerPort = -1; // Represents the real domain to test (replays and filters) /** Represents the real domain to test (replays and filters) */ private Entry fractionalDomainCfgEntry = null; // The domain used to send updates to the reald domain /** The domain used to send updates to the reald domain */ private FakeReplicationDomain replicationDomain = null; // Ids of servers /** Ids of servers */ private static final int DS1_ID = 1; // fractional domain private static final int DS2_ID = 2; // fake domain private static final int RS_ID = 91; // replication server private final String testName = this.getClass().getSimpleName(); // Fractional mode /** Fractional mode */ private static final int EXCLUDE_FRAC_MODE = 0; private static final int INCLUDE_FRAC_MODE = 1; int initWindow = 100; private ChangeNumberGenerator gen = null; // The tracer object for the debug logger /** The tracer object for the debug logger */ private static final DebugTracer TRACER = getTracer(); // Number of seconds before generating an error if some conditions not met /** Number of seconds before generating an error if some conditions not met */ private static final int TIMEOUT = 10000; // Uuid of the manipulated entry /** Uuid of the manipulated entry */ private static final String ENTRY_UUID = "11111111-1111-1111-1111-111111111111"; private static final String ENTRY_UUID2 = "22222222-2222-2222-2222-222222222222"; private static final String ENTRY_UUID3 = "33333333-3333-3333-3333-333333333333"; // Dn of the manipulated entry /** Dn of the manipulated entry */ private static String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING; // Optional attribute not part of concerned attributes of the fractional // configuration during tests. It should not be impacted by fractional // mechanism /** * Optional attribute not part of concerned attributes of the fractional * configuration during tests. It should not be impacted by fractional * mechanism */ private static final String OPTIONAL_ATTR = "description"; // Optional attribute used as synchronization attribute to know when the modify // operation has been processed (used as add new attribute in the modify operation) // It may or may not be part of the filtered attributes, depending on the fractional // test mode : exclusive or inclusive /** * Optional attribute used as synchronization attribute to know when the * modify operation has been processed (used as add new attribute in the * modify operation) It may or may not be part of the filtered attributes, * depending on the fractional test mode : exclusive or inclusive */ private static final String SYNCHRO_OPTIONAL_ATTR = "seeAlso"; // Second test backend /** Second test backend */ private static final String TEST2_ROOT_DN_STRING = "dc=example,dc=com"; private static final String TEST2_ORG_DN_STRING = "o=test2," + TEST2_ROOT_DN_STRING; private static String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING; @@ -456,7 +448,6 @@ private void createFakeReplicationDomain(boolean firstBackend, long generationId) { try{ List<String> replicationServers = new ArrayList<String>(); replicationServers.add("localhost:" + replServerPort); @@ -474,7 +465,7 @@ String rdPortStr = serverStr.substring(index + 1); try { rdPort = (new Integer(rdPortStr)).intValue(); rdPort = Integer.valueOf(rdPortStr); } catch (Exception e) { fail("Enable to get an int from: " + rdPortStr); @@ -706,11 +697,6 @@ this.exportedEntryCount = exportedEntryCount; } public void initImport(StringBuilder importString) { this.importString = importString; } @Override public long countEntries() throws DirectoryException { @@ -730,7 +716,6 @@ throw new DirectoryException(ResultCode.OPERATIONS_ERROR, ERR_BACKEND_EXPORT_ENTRY.get("", "")); } } @Override @@ -761,17 +746,12 @@ } @Override public boolean processUpdate(UpdateMsg updateMsg) public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown) { if (queue != null) queue.add(updateMsg); return true; } public void setGenerationID(long newGenerationID) { generationID = newGenerationID; } } private static final String REPLICATION_GENERATION_ID = @@ -1320,7 +1300,7 @@ "domain status obtained after " + (toWait-nSec) + " second(s)."); return; } sleep(1000); TestCaseUtils.sleep(1000); nSec--; } fail("Did not get expected replication domain status: expected <" + expectedStatus + @@ -1567,9 +1547,8 @@ createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING)); /* * Perform add operation with fornbidden attribute in RDN * Perform add operation with forbidden attribute in RDN */ String entryLdif = "dn: displayName=ValueToBeKept," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + @@ -1620,9 +1599,8 @@ */ /* * Perform add operation with fornbidden attribute in RDN * Perform add operation with forbidden attribute in RDN */ entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + @@ -1698,9 +1676,8 @@ createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING)); /* * Perform add operation with fornbidden attribute in RDN * Perform add operation with forbidden attribute in RDN */ String entryLdif = "dn: displayName=ValueToBeKept," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + @@ -1754,9 +1731,8 @@ */ /* * Perform add operation with fornbidden attribute in RDN * Perform add operation with forbidden attribute in RDN */ entryLdif = "dn: displayName=ValueToBeKept+description=ValueToBeKeptToo," + TEST_ROOT_DN_STRING + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + @@ -1834,9 +1810,8 @@ createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING)); /* * Perform add operation with fornbidden attribute in RDN * Perform add operation with forbidden attribute in RDN */ String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ; String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + @@ -1936,7 +1911,7 @@ @Test public void testModifyDnWithForbiddenAttrInRDNInclude() { String testcase = "testModifyDnWithForbiddenAttrInRDNInclude"; String testcase = "testModifyDnWithForbiddenAttrInRDNInclude"; initTest(); @@ -1953,9 +1928,8 @@ createFakeReplicationDomain(true, readGenIdFromSuffixRootEntry(TEST_ROOT_DN_STRING)); /* * Perform add operation with fornbidden attribute in RDN * Perform add operation with forbidden attribute in RDN */ String entryName = "displayName=ValueToBeKept+description=ValueToBeRemoved," + TEST_ROOT_DN_STRING ; String entryLdif = "dn: " + entryName + "\n" + "objectClass: top\n" + "objectClass: person\n" + "objectClass: organizationalPerson\n" + opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -23,13 +23,16 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS */ package org.opends.server.replication.plugin; import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING; import static org.opends.server.TestCaseUtils.*; import static org.testng.Assert.*; import java.util.ArrayList; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.server.TestCaseUtils; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; @@ -40,24 +43,20 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.types.Attribute; import org.opends.server.types.DN; import org.opends.server.types.RDN; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.types.*; import org.testng.annotations.Test; import static org.testng.Assert.*; /** * Test the naming conflict resolution code. */ @SuppressWarnings("javadoc") public class NamingConflictTest extends ReplicationTestCase { private static final AtomicBoolean SHUTDOWN = new AtomicBoolean(false); /** * Test for issue 3402 : test, that a modrdn that is older than an other * modrdn but that is applied later is ignored. @@ -123,10 +122,10 @@ "uid=simultaneous2"); // Put the message in the replay queue domain.processUpdate(modDnMsg); domain.processUpdate(modDnMsg, SHUTDOWN); // Make the domain replay the change from the replay queue domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // This MODIFY DN uses an older DN and should therefore be cancelled // at replay time. @@ -137,11 +136,11 @@ "uid=simulatneouswrong"); // Put the message in the replay queue domain.processUpdate(modDnMsg); domain.processUpdate(modDnMsg, SHUTDOWN); // Make the domain replay the change from the replay queue // and resolve conflict domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // Expect the conflict resolution assertFalse(DirectoryServer.entryExists(entry.getDN()), @@ -158,8 +157,6 @@ * a delete operation has removed one of the conflicting entries * the other conflicting entry is correctly renamed to its * original name. * * @throws Exception if the test fails. */ @Test(enabled=true) public void conflictCleaningDelete() throws Exception @@ -215,10 +212,10 @@ null); // Put the message in the replay queue domain.processUpdate(addMsg); domain.processUpdate(addMsg, SHUTDOWN); // Make the domain replay the change from the replay queue domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // Now delete the first entry that was added at the beginning TestCaseUtils.deleteEntry(entry.getDN()); @@ -226,7 +223,7 @@ // Expect the conflict resolution : the second entry should now // have been renamed with the original DN. Entry resultEntry = DirectoryServer.getEntry(entry.getDN()); assertTrue(resultEntry != null, "The conflict was not cleared"); assertNotNull(resultEntry, "The conflict was not cleared"); assertEquals(getEntryUUID(resultEntry.getDN()), "c9cb8c3c-615a-4122-865d-50323aaaed48", "The wrong entry has been renamed"); @@ -300,10 +297,10 @@ null); // Put the message in the replay queue domain.processUpdate(addMsg); domain.processUpdate(addMsg, SHUTDOWN); // Make the domain replay the change from the replay queue domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // Now delete the first entry that was added at the beginning InternalClientConnection conn = @@ -316,7 +313,7 @@ // Expect the conflict resolution : the second entry should now // have been renamed with the original DN. Entry resultEntry = DirectoryServer.getEntry(entry.getDN()); assertTrue(resultEntry != null, "The conflict was not cleared"); assertNotNull(resultEntry, "The conflict was not cleared"); assertEquals(getEntryUUID(resultEntry.getDN()), "c9cb8c3c-615a-4122-865d-50323aaaed48", "The wrong entry has been renamed"); @@ -409,9 +406,9 @@ delMsg.setSubtreeDelete(true); // Put the message in the replay queue domain.processUpdate(delMsg); domain.processUpdate(delMsg, SHUTDOWN); // Make the domain replay the change from the replay queue domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // Expect the subtree to be deleted and no conflict entry created assertFalse(DirectoryServer.entryExists(parentEntry.getDN()), @@ -488,9 +485,9 @@ // NOT SUBTREE // Put the message in the replay queue domain.processUpdate(delMsg); domain.processUpdate(delMsg, SHUTDOWN); // Make the domain replay the change from the replay queue domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // Expect the parent entry to be deleted assertTrue(!DirectoryServer.entryExists(parentEntry.getDN()), @@ -502,7 +499,6 @@ "+cn=child,o=test"); assertTrue(DirectoryServer.entryExists(childDN), "Child entry conflict exist with DN="+childDN); } finally { @@ -572,9 +568,9 @@ new ArrayList<Attribute>()); // Put the message in the replay queue domain.processUpdate(addMsg); domain.processUpdate(addMsg, SHUTDOWN); // Make the domain replay the change from the replay queue domain.replay(queue.take().getUpdateMessage()); domain.replay(queue.take().getUpdateMessage(), SHUTDOWN); // Expect the parent entry to be deleted assertFalse(DirectoryServer.entryExists(parentEntry.getDN()), @@ -586,7 +582,6 @@ "+cn=child,o=test"); assertTrue(DirectoryServer.entryExists(childDN), "Child entry conflict exist with DN="+childDN); } finally { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -23,36 +23,45 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS */ package org.opends.server.replication.service; import org.opends.server.types.ResultCode; import static org.opends.messages.ReplicationMessages.*; import java.io.IOException; import java.util.concurrent.BlockingQueue; import org.opends.server.config.ConfigException; import java.util.Collection; import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.server.config.ConfigException; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.DirectoryException; import static org.opends.messages.ReplicationMessages.*; import org.opends.server.types.ResultCode; /** * This class is the minimum implementation of a Concrete ReplicationDomain * used to test the Generic Replication Service. */ @SuppressWarnings("javadoc") public class FakeReplicationDomain extends ReplicationDomain { // A blocking queue that is used to send the UpdateMsg received from // the Replication Service. BlockingQueue<UpdateMsg> queue = null; /** * A blocking queue that is used to send the UpdateMsg received from the * Replication Service. */ private BlockingQueue<UpdateMsg> queue = null; // A string that will be exported should exportBackend be called. String exportString = null; /** A string that will be exported should exportBackend be called. */ private String exportString = null; // A StringBuilder that will be used to build a build a new String should the // import be called. StringBuilder importString = null; /** * A StringBuilder that will be used to build a build a new String should the * import be called. */ private StringBuilder importString = null; private int exportedEntryCount; @@ -142,7 +151,7 @@ } @Override public boolean processUpdate(UpdateMsg updateMsg) public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown) { if (queue != null) queue.add(updateMsg); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -23,36 +23,36 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS */ package org.opends.server.replication.service; import org.opends.server.types.ResultCode; import static org.opends.messages.ReplicationMessages.*; import java.io.IOException; import java.util.concurrent.BlockingQueue; import org.opends.server.config.ConfigException; import java.util.Collection; import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.server.config.ConfigException; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.DirectoryException; import static org.opends.messages.ReplicationMessages.*; import org.opends.server.types.ResultCode; /** * This class is the minimum implementation of a Concrete ReplicationDomain * used to test the Generic Replication Service. */ @SuppressWarnings("javadoc") public class FakeStressReplicationDomain extends ReplicationDomain { // A blocking queue that is used to send the UpdateMsg received from // the Replication Service. BlockingQueue<UpdateMsg> queue = null; // A string that will be exported should exportBackend be called. String exportString = null; // A StringBuilder that will be used to build a build a new String should the // import be called. StringBuilder importString = null; /** * A blocking queue that is used to send the UpdateMsg received from the * Replication Service. */ private BlockingQueue<UpdateMsg> queue = null; public FakeStressReplicationDomain( String serviceID, @@ -68,23 +68,8 @@ this.queue = queue; } public FakeStressReplicationDomain( String serviceID, int serverID, Collection<String> replicationServers, int window, long heartbeatInterval, String exportString, StringBuilder importString) throws ConfigException { super(serviceID, serverID, 100); startPublishService(replicationServers, window, heartbeatInterval, 500); startListenService(); this.exportString = exportString; this.importString = importString; } private static final int IMPORT_SIZE = 100000000; final int IMPORT_SIZE = 100000000; @Override public long countEntries() throws DirectoryException { @@ -150,7 +135,7 @@ } @Override public boolean processUpdate(UpdateMsg updateMsg) public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown) { if (queue != null) queue.add(updateMsg);