opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -304,4 +304,25 @@
  {
    return list.keySet().iterator();
  }

  /**
   * Check that all the ChangeNumbers in the covered serverState are also in
   * this serverState.
   * <p>
   * For every ChangeNumber stored in {@code covered}, this ServerState must
   * hold a ChangeNumber for the same server id that is not older.
   *
   * @param covered The ServerState that needs to be checked. A null value
   *                is treated as an empty state, which is always covered.
   * @return A boolean indicating if this ServerState covers the ServerState
   *         given in parameter.
   */
  public boolean cover(ServerState covered)
  {
    if (covered == null)
    {
      // Nothing to check: callers that never recorded a dependency pass
      // null here, and an empty set of changes is trivially covered.
      return true;
    }

    // NOTE(review): both lists are read here without any locking; confirm
    // that callers cannot update either state concurrently.
    for (ChangeNumber coveredChange : covered.list.values())
    {
      ChangeNumber change = this.list.get(coveredChange.getServerId());
      if ((change == null) || (change.older(coveredChange)))
      {
        return false;
      }
    }
    return true;
  }
}
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -82,6 +82,68 @@ /** * Finds the domain for a given DN. * The DN itself is looked up first, then each parent returned by * getParentDNInSuffix, until a domain is found or no parent is left. * * @param dn The DN for which the domain must be returned. * @param op An optional operation for which the check is done. * Can be null if the request has no associated operation. * @return The domain for this DN, or null if no domain was found or if * the operation is marked as not to be synchronized. */ public static ReplicationDomain findDomain(DN dn, Operation op) { /* * Don't run the special replication code on Operation that are * specifically marked as don't synchronize. */ if ((op != null) && op.dontSynchronize()) return null; ReplicationDomain domain = null; DN temp = dn; do { domain = domains.get(temp); temp = temp.getParentDNInSuffix(); if (temp == null) { break; } } while (domain == null); return domain; } /** * Creates a new domain from its configEntry, does the * necessary initialization and starts it so that it is * fully operational when this method returns. * The domain is registered under its base DN before it is started. * @param configuration The entry with the configuration of this domain. * @return The domain created. * @throws ConfigException When the configuration is not valid. */ public static ReplicationDomain createNewDomain( MultimasterDomainCfg configuration) throws ConfigException { ReplicationDomain domain; domain = new ReplicationDomain(configuration); domains.put(domain.getBaseDN(), domain); domain.start(); return domain; } /** * Deletes a domain: removes it from the registered domains and shuts it * down. Does nothing if no domain is registered for the given DN. * @param dn : the base DN of the domain to delete. */ public static void deleteDomain(DN dn) { ReplicationDomain domain = domains.remove(dn); if (domain != null) domain.shutdown(); } /** * {@inheritDoc} */ @Override @@ -149,23 +211,6 @@ } /** * Creates a new domain from its configEntry, does the * necessary initialization and starts it so that it is * fully operational when this method returns. * @param configuration The entry with the configuration of this domain. * @throws ConfigException When the configuration is not valid. 
*/ private void createNewDomain( MultimasterDomainCfg configuration) throws ConfigException { ReplicationDomain domain; domain = new ReplicationDomain(configuration); domains.put(domain.getBaseDN(), domain); domain.start(); } /** * {@inheritDoc} */ @Override @@ -354,55 +399,6 @@ } /** * Finds the domain for a given DN. * * @param dn The DN for which the domain must be returned. * @param op An optional operation for which the check is done. * Can be null if the request has no associated operation. * @return The domain for this DN. */ public static ReplicationDomain findDomain(DN dn, Operation op) { /* * Don't run the special replication code on Operation that are * specifically marked as don't synchronize. */ if ((op != null) && op.dontSynchronize()) return null; ReplicationDomain domain = null; DN temp = dn; do { domain = domains.get(temp); temp = temp.getParentDNInSuffix(); if (temp == null) { break; } } while (domain == null); return domain; } /** * Generic code for all the postOperation entry points. * * @param operation The Operation for which the post-operation is called. * @param dn The Dn for which the post-operation is called. */ private void genericPostOperation(Operation operation, DN dn) { ReplicationDomain domain = findDomain(dn, operation); if (domain == null) return; domain.synchronize(operation); return; } /** * This method is called whenever the server detects a modification * of the schema done by directly modifying the backing files * of the schema backend. @@ -535,10 +531,7 @@ public ConfigChangeResult applyConfigurationDelete( MultimasterDomainCfg configuration) { DN dn = configuration.getReplicationDN(); ReplicationDomain domain = domains.remove(dn); if (domain != null) domain.shutdown(); deleteDomain(configuration.getReplicationDN()); return new ConfigChangeResult(ResultCode.SUCCESS, false); } @@ -551,6 +544,23 @@ { return true; } /** * Generic code for all the postOperation entry points. 
* * If a domain is found for the DN, the operation is handed to its * synchronize method; otherwise nothing is done. * * @param operation The Operation for which the post-operation is called. * @param dn The Dn for which the post-operation is called. */ private void genericPostOperation(Operation operation, DN dn) { ReplicationDomain domain = findDomain(dn, operation); if (domain == null) return; domain.synchronize(operation); return; } } opends/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -27,19 +27,24 @@ package org.opends.server.replication.plugin; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Operation; /** * This class is use to store the list of operations currently * This class is use to store an operation currently * in progress and not yet committed in the database. */ public class PendingChange public class PendingChange implements Comparable<PendingChange> { private ChangeNumber changeNumber; private boolean committed; private UpdateMessage msg; private Operation op; private ServerState dependencyState = null; private DN targetDN = null; /** * Construct a new PendingChange. @@ -121,4 +126,66 @@ this.op = op; } /** * Add the given ChangeNumber in the list of dependencies of this * PendingChange. * * @param changeNumber The ChangeNumber to add in the list of dependencies * of this PendingChange. */ public void addDependency(ChangeNumber changeNumber) { if (dependencyState == null) { dependencyState = new ServerState(); } dependencyState.update(changeNumber); } /** * Check if the given ServerState covers the dependencies of this * PendingChange. * * @param state The ServerState for which dependencies must be checked, * * @return A boolean indicating if the given ServerState covers the * dependencies of this PendingChange. */ public boolean dependenciesIsCovered(ServerState state) { return state.cover(dependencyState); } /** * Get the Target DN of this message. * * @return The target DN of this message. 
*/ public DN getTargetDN() { synchronized (this) { if (targetDN != null) return targetDN; else { try { targetDN = DN.decode(msg.getDn()); } catch (DirectoryException e) { } } return targetDN; } } /** * {@inheritDoc} */ public int compareTo(PendingChange o) { return this.getChangeNumber().compareTo(o.getChangeNumber()); } } opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
New file @@ -0,0 +1,559 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Operation;

/**
 * This class is used to store the list of operations currently
 * in progress and not yet committed in the database.
 * <p>
 * Every public method is synchronized on this object, so the pending list
 * and the set of dependent changes are always modified together.
 */
public class PendingChanges
{
  /**
   * A map used to store the pending changes, sorted by ChangeNumber.
   */
  private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();

  /**
   * A sorted set containing the list of PendingChanges that have
   * not been replayed correctly because they are dependent on
   * another change to be completed.
   */
  private final SortedSet<PendingChange> dependentChanges =
    new TreeSet<PendingChange>();

  /**
   * The ChangeNumberGenerator to use to create new unique ChangeNumbers
   * for each operation done on the replication domain.
   */
  private final ChangeNumberGenerator changeNumberGenerator;

  /**
   * The ReplicationBroker that will be used to send UpdateMessage.
   */
  private final ReplicationBroker broker;

  /**
   * The ServerState that will be updated when UpdateMessage are committed.
   */
  private final ServerState state;

  /**
   * Creates a new PendingChanges using the provided ChangeNumberGenerator.
   *
   * @param changeNumberGenerator The ChangeNumberGenerator to use to create
   *                              new unique ChangeNumbers.
   * @param broker The ReplicationBroker that will be used to send
   *               UpdateMessage.
   * @param state  The ServerState that will be updated when UpdateMessage
   *               are committed.
   */
  public PendingChanges(
      ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
      ServerState state)
  {
    this.changeNumberGenerator = changeNumberGenerator;
    this.broker = broker;
    this.state = state;
  }

  /**
   * Remove and return an update from the pending changes list.
   *
   * @param changeNumber The ChangeNumber of the update to remove.
   *
   * @return The UpdateMessage that was just removed, or null if no update
   *         with this ChangeNumber is in the list or if the removed update
   *         has no message.
   */
  public synchronized UpdateMessage remove(ChangeNumber changeNumber)
  {
    if (changeNumber == null)
    {
      return null;
    }
    PendingChange removed = pendingChanges.remove(changeNumber);
    // An unknown ChangeNumber used to end in a NullPointerException here.
    return (removed == null) ? null : removed.getMsg();
  }

  /**
   * Returns the number of update currently in the list.
   *
   * @return The number of update currently in the list.
   */
  public synchronized int size()
  {
    return pendingChanges.size();
  }

  /**
   * Mark an update message as committed.
   * The operation is stored in the pending change when it is a
   * synchronization operation, otherwise the message is stored.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param op           The Operation associated of the update when the
   *                     update is a local update.
   * @param msg          The message associated to the update when the update
   *                     was received from a replication server.
   * @throws NoSuchElementException If no update with this ChangeNumber is
   *                                in the pending list.
   */
  public synchronized void commit(ChangeNumber changeNumber,
      Operation op, UpdateMessage msg)
  {
    PendingChange curChange = lookupOrFail(changeNumber);
    curChange.setCommitted(true);

    if (op.isSynchronizationOperation())
    {
      curChange.setOp(op);
    }
    else
    {
      curChange.setMsg(msg);
    }
  }

  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @throws NoSuchElementException If no update with this ChangeNumber is
   *                                in the pending list.
   */
  public synchronized void commit(ChangeNumber changeNumber)
  {
    lookupOrFail(changeNumber).setCommitted(true);
  }

  /**
   * Add a new UpdateMessage to the pending list from the provided local
   * operation.
   *
   * @param operation The local operation for which an UpdateMessage must
   *                  be added in the pending list.
   * @return The ChangeNumber now associated to the operation.
   */
  public synchronized ChangeNumber putLocalOperation(Operation operation)
  {
    ChangeNumber changeNumber = changeNumberGenerator.NewChangeNumber();
    pendingChanges.put(changeNumber,
        new PendingChange(changeNumber, operation, null));
    return changeNumber;
  }

  /**
   * Add a new UpdateMessage that was received from the replication server
   * to the pendingList. The ChangeNumberGenerator is adjusted with the
   * ChangeNumber of the update.
   *
   * @param update The UpdateMessage that was received from the replication
   *               server and that will be added to the pending list.
   */
  public synchronized void putRemoteUpdate(UpdateMessage update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    changeNumberGenerator.adjust(changeNumber);
    pendingChanges.put(changeNumber,
        new PendingChange(changeNumber, null, update));
  }

  /**
   * Push all committed local changes to the replicationServer service.
   * <p>
   * Changes are handled in ChangeNumber order and the processing stops at
   * the first change that is not yet committed. Each handled change updates
   * the ServerState and is removed from the pending list; only changes
   * holding an operation that is not a synchronization operation are
   * published.
   *
   * @return The number of pushed updates.
   */
  public synchronized int pushCommittedChanges()
  {
    int numSentUpdates = 0;

    while (!pendingChanges.isEmpty())
    {
      ChangeNumber firstChangeNumber = pendingChanges.firstKey();
      PendingChange firstChange = pendingChanges.get(firstChangeNumber);
      if ((firstChange == null) || !firstChange.isCommitted())
      {
        break;
      }

      Operation firstOp = firstChange.getOp();
      if ((firstOp != null) && !firstOp.isSynchronizationOperation())
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
    }
    return numSentUpdates;
  }

  /**
   * Get the first update in the list that have some dependencies cleared.
   * The returned update is removed from the set of dependent changes.
   *
   * @return The UpdateMessage to be handled, or null if no dependent
   *         change has all its dependencies covered.
   */
  public synchronized UpdateMessage getNextUpdate()
  {
    /*
     * Parse the list of Update with dependencies and check if the
     * dependencies are now cleared until an Update without dependencies
     * is found. The iterator is used for the removal so that the set is
     * never modified behind the back of the iteration.
     */
    Iterator<PendingChange> candidates = dependentChanges.iterator();
    while (candidates.hasNext())
    {
      PendingChange change = candidates.next();
      if (change.dependenciesIsCovered(state))
      {
        candidates.remove();
        return change.getMsg();
      }
    }
    return null;
  }

  /**
   * Check if the given AddOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   * AddOperation depends on
   *
   * - DeleteOperation done on the same DN
   * - ModifyDnOperation with the same target DN as the ADD DN
   * - ModifyDnOperation with new DN equals to the ADD DN parent
   * - AddOperation done on the parent DN of the ADD DN
   *
   * @param op The AddOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(AddOperation op)
  {
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = lookup(changeNumber);
    if (change == null)
    {
      return false;
    }

    DN targetDn = op.getEntryDN();
    boolean hasDependencies = false;

    for (PendingChange pendingChange : pendingChanges.values())
    {
      UpdateMessage pendingMsg = olderMessage(pendingChange, changeNumber);
      if (pendingMsg == null)
      {
        continue;
      }

      boolean dependsOn = false;
      if (pendingMsg instanceof DeleteMsg)
      {
        // A delete operation pending on the same DN.
        dependsOn = sameDN(pendingChange.getTargetDN(), targetDn);
      }
      else if (pendingMsg instanceof AddMsg)
      {
        // An add operation pending on an ancestor of the added entry.
        DN pendingDn = pendingChange.getTargetDN();
        dependsOn = (pendingDn != null) && pendingDn.isAncestorOf(targetDn);
      }
      else if (pendingMsg instanceof ModifyDNMsg)
      {
        // A ModifyDnOperation with the same target DN as the ADD DN
        // or a ModifyDnOperation whose new DN is a parent of the ADD DN.
        dependsOn = sameDN(pendingChange.getTargetDN(), targetDn)
            || ((ModifyDNMsg) pendingMsg).newDNIsParent(targetDn);
      }

      if (dependsOn)
      {
        hasDependencies = true;
        addDependency(change, pendingChange);
      }
    }
    return hasDependencies;
  }

  /**
   * Check if the given ModifyOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * ModifyOperation depends on
   * - AddOperation done on the same DN
   *
   * @param op The ModifyOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyOperation op)
  {
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = lookup(changeNumber);
    if (change == null)
    {
      return false;
    }

    DN targetDn = op.getEntryDN();
    boolean hasDependencies = false;

    for (PendingChange pendingChange : pendingChanges.values())
    {
      UpdateMessage pendingMsg = olderMessage(pendingChange, changeNumber);

      // An add operation pending on the same DN.
      if ((pendingMsg instanceof AddMsg)
          && sameDN(pendingChange.getTargetDN(), targetDn))
      {
        hasDependencies = true;
        addDependency(change, pendingChange);
      }
    }
    return hasDependencies;
  }

  /**
   * Check if the given ModifyDNMsg has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * Modify DN Operation depends on
   * - AddOperation done on the same DN as the target DN of the MODDN operation
   * - AddOperation done on the new parent of the MODDN operation
   * - DeleteOperation done on the new DN of the MODDN operation
   * - ModifyDNOperation done from the new DN of the MODDN operation
   *
   * @param msg The ModifyDNMsg to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyDNMsg msg)
  {
    ChangeNumber changeNumber = msg.getChangeNumber();
    PendingChange change = lookup(changeNumber);
    if (change == null)
    {
      return false;
    }

    DN targetDn = change.getTargetDN();
    boolean hasDependencies = false;

    for (PendingChange pendingChange : pendingChanges.values())
    {
      UpdateMessage pendingMsg = olderMessage(pendingChange, changeNumber);
      if (pendingMsg == null)
      {
        continue;
      }

      boolean dependsOn = false;
      if ((pendingMsg instanceof DeleteMsg)
          || (pendingMsg instanceof ModifyDNMsg))
      {
        // The pending delete, or the pending rename, targets the entry
        // that currently holds the new DN of this ModifyDN.
        DN pendingDn = pendingChange.getTargetDN();
        dependsOn = (pendingDn != null) && msg.newDNIsEqual(pendingDn);
      }
      else if (pendingMsg instanceof AddMsg)
      {
        // The pending add creates either the new parent of the MODDN
        // operation or the entry that the MODDN operation renames.
        DN pendingDn = pendingChange.getTargetDN();
        dependsOn = (pendingDn != null)
            && (msg.newParentIsEqual(pendingDn)
                || pendingDn.equals(targetDn));
      }

      if (dependsOn)
      {
        hasDependencies = true;
        addDependency(change, pendingChange);
      }
    }
    return hasDependencies;
  }

  /**
   * Check if the given DeleteOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * DeleteOperation depends on
   * - DeleteOperation done on children DN
   * - ModifyDnOperation with target DN that are children of the DEL DN
   * - AddOperation done on the same DN
   *
   *
   * @param op The DeleteOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(DeleteOperation op)
  {
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = lookup(changeNumber);
    if (change == null)
    {
      return false;
    }

    DN targetDn = op.getEntryDN();
    boolean hasDependencies = false;

    for (PendingChange pendingChange : pendingChanges.values())
    {
      UpdateMessage pendingMsg = olderMessage(pendingChange, changeNumber);
      if (pendingMsg == null)
      {
        continue;
      }

      boolean dependsOn = false;
      if ((pendingMsg instanceof DeleteMsg)
          || (pendingMsg instanceof ModifyDNMsg))
      {
        // A delete or a ModifyDN pending on a child of the deleted entry.
        DN pendingDn = pendingChange.getTargetDN();
        dependsOn = (pendingDn != null)
            && pendingDn.isDescendantOf(targetDn);
      }
      else if (pendingMsg instanceof AddMsg)
      {
        // An add operation pending on the same DN as the deleted entry.
        dependsOn = sameDN(pendingChange.getTargetDN(), targetDn);
      }

      if (dependsOn)
      {
        hasDependencies = true;
        addDependency(change, pendingChange);
      }
    }
    return hasDependencies;
  }

  /**
   * Mark the first pendingChange as dependent on the second PendingChange.
   * @param dependentChange The PendingChange that depend on the second
   *                        PendingChange.
   * @param pendingChange   The PendingChange on which the first PendingChange
   *                        is dependent.
   */
  private void addDependency(
      PendingChange dependentChange, PendingChange pendingChange)
  {
    dependentChange.addDependency(pendingChange.getChangeNumber());
    dependentChanges.add(dependentChange);
  }

  /**
   * Find the pending change registered for a ChangeNumber.
   * A null ChangeNumber is never handed to the sorted map.
   *
   * @param changeNumber The ChangeNumber to look for, may be null.
   * @return The matching PendingChange, or null if there is none.
   */
  private PendingChange lookup(ChangeNumber changeNumber)
  {
    return (changeNumber == null) ? null : pendingChanges.get(changeNumber);
  }

  /**
   * Find the pending change registered for a ChangeNumber or fail.
   *
   * @param changeNumber The ChangeNumber to look for.
   * @return The matching PendingChange, never null.
   * @throws NoSuchElementException If there is no such pending change.
   */
  private PendingChange lookupOrFail(ChangeNumber changeNumber)
  {
    PendingChange found = lookup(changeNumber);
    if (found == null)
    {
      throw new NoSuchElementException();
    }
    return found;
  }

  /**
   * Get the message of a pending change when this change is older than
   * the given ChangeNumber.
   *
   * @param pendingChange The pending change to examine.
   * @param changeNumber  The ChangeNumber of the operation being checked.
   * @return The message of the pending change, or null if the pending
   *         change is not older or has no message.
   */
  private static UpdateMessage olderMessage(
      PendingChange pendingChange, ChangeNumber changeNumber)
  {
    if (pendingChange.getChangeNumber().older(changeNumber))
    {
      return pendingChange.getMsg();
    }
    return null;
  }

  /**
   * Compare the target DN of a pending change with another DN.
   *
   * @param pendingDn The target DN of a pending change, null when it
   *                  could not be decoded.
   * @param targetDn  The DN to compare with.
   * @return true only if pendingDn is not null and equals targetDn.
   */
  private static boolean sameDN(DN pendingDn, DN targetDn)
  {
    return (pendingDn != null) && pendingDn.equals(targetDn);
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; @@ -128,17 +129,14 @@ { private ReplicationMonitor monitor; private ChangeNumberGenerator changeNumberGenerator; private ReplicationBroker broker; private List<ListenerThread> synchroThreads = new ArrayList<ListenerThread>(); private final SortedMap<ChangeNumber, PendingChange> pendingChanges = new TreeMap<ChangeNumber, PendingChange>(); private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs = new TreeMap<ChangeNumber, UpdateMessage>(); private int numRcvdUpdates = 0; private int numSentUpdates = 0; private AtomicInteger numRcvdUpdates = new AtomicInteger(0); private AtomicInteger numSentUpdates = new AtomicInteger(0); private AtomicInteger numProcessedUpdates = new AtomicInteger(); private int debugCount = 0; private PersistentServerState state; @@ -150,6 +148,19 @@ private int maxSendDelay = 0; /** * This object is used to store the list of updates currently being * done on the local database. * It contains both the updates that are done directly on this server * and the updates that were done on another server, transmitted * by the replication server and that are currently replayed. * It is useful to make sure that dependencies between operations * are correctly fulfilled, that the local operations are sent in a * correct order to the replication server and that the ServerState * is not updated too early. */ private PendingChanges pendingChanges; /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. 
*/ @@ -245,7 +256,7 @@ private ConfigEntry backendConfigEntry; private List<DN> branches = new ArrayList<DN>(0); private int listenerThreadNumber = 1; private int listenerThreadNumber = 10; private boolean receiveStatus = true; private Collection<String> replicationServers; @@ -317,12 +328,6 @@ DirectoryServer.registerMonitorProvider(monitor); /* * ChangeNumberGenerator is used to create new unique ChangeNumbers * for each operation done on the replication domain. */ changeNumberGenerator = new ChangeNumberGenerator(serverId, state); /* * create the broker object used to publish and receive changes */ try @@ -348,6 +353,14 @@ */ } /* * The PendingChanges object is given the ChangeNumberGenerator used to * create new unique ChangeNumbers for each operation done on the * replication domain, together with the broker and the state. */ pendingChanges = new PendingChanges(new ChangeNumberGenerator(serverId, state), broker, state); // listen for changes on the configuration configuration.addChangeListener(this); } @@ -444,11 +457,24 @@ * of the parent entry */ String parentUid = ctx.getParentUid(); // the root entry has no parent, // so there is no need to check for it. if (parentUid != null) { // There is a potential performance improvement here // if we could avoid the following parent entry retrieval DN parentDnFromCtx = findEntryDN(ctx.getParentUid()); if (parentDnFromCtx != null) if (parentDnFromCtx == null) { // The parent does not exist with the specified unique id // stop the operation with NO_SUCH_OBJECT and let the // conflict resolution or the dependency resolution solve this. 
addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); return new SynchronizationProviderResult(false); } else { DN entryDN = addOperation.getEntryDN(); DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); @@ -463,6 +489,7 @@ } } } } return new SynchronizationProviderResult(true); } @@ -633,9 +660,12 @@ */ public UpdateMessage receive() { UpdateMessage update = pendingChanges.getNextUpdate(); if (update == null) { synchronized (broker) { UpdateMessage update = null; while (update == null) { ReplicationMessage msg; @@ -653,11 +683,6 @@ AckMessage ack = (AckMessage) msg; receiveAck(ack); } else if (msg instanceof UpdateMessage) { update = (UpdateMessage) msg; receiveUpdate(update); } else if (msg instanceof InitializeRequestMessage) { // Another server requests us to provide entries @@ -672,7 +697,8 @@ { // Returns an error message to notify the sender int msgID = de.getMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); broker.publish(errorMsg); } @@ -690,9 +716,11 @@ { // Return an error message to notify the sender int msgID = de.getMessageID(); ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), msgID, de.getMessage()); log(getMessage(msgID, backend.getBackendID()) + de.getMessage()); log(getMessage(msgID, backend.getBackendID()) + de.getMessage()); broker.publish(errorMsg); } } @@ -708,15 +736,21 @@ abandonImportExport((ErrorMessage)msg); } } } catch (SocketTimeoutException e) else if (msg instanceof UpdateMessage) { update = (UpdateMessage) msg; receiveUpdate(update); } } catch (SocketTimeoutException e) { // just retry } } } } return update; } } /** * Do the necessary processing when an UpdateMessage was received. 
@@ -725,21 +759,8 @@ */ public void receiveUpdate(UpdateMessage update) { ChangeNumber changeNumber = update.getChangeNumber(); synchronized (pendingChanges) { if (pendingChanges.containsKey(changeNumber)) { /* * This should never happen, * TODO log error and throw exception */ } pendingChanges.put(changeNumber, new PendingChange(changeNumber, null, update)); numRcvdUpdates++; } pendingChanges.putRemoteUpdate(update); numRcvdUpdates.incrementAndGet(); } /** @@ -752,10 +773,9 @@ UpdateMessage update; ChangeNumber changeNumber = ack.getChangeNumber(); synchronized (pendingChanges) synchronized (waitingAckMsgs) { update = waitingAckMsgs.get(changeNumber); waitingAckMsgs.remove(changeNumber); update = waitingAckMsgs.remove(changeNumber); } if (update != null) { @@ -798,8 +818,6 @@ * This is an operation type that we do not know about * It should never happen. */ synchronized (pendingChanges) { pendingChanges.remove(curChangeNumber); int msgID = MSGID_UNKNOWN_TYPE; String message = getMessage(msgID, op.getOperationType().toString()); @@ -809,16 +827,15 @@ return; } } } synchronized(pendingChanges) { if (result == ResultCode.SUCCESS) { PendingChange curChange = pendingChanges.get(curChangeNumber); if (curChange == null) try { // This should never happen pendingChanges.commit(curChangeNumber, op, msg); } catch (NoSuchElementException e) { int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; String message = getMessage(msgID, curChangeNumber.toString(), op.toString()); @@ -827,20 +844,17 @@ message, msgID); return; } curChange.setCommitted(true); if (op.isSynchronizationOperation()) curChange.setOp(op); else curChange.setMsg(msg); if (msg != null && isAssured) { // Add the assured message to the list of those whose acknowledgements // we are awaiting. 
synchronized (waitingAckMsgs) { // Add the assured message to the list of updates that are // waiting for acknowledgements waitingAckMsgs.put(curChangeNumber, msg); } } } else if (!op.isSynchronizationOperation()) { // Remove an unsuccessful non-replication operation from the pending @@ -851,8 +865,8 @@ } } pushCommittedChanges(); } int pushedChanges = pendingChanges.pushCommittedChanges(); numSentUpdates.addAndGet(pushedChanges); // Wait for acknowledgement of an assured message. if (msg != null && isAssured) @@ -880,7 +894,7 @@ */ public int getNumRcvdUpdates() { return numRcvdUpdates; return numRcvdUpdates.get(); } /** @@ -890,11 +904,11 @@ */ public int getNumSentUpdates() { return numSentUpdates; return numSentUpdates.get(); } /** * get the number of updates in the pending list. * Get the number of updates in the pending list. * * @return The number of updates in the pending list */ @@ -1052,46 +1066,64 @@ { Operation op = null; boolean done = false; boolean dependency = false; ChangeNumber changeNumber = null; int retryCount = 10; boolean firstTry = true; try { while (!done && retryCount-- > 0) while ((!dependency) && (!done) && (retryCount-- > 0)) { op = msg.createOperation(conn); op.setInternalOperation(true); op.setSynchronizationOperation(true); changeNumber = OperationContext.getChangeNumber(op); if (changeNumber != null) changeNumberGenerator.adjust(changeNumber); op.run(); ResultCode result = op.getResultCode(); if (result != ResultCode.SUCCESS) { if (op instanceof ModifyOperation) { ModifyOperation newOp = (ModifyOperation) op; dependency = pendingChanges.checkDependencies(newOp); if (!dependency) { done = solveNamingConflict(newOp, msg); } } else if (op instanceof DeleteOperation) { DeleteOperation newOp = (DeleteOperation) op; dependency = pendingChanges.checkDependencies(newOp); if ((!dependency) && (!firstTry)) { done = solveNamingConflict(newOp, msg); } } else if (op instanceof AddOperation) { AddOperation newOp = (AddOperation) op; dependency = 
pendingChanges.checkDependencies(newOp); if (!dependency) { done = solveNamingConflict(newOp, msg); } else if (op instanceof ModifyDNOperation) } } else if (op instanceof ModifyDNOperation) { ModifyDNMsg newMsg = (ModifyDNMsg) msg; dependency = pendingChanges.checkDependencies(newMsg); if (!dependency) { ModifyDNOperation newOp = (ModifyDNOperation) op; done = solveNamingConflict(newOp, msg); } } else { done = true; // unknown type of operation ?! @@ -1108,9 +1140,10 @@ { done = true; } firstTry = false; } if (!done) if (!done && !dependency) { // Continue with the next change but the servers could now become // inconsistent. @@ -1119,6 +1152,7 @@ String message = getMessage(msgID, op.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); updateError(changeNumber); } } @@ -1177,11 +1211,14 @@ } finally { if (!dependency) { if (msg.isAssured()) ack(msg.getChangeNumber()); incProcessedUpdates(); } } } /** * This method is called when an error happens while replaying @@ -1193,12 +1230,9 @@ */ public void updateError(ChangeNumber changeNumber) { synchronized (pendingChanges) { PendingChange change = pendingChanges.get(changeNumber); change.setCommitted(true); pushCommittedChanges(); } pendingChanges.commit(changeNumber); int pushedChanges = pendingChanges.pushCommittedChanges(); numSentUpdates.addAndGet(pushedChanges); } /** @@ -1210,15 +1244,7 @@ */ private ChangeNumber generateChangeNumber(Operation operation) { ChangeNumber changeNumber; changeNumber = changeNumberGenerator.NewChangeNumber(); PendingChange change = new PendingChange(changeNumber, operation, null); synchronized(pendingChanges) { pendingChanges.put(changeNumber, change); } return changeNumber; return pendingChanges.putLocalOperation(operation); } @@ -1604,42 +1630,6 @@ } /** * Push all committed local changes to the replicationServer service. * PRECONDITION : The pendingChanges lock must be held before calling * this method. 
*/ private void pushCommittedChanges() { if (pendingChanges.isEmpty()) return; ChangeNumber firstChangeNumber = pendingChanges.firstKey(); PendingChange firstChange = pendingChanges.get(firstChangeNumber); while ((firstChange != null) && firstChange.isCommitted()) { if ((firstChange.getOp() != null ) && (firstChange.getOp().isSynchronizationOperation() == false)) { numSentUpdates++; broker.publish(firstChange.getMsg()); } state.update(firstChangeNumber); pendingChanges.remove(firstChangeNumber); if (pendingChanges.isEmpty()) { firstChange = null; } else { firstChangeNumber = pendingChanges.firstKey(); firstChange = pendingChanges.get(firstChangeNumber); } } } /** * Get the maximum receive window size. * * @return The maximum receive window size. opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -139,6 +139,7 @@ for (Attribute a : userAttributes) elems.add(new LDAPAttribute(a).encode()); if (operationalAttributes != null) for (Attribute a : operationalAttributes) elems.add(new LDAPAttribute(a).encode()); opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -35,6 +35,8 @@ import org.opends.server.protocols.asn1.ASN1OctetString; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Operation; /** @@ -267,4 +269,86 @@ this.newRDN = newRDN; } /** * Check if this MSG will change the DN of the target entry to be * the same as the dn given as a parameter. * @param targetDn the DN to use when checking if this MSG will change * the DN of the entry to a given DN. * @return A boolean indicating if the modify DN MSG will change the DN of * the target entry to be the same as the dn given as a parameter. */ public boolean newDNIsParent(DN targetDn) { try { String newStringDN = newRDN + "," + newSuperior; DN newDN = DN.decode(newStringDN); if (newDN.isAncestorOf(targetDn)) return true; else return false; } catch (DirectoryException e) { // The DN was not a correct DN, and therefore does not a parent of the // DN given as a parameter. return false; } } /** * Check if the new dn of this ModifyDNMsg is the same as the targetDN * given in parameter. * * @param targetDN The targetDN to use to check for equality. * * @return A boolean indicating if the targetDN if the same as the new DN of * the ModifyDNMsg. */ public boolean newDNIsEqual(DN targetDN) { try { String newStringDN = newRDN + "," + newSuperior; DN newDN = DN.decode(newStringDN); if (newDN.equals(targetDN)) return true; else return false; } catch (DirectoryException e) { // The DN was not a correct DN, and therefore does not match the // DN given as a parameter. return false; } } /** * Check if the new parent of the modifyDNMsg is the same as the targetDN * given in parameter. * * @param targetDN the targetDN to use when checking equality. * * @return A boolean indicating if the new parent of the modifyDNMsg is the * same as the targetDN. 
*/ public boolean newParentIsEqual(DN targetDN) { try { DN newSuperiorDN = DN.decode(newSuperior); if (newSuperiorDN.equals(targetDN)) return true; else return false; } catch (DirectoryException e) { // The newsuperior was not a correct DN, and therefore does not match the // DN given as a parameter. return false; } } } opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -90,7 +90,7 @@ * @throws DataFormatException if the encoded byte array is not valid. * @throws UnsupportedEncodingException if UTF-8 is not supprted. */ public UpdateMessage(byte[] in) throws DataFormatException, protected UpdateMessage(byte[] in) throws DataFormatException, UnsupportedEncodingException { /* read the changeNumber */ opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -238,11 +238,14 @@ {} } if ( (recentChangeNumber != null) && (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2)) if ( (recentChangeNumber != null) && (changeNumber != null)) { if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) || ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20)) { flush(); } } return new ReplicationIterator(serverId, db, changeNumber); } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -221,6 +221,7 @@ newSocket = listenSocket.accept(); newSocket.setReceiveBufferSize(1000000); newSocket.setTcpNoDelay(true); newSocket.setKeepAlive(true); ServerHandler handler = new ServerHandler( new SocketSession(newSocket), queueSize); handler.start(null, serverId, serverURL, rcvWindow, this); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
New file @@ -0,0 +1,553 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication;

import static org.testng.Assert.*;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.DomainFakeCfg;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.*;

/**
 * Test that the dependencies are computed correctly when replaying
 * sequences of operations that require a given order to be followed,
 * such as : ADD an entry, ADD a child entry.
 */
public class DependencyTest extends ReplicationTestCase
{
  private static final String BASEDN_STRING = "dc=example,dc=com";

  /**
   * Check that a sequence of dependent adds and mods is correctly ordered:
   * Using a deep dit :
   *    dc=example,dc=com
   *           |
   *    dc=dependency1
   *           |
   *    dc=dependency2
   *           |
   *    dc=dependency3
   *           |
   *          ...
   *           |
   *    dc=dependencyN
   * This test sends a sequence of interleaved ADD operations and MODIFY
   * operations to build such a dit.
   *
   * Then test that the sequence of Delete necessary to remove
   * all those entries is also correctly ordered.
   *
   * @throws Exception If the test setup or one of the checks fails
   *                   unexpectedly.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addModDelDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;

    cleanDB();

    try
    {
      /*
       * FIRST PART :
       * Check that a sequence of dependent ADD is correctly ordered.
       *
       * - Create replication server
       * - Send sequence of ADD messages to the replication server
       * - Configure replication server
       * - check that the last entry has been correctly added
       */

      String entryldif = "dn:" + BASEDN_STRING + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n"
        + "entryuuid: " + stringUID(1) + "\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");

      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();

      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort,
            "addModDeldependency", 0, replServerId, 0,
            AddSequenceLength*5+100, null);
      replServer = new ReplicationServer(conf);

      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 1000, replServerPort, 1000,
                               false);

      TimeThread.sleep(2000);

      // send a sequence of add operation
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);

      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // Entry number "sequence" gets the uuid stringUID(sequence+1) and
        // its parent is the entry created by the previous iteration
        // (or the base entry, whose uuid is stringUID(1)).
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + addDn;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(sequence),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);

        ModifyMsg modifyMsg =
          new ModifyMsg(gen.NewChangeNumber(), DN.decode(addDn),
                        generatemods("description", "test"),
                        stringUID(sequence+1));
        broker.publish(modifyMsg);
      }

      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domainConf.setHeartbeatInterval(100000);

      domain = MultimasterReplication.createNewDomain(domainConf);

      // check that last entry in sequence got added.
      Entry lastEntry = getEntry(DN.decode(addDn), 30000, true);
      assertNotNull(lastEntry,
                    "The last entry of the ADD sequence was not added.");

      // Check that all the modify have been replayed
      // (all the entries should have a description).
      addDn = BASEDN_STRING;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=dependency" + sequence + "," + addDn;

        boolean found =
          checkEntryHasAttribute(DN.decode(addDn), "description", "test",
                                 10000, true);
        if (!found)
        {
          fail("The modification was not replayed on entry " + addDn);
        }
      }

      /*
       * SECOND PART
       *
       * Now check that the dependencies between delete are correctly
       * managed.
       *
       * disable the domain while we publish the delete messages to the
       * replication server so that when we enable it it receives the
       * delete operations in bulk.
       */

      domain.disable();
      Thread.sleep(2000);  // necessary because disable does not wait
                           // for full termination of all threads.
                           // (issue 1571)

      // "sequence" is AddSequenceLength+1 after the loop above, so the
      // deletes are published from the deepest entry up to dc=dependency1.
      DN deleteDN = DN.decode(addDn);
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1));
        broker.publish(delMsg);
        deleteDN = deleteDN.getParent();
      }

      domain.enable();

      // check that entry just below the base entry was deleted.
      // (we can't delete the base entry because some other tests might
      // have added other children)
      DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
      Entry baseEntry = getEntry(node1, 30000, false);
      assertNull(baseEntry,
                 "The last entry of the DEL sequence was not deleted.");
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();

      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);

      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }

  /**
   * Clean the database and replace with a single entry.
   * The base entry is written to an LDIF file under the build directory
   * and loaded through an import task on the userRoot backend.
   *
   * @throws FileNotFoundException If the LDIF file cannot be created.
   * @throws IOException           If the LDIF file cannot be written.
   * @throws Exception             If the import task fails.
   */
  private void cleanDB() throws FileNotFoundException, IOException, Exception
  {
    String baseentryldif =
      "dn:" + BASEDN_STRING + "\n"
       + "objectClass: top\n"
       + "objectClass: domain\n"
       + "dc: example\n"
       + "entryuuid: " + stringUID(1) + "\n";

    // Initialization :
    // Load the database with a single entry :
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = buildRoot + File.separator + "build" + File.separator +
                  "unit-tests" + File.separator + "package"+ File.separator +
                  "addModDelDependencyTest";

    // Close the file before the import task reads it: the stream was
    // previously left open, leaking a file descriptor on every test.
    OutputStream out = new FileOutputStream(new File(path));
    try
    {
      out.write(baseentryldif.getBytes());
    }
    finally
    {
      out.close();
    }

    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
        + "objectclass: ds-task\n"
        + "objectclass: ds-task-import\n"
        + "ds-task-class-name: org.opends.server.tasks.ImportTask\n"
        + "ds-task-import-backend-id: userRoot\n"
        + "ds-task-import-ldif-file: " + path + "\n"
        + "ds-task-import-reject-file: " + path + "reject\n");
  }

  /**
   * Check that after a sequence of add/del/add done on the same DN
   * the second entry is in the database.
   * The unique id of the entry is used to check that the correct entry
   * has been added.
   * To increase the risks of failures a loop of add/del/add is done.
   *
   * @throws Exception If the test setup or one of the checks fails
   *                   unexpectedly.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addDelAddDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;

    cleanDB();

    try
    {
      String entryldif = "dn:" + BASEDN_STRING + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");

      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();

      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort,
            "addDelAdddependency", 0, replServerId, 0,
            5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);

      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
                               false);

      // send a sequence of add/del/add operations
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);

      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // add the entry a first time
        // (the parent uuid is the base entry uuid for the first entry,
        // and the uuid of the second add of the previous entry otherwise)
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + addDn;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(sequence == 1 ? sequence : sequence +1000),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);

        // delete the entry
        DeleteMsg delMsg = new DeleteMsg(addDn, gen.NewChangeNumber(),
                                         stringUID(sequence+1));
        broker.publish(delMsg);

        // add again the entry with a new entryuuid.
        entry.removeAttribute(uidType);
        entry.addAttribute(
            new Attribute("entryuuid", stringUID(sequence+1001)),
            new LinkedList<AttributeValue>());
        addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1001),
                     stringUID(sequence == 1 ? sequence : sequence +1000),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
      }

      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);

      domain = MultimasterReplication.createNewDomain(domainConf);

      // check that all entries have been deleted and added
      // again by checking that they do have the correct entryuuid
      addDn = BASEDN_STRING;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=dependency" + sequence + "," + addDn;

        boolean found =
          checkEntryHasAttribute(DN.decode(addDn), "entryuuid",
                                 stringUID(sequence+1001),
                                 30000, true);
        if (!found)
        {
          fail("The second add was not replayed on entry " + addDn);
        }
      }

      // Delete the entries, deepest first, using the uuid of the second add.
      DN deleteDN = DN.decode(addDn);
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1001));
        broker.publish(delMsg);
        deleteDN = deleteDN.getParent();
      }

      // check that the database was cleaned successfully
      DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
      Entry baseEntry = getEntry(node1, 30000, false);
      assertNull(baseEntry,
        "The entry were not removed succesfully after test completion.");
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();

      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);

      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }

  /**
   * Check that the dependency of moddn operation are working by
   * issuing a set of Add operation followed by a modrdn of the added entry.
   *
   * @throws Exception If the test setup or one of the checks fails
   *                   unexpectedly.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addModdnDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;

    cleanDB();

    try
    {
      String entryldif = "dn:" + BASEDN_STRING + "\n"
        + "objectClass: top\n"
        + "objectClass: domain\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");

      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();

      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort,
            "addModdndependency", 0, replServerId, 0,
            5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);

      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
                               false);

      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);

      // send a sequence of add/modrdn operations
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // add the entry
        // (all the entries are direct children of the base entry)
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + BASEDN_STRING;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(1),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);

        // rename the entry
        ModifyDNMsg moddnMsg =
          new ModifyDNMsg(addDn, gen.NewChangeNumber(), stringUID(sequence+1),
                          stringUID(1), true, null, "dc=new_dep" + sequence);
        broker.publish(moddnMsg);
      }

      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);

      domain = MultimasterReplication.createNewDomain(domainConf);

      // check that all entries have been renamed
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
        Entry baseEntry = getEntry(DN.decode(addDn), 30000, true);
        assertNotNull(baseEntry,
          "The rename was not applied correctly on :" + addDn);
      }

      // delete the entries to clean the database.
      // The uuid must be the one used when the entry was added above
      // (stringUID(sequence + 1)); the previous value, sequence + 1001,
      // was never given to any entry in this test.
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
        DeleteMsg delMsg = new DeleteMsg(addDn,
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1));
        broker.publish(delMsg);
      }
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();

      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);

      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }

  /**
   * Builds and return a uuid from an integer.
   * This method assumes that unique integers are used and does not make any
   * unicity checks. It is only responsible for generating a uid with a
   * correct syntax.
   *
   * @param i The integer, formatted as 12 hexadecimal digits in the last
   *          group of the uuid.
   * @return The uuid string built from the integer.
   */
  private String stringUID(int i)
  {
    return String.format("11111111-1111-1111-1111-%012x", i);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -153,10 +153,6 @@ // This test suite depends on having the schema available. TestCaseUtils.startServer(); // Disable schema check schemaCheck = DirectoryServer.checkSchema(); DirectoryServer.setCheckSchema(false); baseDn = DN.decode("dc=example,dc=com"); updatedEntries = newLDIFEntries(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -32,8 +32,6 @@ import java.net.ServerSocket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import org.opends.server.TestCaseUtils; @@ -47,16 +45,12 @@ import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.LDAPException; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.Operation; import org.opends.server.types.OperationType; import org.opends.server.types.ResultCode; @@ -242,10 +236,6 @@ // This test suite depends on having the schema available. 
TestCaseUtils.startServer(); // Disable schema check schemaCheck = DirectoryServer.checkSchema(); DirectoryServer.setCheckSchema(false); // Create an internal connection connection = InternalClientConnection.getRootConnection(); @@ -325,22 +315,6 @@ configureReplication(); } /** * @return */ private List<Modification> generatemods(String attrName, String attrValue) { AttributeType attrType = DirectoryServer.getAttributeType(attrName.toLowerCase(), true); LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); values.add(new AttributeValue(attrType, attrValue)); Attribute attr = new Attribute(attrType, attrName, values); List<Modification> mods = new ArrayList<Modification>(); Modification mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); return mods; } private void processModify(int count) { while (count>0) opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -26,9 +26,6 @@ */ package org.opends.server.replication; import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import java.io.File; @@ -36,18 +33,12 @@ import java.util.UUID; import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.core.AddOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.types.AttributeType; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchScope; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -237,73 +228,4 @@ } /** * Utility method to create, run a task and check its result. */ private void task(String task) throws Exception { Entry taskEntry = TestCaseUtils.makeEntry(task); InternalClientConnection connection = InternalClientConnection.getRootConnection(); // Add the task. AddOperation addOperation = connection.processAdd(taskEntry.getDN(), taskEntry.getObjectClasses(), taskEntry.getUserAttributes(), taskEntry.getOperationalAttributes()); assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS, "Add of the task definition was not successful"); // Wait until the task completes. 
AttributeType completionTimeType = DirectoryServer.getAttributeType( ATTR_TASK_COMPLETION_TIME.toLowerCase()); SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); Entry resultEntry = null; String completionTime = null; long startMillisecs = System.currentTimeMillis(); do { InternalSearchOperation searchOperation = connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter); try { resultEntry = searchOperation.getSearchEntries().getFirst(); } catch (Exception e) { continue; } completionTime = resultEntry.getAttributeValue(completionTimeType, DirectoryStringSyntax.DECODER); if (completionTime == null) { if (System.currentTimeMillis() - startMillisecs > 1000*30) { break; } Thread.sleep(10); } } while (completionTime == null); if (completionTime == null) { fail("The task has not completed after 30 seconds."); } // Check that the task state is as expected. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); String stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); TaskState taskState = TaskState.fromString(stateString); assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY, "The task completed in an unexpected state"); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,12 +26,16 @@ */ package org.opends.server.replication; import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; import static org.opends.server.loggers.ErrorLogger.logError; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; import java.net.SocketException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.NoSuchElementException; import java.util.List; @@ -42,7 +46,10 @@ import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.plugin.PersistentServerState; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.schema.IntegerSyntax; import org.opends.server.backends.task.TaskState; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.protocols.internal.InternalClientConnection; @@ -53,6 +60,10 @@ import org.opends.server.types.ByteStringFactory; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchScope; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.AttributeType; @@ -87,11 +98,6 @@ protected Entry replServerEntry; /** * schema check flag */ protected boolean schemaCheck; /** * The replication plugin entry */ protected String synchroPluginStringDN = @@ -108,7 +114,6 @@ { // This test suite depends on having the schema available. 
TestCaseUtils.startServer(); schemaCheck = DirectoryServer.checkSchema(); // Create an internal connection connection = InternalClientConnection.getRootConnection(); @@ -298,8 +303,6 @@ @AfterClass public void classCleanUp() throws Exception { DirectoryServer.setCheckSchema(schemaCheck); cleanConfigEntries(); cleanRealEntries(); } @@ -380,7 +383,7 @@ protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr, String valueString, int timeout, boolean hasAttribute) throws Exception { boolean found; boolean found = false; int count = timeout/100; if (count<1) count=1; @@ -408,15 +411,15 @@ newEntry = DirectoryServer.getEntry(dn); if (newEntry == null) fail("The entry " + dn + " has incorrectly been deleted from the database."); if (newEntry != null) { List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr); Attribute tmpAttr = tmpAttrList.get(0); AttributeType attrType = DirectoryServer.getAttributeType(attrTypeStr, true); found = tmpAttr.hasValue(new AttributeValue(attrType, valueString)); } } finally @@ -478,4 +481,95 @@ } } /** * Generate a new modification replace with the given information. * * @param attrName The attribute to replace. * @param attrValue The new value for the attribute * * @return The modification replace. */ protected List<Modification> generatemods(String attrName, String attrValue) { AttributeType attrType = DirectoryServer.getAttributeType(attrName.toLowerCase(), true); LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); values.add(new AttributeValue(attrType, attrValue)); Attribute attr = new Attribute(attrType, attrName, values); List<Modification> mods = new ArrayList<Modification>(); Modification mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); return mods; } /** * Utility method to create, run a task and check its result. 
*/
  protected void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);

    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();

    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");

    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      Entry foundEntry = null;
      try
      {
        foundEntry = searchOperation.getSearchEntries().getFirst();
      }
      catch (Exception e)
      {
        // The task entry could not be read on this attempt.
        // Do NOT "continue" here: in a do-while, "continue" jumps directly
        // to the loop condition, which skipped both the timeout check and
        // the sleep below and made this loop spin without any time limit.
      }

      if (foundEntry != null)
      {
        resultEntry = foundEntry;
        completionTime =
             resultEntry.getAttributeValue(completionTimeType,
                                           DirectoryStringSyntax.DECODER);
      }

      if (completionTime == null)
      {
        // Give up after 30 seconds, otherwise wait a little and retry.
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);

    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }

    // Check that the task state is as expected.
    // (resultEntry is not null here: completionTime was read from it)
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -82,7 +82,6 @@ { // This test suite depends on having the schema available. TestCaseUtils.startServer(); schemaCheck = DirectoryServer.checkSchema(); // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
@@ -33,8 +33,6 @@ import static org.testng.Assert.fail; import java.net.ServerSocket; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; @@ -50,15 +48,12 @@ import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.InitializationException; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.Operation; import org.opends.server.types.OperationType; import org.opends.server.types.ResultCode; @@ -187,10 +182,6 @@ // Create an internal connection connection = InternalClientConnection.getRootConnection(); // Disable schema check schemaCheck = DirectoryServer.checkSchema(); DirectoryServer.setCheckSchema(false); // Create backend top level entries String[] topEntries = new String[2]; topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n" @@ -263,22 +254,6 @@ configureReplication(); } /** * @return */ private List<Modification> generatemods(String attrName, String attrValue) { AttributeType attrType = DirectoryServer.getAttributeType(attrName.toLowerCase(), true); LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); values.add(new AttributeValue(attrType, attrValue)); Attribute attr = new Attribute(attrType, attrName, values); List<Modification> mods = new ArrayList<Modification>(); Modification mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); return mods; } private class BrokerWriter extends Thread { int count; 
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -100,10 +100,6 @@ // This test suite depends on having the schema available. TestCaseUtils.startServer(); // Disable schema check schemaCheck = DirectoryServer.checkSchema(); DirectoryServer.setCheckSchema(false); // Create an internal connection connection = InternalClientConnection.getRootConnection(); @@ -1006,19 +1002,6 @@ } } private List<Modification> generatemods(String attrName, String attrValue) { AttributeType attrType = DirectoryServer.getAttributeType(attrName.toLowerCase(), true); LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); values.add(new AttributeValue(attrType, attrValue)); Attribute attr = new Attribute(attrType, attrName, values); List<Modification> mods = new ArrayList<Modification>(); Modification mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); return mods; } /** * Get the entryUUID for a given DN. * opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
New file @@ -0,0 +1,183 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import java.util.SortedSet; import org.opends.server.admin.ManagedObjectDefinition; import org.opends.server.admin.PropertyProvider; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.client.MultimasterDomainCfgClient; import org.opends.server.admin.std.server.MultimasterDomainCfg; import org.opends.server.types.DN; /** * This class implements a configuration object for the MultimasterDomain * that can be used in unit tests to instantiate ReplicationDomain. 
*/
public class DomainFakeCfg implements MultimasterDomainCfg
{
  // Values supplied by the test through the constructor.
  private DN baseDn;
  private int serverId;
  private SortedSet<String> replicationServers;

  // Starts at 1000 and can be changed with setHeartbeatInterval().
  private long heartbeatInterval = 1000;

  /**
   * Creates a new fake domain configuration from the provided information.
   *
   * @param baseDn      The DN returned by {@link #getReplicationDN()}.
   * @param serverId    The value returned by {@link #getServerId()}.
   * @param replServers The set returned by {@link #getReplicationServer()}.
   */
  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers)
  {
    this.baseDn = baseDn;
    this.serverId = serverId;
    this.replicationServers = replServers;
  }

  // ---------------------------------------------------------------------
  // Values configured by the test.
  // ---------------------------------------------------------------------

  /**
   * Returns the base DN given to the constructor.
   *
   * @return The base DN given to the constructor.
   */
  public DN getReplicationDN()
  {
    return this.baseDn;
  }

  /**
   * Returns the server id given to the constructor.
   *
   * @return The server id given to the constructor.
   */
  public int getServerId()
  {
    return this.serverId;
  }

  /**
   * Returns the set of replication servers given to the constructor.
   *
   * @return The set of replication servers given to the constructor.
   */
  public SortedSet<String> getReplicationServer()
  {
    return this.replicationServers;
  }

  /**
   * Returns the current heartbeat interval.
   *
   * @return The last value passed to {@link #setHeartbeatInterval(long)},
   *         or 1000 if it was never called.
   */
  public long getHeartbeatInterval()
  {
    return this.heartbeatInterval;
  }

  /**
   * Set the heartbeat interval.
   *
   * @param interval The value that {@link #getHeartbeatInterval()} will
   *                 return from now on.
   */
  public void setHeartbeatInterval(long interval)
  {
    this.heartbeatInterval = interval;
  }

  // ---------------------------------------------------------------------
  // Fixed values.
  // ---------------------------------------------------------------------

  /**
   * Returns a fixed window size.
   *
   * @return Always 100.
   */
  public int getWindowSize()
  {
    return 100;
  }

  /**
   * Returns a fixed maximum receive delay.
   *
   * @return Always 0.
   */
  public long getMaxReceiveDelay()
  {
    return 0;
  }

  /**
   * Returns a fixed maximum receive queue size.
   *
   * @return Always 0.
   */
  public int getMaxReceiveQueue()
  {
    return 0;
  }

  /**
   * Returns a fixed maximum send delay.
   *
   * @return Always 0.
   */
  public long getMaxSendDelay()
  {
    return 0;
  }

  /**
   * Returns a fixed maximum send queue size.
   *
   * @return Always 0.
   */
  public int getMaxSendQueue()
  {
    return 0;
  }

  // ---------------------------------------------------------------------
  // Parts of the configuration interface that this fake does not provide.
  // ---------------------------------------------------------------------

  /**
   * Does nothing: this fake configuration does not record listeners.
   *
   * @param listener Ignored.
   */
  public void addChangeListener(
      ConfigurationChangeListener<MultimasterDomainCfg> listener)
  {
    // Intentionally empty.
  }

  /**
   * Does nothing: this fake configuration does not record listeners.
   *
   * @param listener Ignored.
   */
  public void removeChangeListener(
      ConfigurationChangeListener<MultimasterDomainCfg> listener)
  {
    // Intentionally empty.
  }

  /**
   * Not provided by this fake configuration.
   *
   * @return Always <code>null</code>.
   */
  public ManagedObjectDefinition<? extends MultimasterDomainCfgClient,
                                 ? extends MultimasterDomainCfg> definition()
  {
    return null;
  }

  /**
   * Not provided by this fake configuration.
   *
   * @return Always <code>null</code>.
   */
  public DN dn()
  {
    return null;
  }

  /**
   * Not provided by this fake configuration.
   *
   * @return Always <code>null</code>.
   */
  public PropertyProvider properties()
  {
    return null;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -76,7 +76,7 @@ /** * The port of the replicationServer. */ private int changelogPort; private int replicationServerPort; private ChangeNumber firstChangeNumberServer1 = null; private ChangeNumber secondChangeNumberServer1 = null; @@ -97,11 +97,11 @@ // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); changelogPort = socket.getLocalPort(); replicationServerPort = socket.getLocalPort(); socket.close(); ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null); new ReplServerFakeConfiguration(replicationServerPort, null, 0, 1, 0, 0, null); replicationServer = new ReplicationServer(conf); } @@ -124,10 +124,10 @@ * Open a sender session and a receiver session to the replicationServer */ server1 = openReplicationSession( DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, DN.decode("dc=example,dc=com"), (short) 1, 100, replicationServerPort, 1000, true); server2 = openReplicationSession( DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, DN.decode("dc=example,dc=com"), (short) 2, 100, replicationServerPort, 1000, true); /* @@ -240,7 +240,7 @@ try { broker = openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, false); 100, replicationServerPort, 1000, false); ReplicationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) @@ -277,7 +277,7 @@ try { broker = openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3, 100, changelogPort, 1000, state); 100, replicationServerPort, 1000, state); ReplicationMessage msg2 = broker.receive(); if (!(msg2 instanceof DeleteMsg)) @@ -425,7 +425,7 @@ * Open a sender session */ server = openReplicationSession( DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort, DN.decode("dc=example,dc=com"), (short) 5, 100, replicationServerPort, 1000, 1000, 0, true); BrokerReader reader = new BrokerReader(server); @@ -436,7 +436,7 @@ for (int i =0; i< 
CLIENT_THREADS; i++) { clientBroker[i] = openReplicationSession( DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort, DN.decode("dc=example,dc=com"), (short) (100+i), 100, replicationServerPort, 1000, true); client[i] = new BrokerReader(clientBroker[i]); } @@ -510,7 +510,7 @@ new ChangeNumberGenerator(serverId , (long) 0); ReplicationBroker broker = openReplicationSession( DN.decode("dc=example,dc=com"), serverId, 100, changelogPort, 1000, 1000, 0, true); 100, replicationServerPort, 1000, 1000, 0, true); producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS); reader[i] = new BrokerReader(broker);