opends/resource/schema/02-config.ldif
@@ -1543,11 +1543,6 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445 NAME 'ds-cfg-cache-level' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.315 NAME 'ds-cfg-allow-retrieving-membership' SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 @@ -2182,11 +2177,20 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.445 NAME 'ds-cfg-cache-level' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.446 NAME 'ds-cfg-cache-preload' SYNTAX 1.3.6.1.4.1.1466.115.121.1.7 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.447 NAME 'ds-cfg-num-update-replay-threads' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 NAME 'ds-cfg-access-control-handler' SUP top @@ -3087,6 +3091,7 @@ NAME 'ds-cfg-replication-synchronization-provider' SUP ds-cfg-synchronization-provider STRUCTURAL MAY ( ds-cfg-num-update-replay-threads ) X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.94 NAME 'ds-cfg-dictionary-password-validator' opends/src/admin/defn/org/opends/server/admin/std/ReplicationSynchronizationProviderConfiguration.xml
@@ -23,7 +23,7 @@ ! CDDL HEADER END ! ! ! Portions Copyright 2007 Sun Microsystems, Inc. ! Portions Copyright 2007-2008 Sun Microsystems, Inc. ! --> <adm:managed-object name="replication-synchronization-provider" plural-name="replication-synchronization-providers" @@ -79,4 +79,28 @@ </adm:defined> </adm:default-behavior> </adm:property-override> <adm:property name="num-update-replay-threads" mandatory="false" read-only="false"> <adm:synopsis> Specifies the number of update replay threads </adm:synopsis> <adm:description> This is the number of threads created for replaying every updates received for all the replication domains. </adm:description> <adm:default-behavior> <adm:defined> <adm:value> 10 </adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:integer lower-limit="1" upper-limit="65535"></adm:integer> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-num-update-replay-threads</ldap:name> </ldap:attribute> </adm:profile> </adm:property> </adm:managed-object> opends/src/messages/messages/replication.properties
@@ -257,4 +257,6 @@ are missing due to a processing error : %s SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \ sending request to get remote monitor data SEVERE_ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE_109=An Exception was caught \ while replaying replication message : %s opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -22,9 +22,11 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.opends.messages.Message; import static org.opends.server.loggers.ErrorLogger.logError; @@ -48,22 +50,27 @@ */ private static final DebugTracer TRACER = getTracer(); private ReplicationDomain listener; private ReplicationDomain repDomain; private boolean shutdown = false; private boolean done = false; private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue; /** * Constructor for the ListenerThread. * * @param listener the Plugin that created this thread * @param repDomain the replication domain that created this thread * @param updateToReplayQueue The update messages queue we must * store messages in */ public ListenerThread(ReplicationDomain listener) public ListenerThread(ReplicationDomain repDomain, LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue) { super("Replication Listener thread " + "serverID=" + listener.serverId + " domain=" + listener.getName()); this.listener = listener; "serverID=" + repDomain.serverId + " domain=" + repDomain.getName()); this.repDomain = repDomain; this.updateToReplayQueue = updateToReplayQueue; } /** @@ -77,31 +84,50 @@ /** * Run method for this class. */ @Override public void run() { UpdateMessage msg; UpdateMessage updateMsg = null; if (debugEnabled()) { TRACER.debugInfo("Replication Listener thread starting."); } while (shutdown == false) while (!shutdown) { try { while (((msg = listener.receive()) != null) && (shutdown == false)) // Loop receiving update messages and puting them in the update message // queue while ((!shutdown) && ((updateMsg = repDomain.receive()) != null)) { listener.replay(msg); // Put update message into the queue (block until some place in the // queue is available) UpdateToReplay updateToReplay = new UpdateToReplay(updateMsg, repDomain); boolean queued = false; while (!queued && !shutdown) { // Use timedout method (offer) instead of put for being able to // shutdown the thread queued = updateToReplayQueue.offer(updateToReplay, 1L, TimeUnit.SECONDS); } if (msg == null) if (!queued) { // Shutdown requested but could not push message: ensure this one is // not lost and put it in the queue before dying updateToReplayQueue.offer(updateToReplay); } } if (updateMsg == null) shutdown = true; } catch (Exception e) { /* * catch all exceptions happening in listener.receive and * listener.replay so that the thread never dies even in case * of problems. * catch all exceptions happening in repDomain.receive so that the * thread never dies even in case of problems. */ Message message = ERR_EXCEPTION_RECEIVING_REPLICATION_MESSAGE.get( stackTraceToSingleLineString(e)); opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.util.ArrayList; import static org.opends.server.replication.plugin. ReplicationRepairRequestControl.*; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationAddListener; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.server.ConfigurationDeleteListener; import org.opends.server.admin.std.server.ReplicationDomainCfg; import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; @@ -82,6 +85,8 @@ extends SynchronizationProvider<ReplicationSynchronizationProviderCfg> implements ConfigurationAddListener<ReplicationDomainCfg>, ConfigurationDeleteListener<ReplicationDomainCfg>, ConfigurationChangeListener <ReplicationSynchronizationProviderCfg>, BackupTaskListener, RestoreTaskListener, ImportTaskListener, ExportTaskListener { @@ -89,6 +94,23 @@ private static Map<DN, ReplicationDomain> domains = new HashMap<DN, ReplicationDomain>() ; /** * The queue of received update messages, to be treated by the ReplayThread * threads. */ private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<UpdateToReplay>(); /** * The list of ReplayThread threads. */ private static List<ReplayThread> replayThreads = new ArrayList<ReplayThread>(); /** * The configurable number of replay threads. */ private static int replayThreadNumber = 10; /** * Finds the domain for a given DN. @@ -167,7 +189,16 @@ throws ConfigException { ReplicationDomain domain; domain = new ReplicationDomain(configuration); domain = new ReplicationDomain(configuration, updateToReplayQueue); if (domains.size() == 0) { /* * Create the threads that will process incoming update messages */ createReplayThreads(); } domains.put(domain.getBaseDN(), domain); domain.start(); return domain; @@ -180,6 +211,12 @@ public static void deleteDomain(DN dn) { ReplicationDomain domain = domains.remove(dn); // No replay threads running if no replication need if (domains.size() == 0) { stopReplayThreads(); } if (domain != null) domain.shutdown(); } @@ -200,6 +237,12 @@ configuration.addReplicationDomainAddListener(this); configuration.addReplicationDomainDeleteListener(this); // Register as a root configuration listener so that we can be notified if // number of replay threads is changed and apply changes. configuration.addReplicationChangeListener(this); replayThreadNumber = configuration.getNumUpdateReplayThreads(); // Create the list of domains that are already defined. for (String name : configuration.listReplicationDomains()) { @@ -228,6 +271,39 @@ } /** * Create the threads that will wait for incoming update messages. */ private synchronized static void createReplayThreads() { replayThreads.clear(); for (int i = 0; i < replayThreadNumber; i++) { ReplayThread replayThread = new ReplayThread(updateToReplayQueue); replayThread.start(); replayThreads.add(replayThread); } } /** * Stope the threads that are waiting for incoming update messages. */ private synchronized static void stopReplayThreads() { // stop the replay threads for (ReplayThread replayThread : replayThreads) { replayThread.shutdown(); } for (ReplayThread replayThread : replayThreads) { replayThread.waitForShutdown(); } replayThreads.clear(); } /** * {@inheritDoc} */ public boolean isConfigurationAddAcceptable( @@ -431,6 +507,9 @@ @Override public void finalizeSynchronizationProvider() { // Stop replay threads stopReplayThreads(); // shutdown all the domains for (ReplicationDomain domain : domains.values()) { @@ -621,6 +700,37 @@ { return replicationServerListener; } /** * {@inheritDoc} */ public boolean isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg configuration, List<Message> unacceptableReasons) { return true; } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationChange (ReplicationSynchronizationProviderCfg configuration) { int numUpdateRepayThread = configuration.getNumUpdateReplayThreads(); // Stop threads then restart new number of threads stopReplayThreads(); replayThreadNumber = numUpdateRepayThread; if (domains.size() > 0) { createReplayThreads(); } return new ConfigChangeResult(ResultCode.SUCCESS, false); } } opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -158,7 +158,7 @@ { /* * Parse the list of Update with dependencies and check if the dependencies * are now cleared until an Update withour dependencies is found. * are now cleared until an Update without dependencies is found. */ for (PendingChange change : dependentChanges) { opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
New file @@ -0,0 +1,142 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.opends.messages.Message; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.UpdateMessage; /** * Thread that is used to get message from the replication servers (stored * in the updates queue) and replay them in the current server. A configurable * number of this thread is created for the whole MultimasterReplication object * (i.e: these threads are shared accross the ReplicationDomain objects for * replaying the updates they receive) */ public class ReplayThread extends DirectoryThread { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private BlockingQueue<UpdateToReplay> updateToReplayQueue = null; private boolean shutdown = false; private boolean done = false; /** * Constructor for the ReplayThread. * * @param updateToReplayQueue The queue of update messages we have to replay */ public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue) { super("Replication Replay thread"); this.updateToReplayQueue = updateToReplayQueue; } /** * Shutdown this replay thread. */ public void shutdown() { shutdown = true; } /** * Run method for this class. */ @Override public void run() { if (debugEnabled()) { TRACER.debugInfo("Replication Replay thread starting."); } UpdateToReplay updateToreplay = null; while (!shutdown) { try { // Loop getting an updateToReplayQueue from the update message queue and // replaying matching changes while ( (!shutdown) && ((updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS)) != null)) { // Find replication domain for that update message UpdateMessage updateMsg = updateToreplay.getUpdateMessage(); ReplicationDomain domain = updateToreplay.getReplicationDomain(); domain.replay(updateMsg); } } catch (Exception e) { /* * catch all exceptions happening so that the thread never dies even * in case of problems. */ Message message = ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE.get( stackTraceToSingleLineString(e)); logError(message); } } done = true; if (debugEnabled()) { TRACER.debugInfo("Replication Replay thread stopping."); } } /** * Wait for the completion of this thread. */ public void waitForShutdown() { try { while (done == false) { Thread.sleep(50); } } catch (InterruptedException e) { // exit the loop if this thread is interrupted. } } } opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import static org.opends.messages.ReplicationMessages.*; @@ -51,6 +51,7 @@ import java.util.NoSuchElementException; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.Adler32; import java.util.zip.CheckedOutputStream; @@ -172,18 +173,16 @@ */ private static final DebugTracer TRACER = getTracer(); /** * on shutdown, the server will wait for existing threads to stop * during this timeout (in ms). */ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; private ReplicationMonitor monitor; private ReplicationBroker broker; private List<ListenerThread> synchroThreads = new ArrayList<ListenerThread>(); // Thread waiting for incoming update messages for this domain and pushing // them to the global incoming update message queue for later processing by // replay threads. private ListenerThread listenerThread; // The update to replay message queue where the listener thread is going to // push incoming update messages. private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue; private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs = new TreeMap<ChangeNumber, UpdateMessage>(); private AtomicInteger numRcvdUpdates = new AtomicInteger(0); @@ -233,8 +232,6 @@ // Null when none is being processed. private IEContext ieContext = null; private int listenerThreadNumber = 10; private Collection<String> replicationServers; private DN baseDN; @@ -355,12 +352,59 @@ } /** * This thread is launched when we want to export data to another server that * has requested to be initialized with the data of our backend. */ private class ExportThread extends DirectoryThread { // Id of server that will receive updates private short target; /** * Constructor for the ExportThread. * * @param target Id of server that will receive updates */ public ExportThread(short target) { super("Export thread"); this.target = target; } /** * Run method for this class. */ public void run() { if (debugEnabled()) { TRACER.debugInfo("Export thread starting."); } try { initializeRemote(target, target, null); } catch (DirectoryException de) { // An error message has been sent to the peer // Nothing more to do locally } if (debugEnabled()) { TRACER.debugInfo("Export thread stopping."); } } } /** * Creates a new ReplicationDomain using configuration from configEntry. * * @param configuration The configuration of this ReplicationDomain. * @param updateToReplayQueue The queue for update messages to replay. * @throws ConfigException In case of invalid configuration. */ public ReplicationDomain(ReplicationDomainCfg configuration) public ReplicationDomain(ReplicationDomainCfg configuration, LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException { super("replicationDomain_" + configuration.getBaseDN()); @@ -373,6 +417,7 @@ heartbeatInterval = configuration.getHeartbeatInterval(); isolationpolicy = configuration.getIsolationPolicy(); configDn = configuration.dn(); this.updateToReplayQueue = updateToReplayQueue; /* * Modify conflicts are solved for all suffixes but the schema suffix @@ -819,10 +864,8 @@ */ public UpdateMessage receive() { UpdateMessage update = remotePendingChanges.getNextUpdate(); UpdateMessage update = null; if (update == null) { while (update == null) { InitializeRequestMessage initMsg = null; @@ -919,21 +962,13 @@ // when we are doing and export and so that a possible // closure of the socket happening when we are publishing the // entries to the remote can be handled by the other // ListenerThread when they call this method and therefore the // replay thread when they call this method and therefore the // broker.receive() method. if (initMsg != null) { try { initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(), null); } catch(DirectoryException de) { // An error message has been sent to the peer // Nothing more to do locally } } // Do this work in a thread to allow replay thread continue working ExportThread exportThread = new ExportThread(initMsg.getsenderID()); exportThread.start(); } } return update; @@ -1190,10 +1225,9 @@ @Override public void run() { /* * create the threads that will wait for incoming changes. */ createListeners(); // Create the listener thread listenerThread = new ListenerThread(this, updateToReplayQueue); listenerThread.start(); while (shutdown == false) { @@ -1217,28 +1251,6 @@ } /** * create the threads that will wait for incoming changes. * TODO : should use a pool of threads shared between all the servers * TODO : need to make number of thread configurable */ private void createListeners() { synchronized (synchroThreads) { if (!shutdown) { synchroThreads.clear(); for (int i=0; i<listenerThreadNumber; i++) { ListenerThread myThread = new ListenerThread(this); myThread.start(); synchroThreads.add(myThread); } } } } /** * Shutdown this ReplicationDomain. */ public void shutdown() @@ -1246,14 +1258,8 @@ // stop the flush thread shutdown = true; synchronized (synchroThreads) { // stop the listener threads for (ListenerThread thread : synchroThreads) { thread.shutdown(); } } // Stop the listener thread listenerThread.shutdown(); synchronized (this) { @@ -1267,11 +1273,8 @@ // stop the ReplicationBroker broker.stop(); // wait for the listener thread to stop for (ListenerThread thread : synchroThreads) { thread.waitForShutdown(); } // Wait for the listener thread to stop listenerThread.waitForShutdown(); // wait for completion of the persistentServerState thread. try @@ -1315,6 +1318,10 @@ int retryCount = 10; boolean firstTry = true; // Try replay the operation, then flush (replaying) any pending operation // whose dependency has been replayed until no more left. do { try { while ((!dependency) && (!done) && (retryCount-- > 0)) @@ -1326,6 +1333,7 @@ changeNumber = OperationContext.getChangeNumber(op); ((AbstractOperation)op).run(); // Try replay the operation ResultCode result = op.getResultCode(); if (result != ResultCode.SUCCESS) @@ -1338,8 +1346,7 @@ { done = solveNamingConflict(newOp, msg); } } else if (op instanceof DeleteOperation) } else if (op instanceof DeleteOperation) { DeleteOperation newOp = (DeleteOperation) op; dependency = remotePendingChanges.checkDependencies(newOp); @@ -1347,8 +1354,7 @@ { done = solveNamingConflict(newOp, msg); } } else if (op instanceof AddOperation) } else if (op instanceof AddOperation) { AddOperation newOp = (AddOperation) op; AddMsg addMsg = (AddMsg) msg; @@ -1357,8 +1363,7 @@ { done = solveNamingConflict(newOp, addMsg); } } else if (op instanceof ModifyDNOperationBasis) } else if (op instanceof ModifyDNOperationBasis) { ModifyDNMsg newMsg = (ModifyDNMsg) msg; dependency = remotePendingChanges.checkDependencies(newMsg); @@ -1367,8 +1372,7 @@ ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op; done = solveNamingConflict(newOp, msg); } } else } else { done = true; // unknown type of operation ?! } @@ -1379,8 +1383,7 @@ // however we still need to push this change to the serverState updateError(changeNumber); } } else } else { done = true; } @@ -1399,26 +1402,22 @@ updateError(changeNumber); } } catch (ASN1Exception e) } catch (ASN1Exception e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); } catch (LDAPException e) } catch (LDAPException e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); } catch (DataFormatException e) } catch (DataFormatException e) { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); } catch (Exception e) } catch (Exception e) { if (changeNumber != null) { @@ -1432,15 +1431,13 @@ stackTraceToSingleLineString(e), op.toString()); logError(message); updateError(changeNumber); } else } else { Message message = ERR_EXCEPTION_DECODING_OPERATION.get( String.valueOf(msg) + stackTraceToSingleLineString(e)); logError(message); } } finally } finally { if (!dependency) { @@ -1449,6 +1446,20 @@ incProcessedUpdates(); } } // Now replay any pending update that had a dependency and whose // dependency has been replayed, do that until no more updates of that // type left... msg = remotePendingChanges.getNextUpdate(); // Prepare restart of loop done = false; dependency = false; changeNumber = null; retryCount = 10; firstTry = true; } while (msg != null); } /** @@ -2224,7 +2235,7 @@ * The session to the replication server will be stopped. * The domain will not be destroyed but call to the pre-operation * methods will result in failure. * The listener threads will be destroyed. * The listener thread will be destroyed. * The monitor informations will still be accessible. */ public void disable() @@ -2232,23 +2243,14 @@ state.save(); state.clearInMemory(); disabled = true; // stop the listener threads for (ListenerThread thread : synchroThreads) { thread.shutdown(); } broker.stop(); // this will cut the session and wake-up the listeners for (ListenerThread thread : synchroThreads) { try { thread.join(SHUTDOWN_JOIN_TIMEOUT); } catch (InterruptedException e) { // ignore } } // Stop the listener thread listenerThread.shutdown(); broker.stop(); // This will cut the session and wake up the listener // Wait for the listener thread to stop listenerThread.waitForShutdown(); } /** @@ -2265,7 +2267,6 @@ state.loadState(); disabled = false; try { generationId = loadGenerationId(); @@ -2288,7 +2289,9 @@ broker.start(replicationServers); createListeners(); // Create the listener thread listenerThread = new ListenerThread(this, updateToReplayQueue); listenerThread.start(); } /** opends/src/server/org/opends/server/replication/plugin/UpdateToReplay.java
New file @@ -0,0 +1,73 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import org.opends.server.replication.protocol.UpdateMessage; /** * This is a bag class to hold an update to replay in the queue of updates to * be replayed by the replay threads. * It associates an update message to replay with the matching * ReplicationDomain. */ public class UpdateToReplay { private UpdateMessage updateMessage = null; private ReplicationDomain replicationDomain = null; /** * Construct the object associating the update message with the replication * domain that must be used to replay it (the on it comes from). * @param updateMessage The update message * @param replicationDomain The replication domain to use for replaying the * change from the update message */ public UpdateToReplay(UpdateMessage updateMessage, ReplicationDomain replicationDomain) { this.updateMessage = updateMessage; this.replicationDomain = replicationDomain; } /** * Getter for update message. * @return The update message */ public UpdateMessage getUpdateMessage() { return updateMessage; } /** * Getter for replication domain. * @return The replication domain */ public ReplicationDomain getReplicationDomain() { return replicationDomain; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -420,7 +420,7 @@ * To increase the risks of failures a loop of add/del/add is done. */ @SuppressWarnings("unchecked") @Test(enabled=false, groups="slow") @Test(enabled=true, groups="slow") public void addDelAddDependencyTest() throws Exception { ReplicationServer replServer = null; @@ -552,7 +552,7 @@ * issuing a set of Add operation followed by a modrdn of the added entry. */ @SuppressWarnings("unchecked") @Test(enabled=false, groups="slow") @Test(enabled=true, groups="slow") public void addModdnDependencyTest() throws Exception { ReplicationServer replServer = null;