| | |
| | | { |
| | | return list.keySet().iterator(); |
| | | } |
| | | |
| | | /** |
| | |  * Tells whether this ServerState covers another ServerState. |
| | |  * <p> |
| | |  * For every ChangeNumber stored in {@code covered}, this ServerState must |
| | |  * hold a ChangeNumber for the same server id, and that ChangeNumber must |
| | |  * not be older than the covered one. |
| | |  * |
| | |  * @param covered The ServerState that needs to be checked. |
| | |  * @return {@code true} if this ServerState covers the ServerState given |
| | |  *         in parameter, {@code false} as soon as one ChangeNumber of |
| | |  *         {@code covered} is not covered. |
| | |  */ |
| | | public boolean cover(ServerState covered) |
| | | { |
| | |   for (ChangeNumber wanted : covered.list.values()) |
| | |   { |
| | |     ChangeNumber known = this.list.get(wanted.getServerId()); |
| | |     if (known == null) |
| | |     { |
| | |       // No ChangeNumber is stored here for that server id. |
| | |       return false; |
| | |     } |
| | |     if (known.older(wanted)) |
| | |     { |
| | |       return false; |
| | |     } |
| | |   } |
| | |   return true; |
| | | } |
| | | } |
| | |
| | | |
| | | |
| | | /** |
| | | * Finds the domain for a given DN. |
| | | * |
| | | * @param dn The DN for which the domain must be returned. |
| | | * @param op An optional operation for which the check is done. |
| | | * Can be null is the request has no associated operation. |
| | | * @return The domain for this DN. |
| | | */ |
| | | public static ReplicationDomain findDomain(DN dn, Operation op) |
| | | { |
| | | /* |
| | | * Don't run the special replication code on Operation that are |
| | | * specifically marked as don't synchronize. |
| | | */ |
| | | if ((op != null) && op.dontSynchronize()) |
| | | return null; |
| | | |
| | | ReplicationDomain domain = null; |
| | | DN temp = dn; |
| | | do |
| | | { |
| | | domain = domains.get(temp); |
| | | temp = temp.getParentDNInSuffix(); |
| | | if (temp == null) |
| | | { |
| | | break; |
| | | } |
| | | } while (domain == null); |
| | | |
| | | return domain; |
| | | } |
| | | |
| | | /** |
| | |  * Builds a new domain from its configuration, registers it under its |
| | |  * base DN and starts it, so that it is fully operational when this |
| | |  * method returns. |
| | |  * |
| | |  * @param configuration The entry with the configuration of this domain. |
| | |  * @return The domain created. |
| | |  * @throws ConfigException When the configuration is not valid. |
| | |  */ |
| | | public static ReplicationDomain createNewDomain( |
| | |     MultimasterDomainCfg configuration) |
| | |     throws ConfigException |
| | | { |
| | |   final ReplicationDomain created = new ReplicationDomain(configuration); |
| | |   // The domain is registered before it is started. |
| | |   domains.put(created.getBaseDN(), created); |
| | |   created.start(); |
| | |   return created; |
| | | } |
| | | |
| | | /** |
| | |  * Unregisters the domain stored under the given base DN and shuts it |
| | |  * down. Nothing happens when no domain is registered for this DN. |
| | |  * |
| | |  * @param dn The base DN of the domain to delete. |
| | |  */ |
| | | public static void deleteDomain(DN dn) |
| | | { |
| | |   ReplicationDomain removed = domains.remove(dn); |
| | |   if (removed == null) |
| | |   { |
| | |     return; |
| | |   } |
| | |   removed.shutdown(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new domain from its configEntry, do the |
| | | * necessary initialization and starts it so that it is |
| | | * fully operational when this method returns. |
| | | * @param configuration The entry whith the configuration of this domain. |
| | | * @throws ConfigException When the configuration is not valid. |
| | | */ |
| | | private void createNewDomain( |
| | | MultimasterDomainCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | ReplicationDomain domain; |
| | | domain = new ReplicationDomain(configuration); |
| | | domains.put(domain.getBaseDN(), domain); |
| | | domain.start(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Returns the replication domain in charge of a given DN. |
| | |  * <p> |
| | |  * The lookup starts with the DN itself and climbs from parent to parent |
| | |  * until a registered domain is found or there is no parent left. |
| | |  * |
| | |  * @param dn The DN for which the domain must be returned. |
| | |  * @param op An optional operation for which the check is done. |
| | |  *           Can be null if the request has no associated operation. |
| | |  * @return The domain for this DN, or null when no domain is found or |
| | |  *         when the operation is marked as "don't synchronize". |
| | |  */ |
| | | public static ReplicationDomain findDomain(DN dn, Operation op) |
| | | { |
| | |   if (op != null) |
| | |   { |
| | |     // Don't run the special replication code on operations that are |
| | |     // specifically marked as "don't synchronize". |
| | |     if (op.dontSynchronize()) |
| | |     { |
| | |       return null; |
| | |     } |
| | |   } |
| | | |
| | |   ReplicationDomain result; |
| | |   DN current = dn; |
| | |   do |
| | |   { |
| | |     result = domains.get(current); |
| | |     current = current.getParentDNInSuffix(); |
| | |   } |
| | |   while ((current != null) && (result == null)); |
| | | |
| | |   return result; |
| | | } |
| | | |
| | | /** |
| | |  * Shared implementation of all the postOperation entry points: finds |
| | |  * the domain of the DN and, when there is one, asks it to synchronize |
| | |  * the operation. |
| | |  * |
| | |  * @param operation The Operation for which the post-operation is called. |
| | |  * @param dn The DN for which the post-operation is called. |
| | |  */ |
| | | private void genericPostOperation(Operation operation, DN dn) |
| | | { |
| | |   ReplicationDomain owner = findDomain(dn, operation); |
| | |   if (owner != null) |
| | |   { |
| | |     owner.synchronize(operation); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * This method is called whenever the server detects a modification |
| | | * of the schema done by directly modifying the backing files |
| | | * of the schema backend. |
| | |
| | | public ConfigChangeResult applyConfigurationDelete( |
| | |     MultimasterDomainCfg configuration) |
| | | { |
| | |   // deleteDomain() already unregisters the domain and shuts it down |
| | |   // when it exists; doing the same work inline beforehand only turned |
| | |   // this call into a redundant second lookup. |
| | |   deleteDomain(configuration.getReplicationDN()); |
| | | |
| | |   // The deletion always reports success; the second argument is false. |
| | |   return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Generic code for all the postOperation entry point. |
| | | * |
| | | * @param operation The Operation for which the post-operation is called. |
| | | * @param dn The Dn for which the post-operation is called. |
| | | */ |
| | | private void genericPostOperation(Operation operation, DN dn) |
| | | { |
| | | ReplicationDomain domain = findDomain(dn, operation); |
| | | if (domain == null) |
| | | return; |
| | | |
| | | domain.synchronize(operation); |
| | | |
| | | return; |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Operation; |
| | | |
| | | /** |
| | | * This class is used to store the list of operations currently |
| | | * This class is used to store an operation currently |
| | | * in progress and not yet committed in the database. |
| | | */ |
| | | public class PendingChange |
| | | public class PendingChange implements Comparable<PendingChange> |
| | | { |
| | | private ChangeNumber changeNumber; |
| | | private boolean committed; |
| | | private UpdateMessage msg; |
| | | private Operation op; |
| | | private ServerState dependencyState = null; |
| | | private DN targetDN = null; |
| | | |
| | | /** |
| | | * Construct a new PendingChange. |
| | |
| | | this.op = op; |
| | | } |
| | | |
| | | /** |
| | |  * Records the given ChangeNumber as a dependency of this PendingChange. |
| | |  * |
| | |  * @param changeNumber The ChangeNumber to add in the list of dependencies |
| | |  *                     of this PendingChange. |
| | |  */ |
| | | public void addDependency(ChangeNumber changeNumber) |
| | | { |
| | |   // The dependency state is only allocated when a first dependency |
| | |   // is recorded. |
| | |   ServerState dependencies = dependencyState; |
| | |   if (dependencies == null) |
| | |   { |
| | |     dependencies = new ServerState(); |
| | |     dependencyState = dependencies; |
| | |   } |
| | |   dependencies.update(changeNumber); |
| | | } |
| | | |
| | | /** |
| | |  * Check if the given ServerState covers the dependencies of this |
| | |  * PendingChange. |
| | |  * <p> |
| | |  * A PendingChange for which no dependency was ever added is considered |
| | |  * covered by any ServerState. |
| | |  * |
| | |  * @param state The ServerState for which dependencies must be checked. |
| | |  * |
| | |  * @return A boolean indicating if the given ServerState covers the |
| | |  *         dependencies of this PendingChange. |
| | |  */ |
| | | public boolean dependenciesIsCovered(ServerState state) |
| | | { |
| | |   // dependencyState stays null until addDependency() is called. Passing |
| | |   // null on would make the coverage check dereference a null state, so |
| | |   // "no dependency" is answered directly. |
| | |   if (dependencyState == null) |
| | |   { |
| | |     return true; |
| | |   } |
| | |   return state.cover(dependencyState); |
| | | } |
| | | |
| | | /** |
| | |  * Get the target DN of this message. |
| | |  * <p> |
| | |  * The DN is decoded from the message on first use and then cached. |
| | |  * |
| | |  * @return The target DN of this message, or null when this change has no |
| | |  *         message or when the DN of the message cannot be decoded. |
| | |  */ |
| | | public DN getTargetDN() |
| | | { |
| | |   synchronized (this) |
| | |   { |
| | |     // msg can be null (a PendingChange may be built without a message), |
| | |     // in which case there is nothing to decode. |
| | |     if ((targetDN == null) && (msg != null)) |
| | |     { |
| | |       try |
| | |       { |
| | |         targetDN = DN.decode(msg.getDn()); |
| | |       } |
| | |       catch (DirectoryException ignored) |
| | |       { |
| | |         // Deliberately ignored: targetDN stays null, null is returned |
| | |         // and the decoding is attempted again on the next call. |
| | |       } |
| | |     } |
| | |     return targetDN; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * {@inheritDoc} |
| | |  * <p> |
| | |  * PendingChanges are ordered by their ChangeNumber. |
| | |  */ |
| | | public int compareTo(PendingChange o) |
| | | { |
| | |   ChangeNumber mine = this.getChangeNumber(); |
| | |   ChangeNumber theirs = o.getChangeNumber(); |
| | |   return mine.compareTo(theirs); |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeMap; |
| | | import java.util.TreeSet; |
| | | |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.OperationContext; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Operation; |
| | | |
| | | /** |
| | | * |
| | | * This class is used to store the list of operations currently |
| | | * in progress and not yet committed in the database. |
| | | */ |
| | | public class PendingChanges |
| | | { |
| | | /** |
| | | * A map used to store the pending changes. |
| | | */ |
| | | private SortedMap<ChangeNumber, PendingChange> pendingChanges = |
| | | new TreeMap<ChangeNumber, PendingChange>(); |
| | | |
| | | /** |
| | | * A sorted set containing the list of PendingChanges that have |
| | | * not been replayed correctly because they are dependent on |
| | | * another change to be completed. |
| | | */ |
| | | private SortedSet<PendingChange> dependentChanges = |
| | | new TreeSet<PendingChange>(); |
| | | |
| | | /** |
| | | * The ChangeNumberGenerator to use to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | |
| | | /** |
| | | * The ReplicationBroker that will be used to send UpdateMessage. |
| | | */ |
| | | private ReplicationBroker broker; |
| | | |
| | | /** |
| | | * The ServerState that will be updated when UpdateMessage are committed. |
| | | */ |
| | | private ServerState state; |
| | | |
| | | /** |
| | |  * Builds a PendingChanges around the collaborators it needs. |
| | |  * |
| | |  * @param changeNumberGenerator The ChangeNumberGenerator to use to create |
| | |  *                              new unique ChangeNumbers. |
| | |  * @param broker The ReplicationBroker that will be used to send |
| | |  *               UpdateMessage. |
| | |  * @param state  The ServerState that will be updated when UpdateMessage |
| | |  *               are committed. |
| | |  */ |
| | | public PendingChanges( |
| | |     ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker, |
| | |     ServerState state) |
| | | { |
| | |   this.state = state; |
| | |   this.broker = broker; |
| | |   this.changeNumberGenerator = changeNumberGenerator; |
| | | } |
| | | |
| | | /** |
| | |  * Remove and return an update from the pending changes list. |
| | |  * |
| | |  * @param changeNumber The ChangeNumber of the update to remove. |
| | |  * |
| | |  * @return The UpdateMessage of the change that was just removed, or null |
| | |  *         when no pending change is stored for this ChangeNumber. |
| | |  */ |
| | | public synchronized UpdateMessage remove(ChangeNumber changeNumber) |
| | | { |
| | |   PendingChange removed = pendingChanges.remove(changeNumber); |
| | |   if (removed == null) |
| | |   { |
| | |     // Unknown ChangeNumber: report "nothing removed" instead of |
| | |     // failing with a NullPointerException. |
| | |     return null; |
| | |   } |
| | |   return removed.getMsg(); |
| | | } |
| | | |
| | | /** |
| | |  * Gives the number of updates currently held in the pending list. |
| | |  * |
| | |  * @return The number of updates currently in the list. |
| | |  */ |
| | | public synchronized int size() |
| | | { |
| | |   final int pendingCount = pendingChanges.size(); |
| | |   return pendingCount; |
| | | } |
| | | |
| | | /** |
| | |  * Flags the pending change of the given ChangeNumber as committed and |
| | |  * attaches either the operation or the message to it: the operation when |
| | |  * it is a synchronization operation, the message otherwise. |
| | |  * |
| | |  * @param changeNumber The ChangeNumber of the update message that must be |
| | |  *                     set as committed. |
| | |  * @param op           The Operation associated to the update. |
| | |  * @param msg          The message associated to the update. |
| | |  * @throws NoSuchElementException When no pending change is stored for |
| | |  *                                this ChangeNumber. |
| | |  */ |
| | | public synchronized void commit(ChangeNumber changeNumber, |
| | |     Operation op, UpdateMessage msg) |
| | | { |
| | |   PendingChange committedChange = pendingChanges.get(changeNumber); |
| | |   if (committedChange != null) |
| | |   { |
| | |     committedChange.setCommitted(true); |
| | | |
| | |     if (op.isSynchronizationOperation()) |
| | |     { |
| | |       committedChange.setOp(op); |
| | |     } |
| | |     else |
| | |     { |
| | |       committedChange.setMsg(msg); |
| | |     } |
| | |     return; |
| | |   } |
| | |   throw new NoSuchElementException(); |
| | | } |
| | | |
| | | /** |
| | |  * Flags the pending change of the given ChangeNumber as committed. |
| | |  * |
| | |  * @param changeNumber The ChangeNumber of the update message that must be |
| | |  *                     set as committed. |
| | |  * @throws NoSuchElementException When no pending change is stored for |
| | |  *                                this ChangeNumber. |
| | |  */ |
| | | public synchronized void commit(ChangeNumber changeNumber) |
| | | { |
| | |   PendingChange committedChange = pendingChanges.get(changeNumber); |
| | |   if (committedChange != null) |
| | |   { |
| | |     committedChange.setCommitted(true); |
| | |     return; |
| | |   } |
| | |   throw new NoSuchElementException(); |
| | | } |
| | | |
| | | /** |
| | |  * Registers a local operation in the pending list under a freshly |
| | |  * generated ChangeNumber. The pending change is created without any |
| | |  * UpdateMessage. |
| | |  * |
| | |  * @param operation The local operation that must be added in the |
| | |  *                  pending list. |
| | |  * @return The ChangeNumber now associated to the operation. |
| | |  */ |
| | | public synchronized ChangeNumber putLocalOperation(Operation operation) |
| | | { |
| | |   final ChangeNumber generated = changeNumberGenerator.NewChangeNumber(); |
| | |   pendingChanges.put(generated, |
| | |       new PendingChange(generated, operation, null)); |
| | |   return generated; |
| | | } |
| | | |
| | | /** |
| | |  * Registers in the pending list an UpdateMessage that was received from |
| | |  * the replication server. The ChangeNumberGenerator is adjusted with the |
| | |  * ChangeNumber of the message before the change is stored. |
| | |  * |
| | |  * @param update The UpdateMessage that was received from the replication |
| | |  *               server and that will be added to the pending list. |
| | |  */ |
| | | public synchronized void putRemoteUpdate(UpdateMessage update) |
| | | { |
| | |   final ChangeNumber remoteNumber = update.getChangeNumber(); |
| | |   changeNumberGenerator.adjust(remoteNumber); |
| | | |
| | |   PendingChange remoteChange = |
| | |       new PendingChange(remoteNumber, null, update); |
| | |   pendingChanges.put(remoteNumber, remoteChange); |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the replicationServer service. |
| | | * |
| | | * @return The number of pushed updates. |
| | | */ |
| | | public synchronized int pushCommittedChanges() |
| | | { |
| | | int numSentUpdates = 0; |
| | | if (pendingChanges.isEmpty()) |
| | | return numSentUpdates; |
| | | |
| | | ChangeNumber firstChangeNumber = pendingChanges.firstKey(); |
| | | PendingChange firstChange = pendingChanges.get(firstChangeNumber); |
| | | |
| | | while ((firstChange != null) && firstChange.isCommitted()) |
| | | { |
| | | if ((firstChange.getOp() != null ) && |
| | | (firstChange.getOp().isSynchronizationOperation() == false)) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(firstChange.getMsg()); |
| | | } |
| | | state.update(firstChangeNumber); |
| | | pendingChanges.remove(firstChangeNumber); |
| | | |
| | | if (pendingChanges.isEmpty()) |
| | | { |
| | | firstChange = null; |
| | | } |
| | | else |
| | | { |
| | | firstChangeNumber = pendingChanges.firstKey(); |
| | | firstChange = pendingChanges.get(firstChangeNumber); |
| | | } |
| | | } |
| | | return numSentUpdates; |
| | | } |
| | | |
| | | /** |
| | |  * Get the first update of the dependent list whose dependencies are now |
| | |  * covered by the ServerState. That update is removed from the dependent |
| | |  * list. |
| | |  * |
| | |  * @return The UpdateMessage to be handled, or null when no dependent |
| | |  *         update has its dependencies covered. |
| | |  */ |
| | | public synchronized UpdateMessage getNextUpdate() |
| | | { |
| | |   // Look for the first dependent change whose dependencies are covered. |
| | |   PendingChange ready = null; |
| | |   for (PendingChange candidate : dependentChanges) |
| | |   { |
| | |     if (candidate.dependenciesIsCovered(state)) |
| | |     { |
| | |       ready = candidate; |
| | |       break; |
| | |     } |
| | |   } |
| | | |
| | |   if (ready == null) |
| | |   { |
| | |     return null; |
| | |   } |
| | |   dependentChanges.remove(ready); |
| | |   return ready.getMsg(); |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Check if the given AddOperation has some dependencies on any |
| | |  * older change still present in the pending list, and record those |
| | |  * dependencies in the PendingChange associated to the operation. |
| | |  * <p> |
| | |  * An AddOperation depends on: |
| | |  * <ul> |
| | |  * <li>a DeleteOperation done on the same DN</li> |
| | |  * <li>a ModifyDnOperation with the same target DN as the ADD DN</li> |
| | |  * <li>a ModifyDnOperation with new DN equals to the ADD DN parent</li> |
| | |  * <li>an AddOperation done on the parent DN of the ADD DN</li> |
| | |  * </ul> |
| | |  * |
| | |  * @param op The AddOperation to be checked. |
| | |  * |
| | |  * @return A boolean indicating if this operation has some dependencies. |
| | |  *         Always false when the operation is not in the pending list. |
| | |  */ |
| | | public synchronized boolean checkDependencies(AddOperation op) |
| | | { |
| | |   DN targetDn = op.getEntryDN(); |
| | |   ChangeNumber changeNumber = OperationContext.getChangeNumber(op); |
| | |   PendingChange change = pendingChanges.get(changeNumber); |
| | |   if (change == null) |
| | |   { |
| | |     return false; |
| | |   } |
| | | |
| | |   boolean hasDependencies = false; |
| | |   for (PendingChange pendingChange : pendingChanges.values()) |
| | |   { |
| | |     // Only older changes that carry a message are considered. |
| | |     if (!pendingChange.getChangeNumber().older(changeNumber)) |
| | |     { |
| | |       continue; |
| | |     } |
| | |     UpdateMessage pendingMsg = pendingChange.getMsg(); |
| | |     if (pendingMsg == null) |
| | |     { |
| | |       continue; |
| | |     } |
| | | |
| | |     boolean dependsOnPending; |
| | |     if (pendingMsg instanceof DeleteMsg) |
| | |     { |
| | |       // Delete done on the same DN. |
| | |       dependsOnPending = pendingChange.getTargetDN().equals(targetDn); |
| | |     } |
| | |     else if (pendingMsg instanceof AddMsg) |
| | |     { |
| | |       // Add whose target DN is an ancestor of the DN being added. |
| | |       dependsOnPending = |
| | |           pendingChange.getTargetDN().isAncestorOf(targetDn); |
| | |     } |
| | |     else if (pendingMsg instanceof ModifyDNMsg) |
| | |     { |
| | |       // ModifyDN with the same target DN as the ADD DN, or whose new DN |
| | |       // is the parent of the ADD DN. |
| | |       if (pendingChange.getTargetDN().equals(targetDn)) |
| | |       { |
| | |         dependsOnPending = true; |
| | |       } |
| | |       else |
| | |       { |
| | |         ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg(); |
| | |         dependsOnPending = pendingModDn.newDNIsParent(targetDn); |
| | |       } |
| | |     } |
| | |     else |
| | |     { |
| | |       dependsOnPending = false; |
| | |     } |
| | | |
| | |     if (dependsOnPending) |
| | |     { |
| | |       hasDependencies = true; |
| | |       addDependency(change, pendingChange); |
| | |     } |
| | |   } |
| | |   return hasDependencies; |
| | | } |
| | | |
| | | /** |
| | |  * Check if the given ModifyOperation has some dependencies on any |
| | |  * older change still present in the pending list, and record those |
| | |  * dependencies in the PendingChange associated to the operation. |
| | |  * <p> |
| | |  * A ModifyOperation depends on: |
| | |  * <ul> |
| | |  * <li>an AddOperation done on the same DN</li> |
| | |  * </ul> |
| | |  * |
| | |  * @param op The ModifyOperation to be checked. |
| | |  * |
| | |  * @return A boolean indicating if this operation has some dependencies. |
| | |  *         Always false when the operation is not in the pending list. |
| | |  */ |
| | | public synchronized boolean checkDependencies(ModifyOperation op) |
| | | { |
| | |   DN targetDn = op.getEntryDN(); |
| | |   ChangeNumber changeNumber = OperationContext.getChangeNumber(op); |
| | |   PendingChange change = pendingChanges.get(changeNumber); |
| | |   if (change == null) |
| | |   { |
| | |     return false; |
| | |   } |
| | | |
| | |   boolean hasDependencies = false; |
| | |   for (PendingChange pendingChange : pendingChanges.values()) |
| | |   { |
| | |     // Only older changes are considered. |
| | |     if (!pendingChange.getChangeNumber().older(changeNumber)) |
| | |     { |
| | |       continue; |
| | |     } |
| | | |
| | |     // instanceof is false for a null message, so changes without a |
| | |     // message are skipped as well. |
| | |     UpdateMessage pendingMsg = pendingChange.getMsg(); |
| | |     if ((pendingMsg instanceof AddMsg) |
| | |         && pendingChange.getTargetDN().equals(targetDn)) |
| | |     { |
| | |       // Add done on the same DN as the modified entry. |
| | |       hasDependencies = true; |
| | |       addDependency(change, pendingChange); |
| | |     } |
| | |   } |
| | |   return hasDependencies; |
| | | } |
| | | |
| | | /** |
| | |  * Mark the first PendingChange as dependent on the second PendingChange: |
| | |  * the ChangeNumber of the second one is added to the dependencies of the |
| | |  * first one, which is then stored in the set of dependent changes. |
| | |  * |
| | |  * @param dependentChange The PendingChange that depends on the second |
| | |  *                        PendingChange. |
| | |  * @param pendingChange   The PendingChange on which the first |
| | |  *                        PendingChange is dependent. |
| | |  */ |
| | | private void addDependency( |
| | |     PendingChange dependentChange, PendingChange pendingChange) |
| | | { |
| | |   ChangeNumber blockingNumber = pendingChange.getChangeNumber(); |
| | |   dependentChange.addDependency(blockingNumber); |
| | |   dependentChanges.add(dependentChange); |
| | | } |
| | | |
| | | /** |
| | |  * Check if the given ModifyDNMsg has some dependencies on any |
| | |  * older change still present in the pending list, and record those |
| | |  * dependencies in the PendingChange associated to the message. |
| | |  * <p> |
| | |  * A Modify DN operation depends on: |
| | |  * <ul> |
| | |  * <li>an AddOperation done on the same DN as the target DN of the MODDN |
| | |  *     operation</li> |
| | |  * <li>an AddOperation done on the new parent of the MODDN operation</li> |
| | |  * <li>a DeleteOperation done on the new DN of the MODDN operation</li> |
| | |  * <li>a ModifyDNOperation done from the new DN of the MODDN |
| | |  *     operation</li> |
| | |  * </ul> |
| | |  * |
| | |  * @param msg The ModifyDNMsg to be checked. |
| | |  * |
| | |  * @return A boolean indicating if this operation has some dependencies. |
| | |  *         Always false when the message is not in the pending list. |
| | |  */ |
| | | public synchronized boolean checkDependencies(ModifyDNMsg msg) |
| | | { |
| | |   ChangeNumber changeNumber = msg.getChangeNumber(); |
| | |   PendingChange change = pendingChanges.get(changeNumber); |
| | |   if (change == null) |
| | |   { |
| | |     return false; |
| | |   } |
| | | |
| | |   DN targetDn = change.getTargetDN(); |
| | |   boolean hasDependencies = false; |
| | | |
| | |   for (PendingChange pendingChange : pendingChanges.values()) |
| | |   { |
| | |     // Only older changes that carry a message are considered. |
| | |     if (!pendingChange.getChangeNumber().older(changeNumber)) |
| | |     { |
| | |       continue; |
| | |     } |
| | |     UpdateMessage pendingMsg = pendingChange.getMsg(); |
| | |     if (pendingMsg == null) |
| | |     { |
| | |       continue; |
| | |     } |
| | | |
| | |     if (pendingMsg instanceof DeleteMsg) |
| | |     { |
| | |       // Delete whose target is the new DN of this ModifyDN. |
| | |       if (msg.newDNIsEqual(pendingChange.getTargetDN())) |
| | |       { |
| | |         hasDependencies = true; |
| | |         addDependency(change, pendingChange); |
| | |       } |
| | |     } |
| | |     else if (pendingMsg instanceof AddMsg) |
| | |     { |
| | |       // Add done on the new parent of this ModifyDN. |
| | |       if (msg.newParentIsEqual(pendingChange.getTargetDN())) |
| | |       { |
| | |         hasDependencies = true; |
| | |         addDependency(change, pendingChange); |
| | |       } |
| | |       // Add done on the target DN of this ModifyDN. This test is |
| | |       // independent from the previous one: both may match. |
| | |       if (pendingChange.getTargetDN().equals(targetDn)) |
| | |       { |
| | |         hasDependencies = true; |
| | |         addDependency(change, pendingChange); |
| | |       } |
| | |     } |
| | |     else if (pendingMsg instanceof ModifyDNMsg) |
| | |     { |
| | |       // ModifyDN done from the new DN of this ModifyDN. |
| | |       if (msg.newDNIsEqual(pendingChange.getTargetDN())) |
| | |       { |
| | |         hasDependencies = true; |
| | |         addDependency(change, pendingChange); |
| | |       } |
| | |     } |
| | |   } |
| | |   return hasDependencies; |
| | | } |
| | | |
| | | /** |
| | |  * Check if the given DeleteOperation has some dependencies on any |
| | |  * older change still present in the pending list, and record those |
| | |  * dependencies in the PendingChange associated to the operation. |
| | |  * <p> |
| | |  * A DeleteOperation depends on: |
| | |  * <ul> |
| | |  * <li>a DeleteOperation done on children DN</li> |
| | |  * <li>a ModifyDnOperation with target DN that are children of the |
| | |  *     DEL DN</li> |
| | |  * <li>an AddOperation done on the same DN</li> |
| | |  * </ul> |
| | |  * |
| | |  * @param op The DeleteOperation to be checked. |
| | |  * |
| | |  * @return A boolean indicating if this operation has some dependencies. |
| | |  *         Always false when the operation is not in the pending list. |
| | |  */ |
| | | public synchronized boolean checkDependencies(DeleteOperation op) |
| | | { |
| | |   DN targetDn = op.getEntryDN(); |
| | |   ChangeNumber changeNumber = OperationContext.getChangeNumber(op); |
| | |   PendingChange change = pendingChanges.get(changeNumber); |
| | |   if (change == null) |
| | |   { |
| | |     return false; |
| | |   } |
| | | |
| | |   boolean hasDependencies = false; |
| | |   for (PendingChange pendingChange : pendingChanges.values()) |
| | |   { |
| | |     // Only older changes that carry a message are considered. |
| | |     if (!pendingChange.getChangeNumber().older(changeNumber)) |
| | |     { |
| | |       continue; |
| | |     } |
| | |     UpdateMessage pendingMsg = pendingChange.getMsg(); |
| | |     if (pendingMsg == null) |
| | |     { |
| | |       continue; |
| | |     } |
| | | |
| | |     boolean dependsOnPending; |
| | |     if (pendingMsg instanceof DeleteMsg) |
| | |     { |
| | |       // Delete whose target DN is a descendant of the deleted DN. |
| | |       dependsOnPending = |
| | |           pendingChange.getTargetDN().isDescendantOf(targetDn); |
| | |     } |
| | |     else if (pendingMsg instanceof AddMsg) |
| | |     { |
| | |       // Add done on the same DN as the deleted DN. |
| | |       dependsOnPending = pendingChange.getTargetDN().equals(targetDn); |
| | |     } |
| | |     else if (pendingMsg instanceof ModifyDNMsg) |
| | |     { |
| | |       // ModifyDN whose target DN is a descendant of the deleted DN. |
| | |       dependsOnPending = |
| | |           pendingChange.getTargetDN().isDescendantOf(targetDn); |
| | |     } |
| | |     else |
| | |     { |
| | |       dependsOnPending = false; |
| | |     } |
| | | |
| | |     if (dependsOnPending) |
| | |     { |
| | |       hasDependencies = true; |
| | |       addDependency(change, pendingChange); |
| | |     } |
| | |   } |
| | |   return hasDependencies; |
| | | } |
| | | } |
| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | |
| | | { |
| | | private ReplicationMonitor monitor; |
| | | |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | private ReplicationBroker broker; |
| | | |
| | | private List<ListenerThread> synchroThreads = |
| | | new ArrayList<ListenerThread>(); |
| | | private final SortedMap<ChangeNumber, PendingChange> pendingChanges = |
| | | new TreeMap<ChangeNumber, PendingChange>(); |
| | | private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs = |
| | | new TreeMap<ChangeNumber, UpdateMessage>(); |
| | | private int numRcvdUpdates = 0; |
| | | private int numSentUpdates = 0; |
| | | private AtomicInteger numRcvdUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numSentUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(); |
| | | private int debugCount = 0; |
| | | private PersistentServerState state; |
| | |
| | | private int maxSendDelay = 0; |
| | | |
| | | /** |
| | | * This object is used to store the list of updates currently being |
| | | * done on the local database. |
| | | * It contains both the updates that are done directly on this server |
| | | * and the updates that were done on another server, transmitted |
| | | * by the replication server and that are currently replayed. |
| | | * It is useful to make sure that dependencies between operations |
| | | * are correctly fulfilled, that the local operations are sent in a |
| | | * correct order to the replication server and that the ServerState |
| | | * is not updated too early. |
| | | */ |
| | | private PendingChanges pendingChanges; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | |
| | | private ConfigEntry backendConfigEntry; |
| | | private List<DN> branches = new ArrayList<DN>(0); |
| | | |
| | | private int listenerThreadNumber = 1; |
| | | private int listenerThreadNumber = 10; |
| | | private boolean receiveStatus = true; |
| | | |
| | | private Collection<String> replicationServers; |
| | |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, state); |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | try |
| | |
| | | */ |
| | | } |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | pendingChanges = |
| | | new PendingChanges(new ChangeNumberGenerator(serverId, state), |
| | | broker, state); |
| | | |
| | | // listen for changes on the configuration |
| | | configuration.addChangeListener(this); |
| | | } |
| | |
| | | * of the parent entry |
| | | */ |
| | | |
| | | // There is a potential of perfs improvement here |
| | | // if we could avoid the following parent entry retrieval |
| | | DN parentDnFromCtx = findEntryDN(ctx.getParentUid()); |
| | | |
| | | if (parentDnFromCtx != null) |
| | | String parentUid = ctx.getParentUid(); |
| | | // root entry have no parent, |
| | | // there is no need to check for it. |
| | | if (parentUid != null) |
| | | { |
| | | DN entryDN = addOperation.getEntryDN(); |
| | | DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); |
| | | if ((parentDnFromEntryDn != null) |
| | | && (!parentDnFromCtx.equals(parentDnFromEntryDn))) |
| | | // There is a potential of perfs improvement here |
| | | // if we could avoid the following parent entry retrieval |
| | | DN parentDnFromCtx = findEntryDN(ctx.getParentUid()); |
| | | |
| | | if (parentDnFromCtx == null) |
| | | { |
| | | // parentEntry has been renamed |
| | | // replication name conflict resolution is expected to fix that |
| | | // later in the flow |
| | | // The parent does not exist with the specified unique id |
| | | // stop the operation with NO_SUCH_OBJECT and let the |
| | | // conflict resolution or the dependency resolution solve this. |
| | | addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | else |
| | | { |
| | | DN entryDN = addOperation.getEntryDN(); |
| | | DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); |
| | | if ((parentDnFromEntryDn != null) |
| | | && (!parentDnFromCtx.equals(parentDnFromEntryDn))) |
| | | { |
| | | // parentEntry has been renamed |
| | | // replication name conflict resolution is expected to fix that |
| | | // later in the flow |
| | | addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return new SynchronizationProviderResult(true); |
| | |
| | | */ |
| | | public UpdateMessage receive() |
| | | { |
| | | synchronized (broker) |
| | | UpdateMessage update = pendingChanges.getNextUpdate(); |
| | | |
| | | if (update == null) |
| | | { |
| | | UpdateMessage update = null; |
| | | while (update == null) |
| | | synchronized (broker) |
| | | { |
| | | ReplicationMessage msg; |
| | | try |
| | | while (update == null) |
| | | { |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | ReplicationMessage msg; |
| | | try |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | log("Broker received message :" + msg); |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | receiveAck(ack); |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; |
| | | try |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | { |
| | | initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | null); |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | catch(DirectoryException de) |
| | | log("Broker received message :" + msg); |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | AckMessage ack = (AckMessage) msg; |
| | | receiveAck(ack); |
| | | } |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | InitializeRequestMessage initMsg = (InitializeRequestMessage) msg; |
| | | try |
| | | { |
| | | initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(), |
| | | null); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage initMsg = (InitializeTargetMessage) msg; |
| | | |
| | | try |
| | | { |
| | | importBackend(initMsg); |
| | | try |
| | | { |
| | | importBackend(initMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Return an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | log(getMessage(msgID, |
| | | backend.getBackendID()) + de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | catch(DirectoryException de) |
| | | else if (msg instanceof ErrorMessage) |
| | | { |
| | | // Return an error message to notify the sender |
| | | int msgID = de.getMessageID(); |
| | | ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(), |
| | | msgID, de.getMessage()); |
| | | log(getMessage(msgID, backend.getBackendID()) + de.getMessage()); |
| | | broker.publish(errorMsg); |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | } |
| | | // just retry |
| | | } |
| | | } catch (SocketTimeoutException e) |
| | | { |
| | | // just retry |
| | | } |
| | | |
| | | } |
| | | return update; |
| | | } |
| | | return update; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void receiveUpdate(UpdateMessage update) |
| | | { |
| | | ChangeNumber changeNumber = update.getChangeNumber(); |
| | | |
| | | synchronized (pendingChanges) |
| | | { |
| | | if (pendingChanges.containsKey(changeNumber)) |
| | | { |
| | | /* |
| | | * This should never happen, |
| | | * TODO log error and throw exception |
| | | */ |
| | | } |
| | | pendingChanges.put(changeNumber, |
| | | new PendingChange(changeNumber, null, update)); |
| | | numRcvdUpdates++; |
| | | } |
| | | pendingChanges.putRemoteUpdate(update); |
| | | numRcvdUpdates.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | |
| | | UpdateMessage update; |
| | | ChangeNumber changeNumber = ack.getChangeNumber(); |
| | | |
| | | synchronized (pendingChanges) |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.get(changeNumber); |
| | | waitingAckMsgs.remove(changeNumber); |
| | | update = waitingAckMsgs.remove(changeNumber); |
| | | } |
| | | if (update != null) |
| | | { |
| | |
| | | * This is an operation type that we do not know about |
| | | * It should never happen. |
| | | */ |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | int msgID = MSGID_UNKNOWN_TYPE; |
| | | String message = getMessage(msgID, op.getOperationType().toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | pendingChanges.remove(curChangeNumber); |
| | | int msgID = MSGID_UNKNOWN_TYPE; |
| | | String message = getMessage(msgID, op.getOperationType().toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | } |
| | | |
| | | synchronized(pendingChanges) |
| | | if (result == ResultCode.SUCCESS) |
| | | { |
| | | if (result == ResultCode.SUCCESS) |
| | | try |
| | | { |
| | | PendingChange curChange = pendingChanges.get(curChangeNumber); |
| | | if (curChange == null) |
| | | { |
| | | // This should never happen |
| | | int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; |
| | | String message = getMessage(msgID, curChangeNumber.toString(), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | curChange.setCommitted(true); |
| | | pendingChanges.commit(curChangeNumber, op, msg); |
| | | } |
| | | catch (NoSuchElementException e) |
| | | { |
| | | int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; |
| | | String message = getMessage(msgID, curChangeNumber.toString(), |
| | | op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | return; |
| | | } |
| | | |
| | | if (op.isSynchronizationOperation()) |
| | | curChange.setOp(op); |
| | | else |
| | | curChange.setMsg(msg); |
| | | |
| | | if (msg != null && isAssured) |
| | | if (msg != null && isAssured) |
| | | { |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | // Add the assured message to the list of those whose acknowledgements |
| | | // we are awaiting. |
| | | // Add the assured message to the list of update that are |
| | | // waiting acknowledgements |
| | | waitingAckMsgs.put(curChangeNumber, msg); |
| | | } |
| | | } |
| | | else if (!op.isSynchronizationOperation()) |
| | | { |
| | | // Remove an unsuccessful non-replication operation from the pending |
| | | // changes list. |
| | | if (curChangeNumber != null) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | } |
| | | } |
| | | |
| | | pushCommittedChanges(); |
| | | } |
| | | else if (!op.isSynchronizationOperation()) |
| | | { |
| | | // Remove an unsuccessful non-replication operation from the pending |
| | | // changes list. |
| | | if (curChangeNumber != null) |
| | | { |
| | | pendingChanges.remove(curChangeNumber); |
| | | } |
| | | } |
| | | |
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | |
| | | // Wait for acknowledgement of an assured message. |
| | | if (msg != null && isAssured) |
| | |
| | | */ |
| | | public int getNumRcvdUpdates() |
| | | { |
| | | return numRcvdUpdates; |
| | | return numRcvdUpdates.get(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public int getNumSentUpdates() |
| | | { |
| | | return numSentUpdates; |
| | | return numSentUpdates.get(); |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates in the pending list. |
| | | * Get the number of updates in the pending list. |
| | | * |
| | | * @return The number of updates in the pending list |
| | | */ |
| | |
| | | { |
| | | Operation op = null; |
| | | boolean done = false; |
| | | boolean dependency = false; |
| | | ChangeNumber changeNumber = null; |
| | | int retryCount = 10; |
| | | boolean firstTry = true; |
| | | |
| | | try |
| | | { |
| | | while (!done && retryCount-- > 0) |
| | | while ((!dependency) && (!done) && (retryCount-- > 0)) |
| | | { |
| | | op = msg.createOperation(conn); |
| | | |
| | | op.setInternalOperation(true); |
| | | op.setSynchronizationOperation(true); |
| | | changeNumber = OperationContext.getChangeNumber(op); |
| | | if (changeNumber != null) |
| | | changeNumberGenerator.adjust(changeNumber); |
| | | |
| | | op.run(); |
| | | |
| | | ResultCode result = op.getResultCode(); |
| | | |
| | | if (result != ResultCode.SUCCESS) |
| | | { |
| | | if (op instanceof ModifyOperation) |
| | | { |
| | | ModifyOperation newOp = (ModifyOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | if (!dependency) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else if (op instanceof DeleteOperation) |
| | | { |
| | | DeleteOperation newOp = (DeleteOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else if (op instanceof AddOperation) |
| | | { |
| | | AddOperation newOp = (AddOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | |
| | | } else if (op instanceof ModifyDNOperation) |
| | | dependency = pendingChanges.checkDependencies(newOp); |
| | | if (!dependency) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else if (op instanceof ModifyDNOperation) |
| | | { |
| | | ModifyDNOperation newOp = (ModifyDNOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | ModifyDNMsg newMsg = (ModifyDNMsg) msg; |
| | | dependency = pendingChanges.checkDependencies(newMsg); |
| | | if (!dependency) |
| | | { |
| | | ModifyDNOperation newOp = (ModifyDNOperation) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | done = true; |
| | | } |
| | | firstTry = false; |
| | | } |
| | | |
| | | if (!done) |
| | | if (!done && !dependency) |
| | | { |
| | | // Continue with the next change but the servers could now become |
| | | // inconsistent. |
| | |
| | | String message = getMessage(msgID, op.toString()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, message, msgID); |
| | | |
| | | updateError(changeNumber); |
| | | } |
| | | } |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |
| | | if (!dependency) |
| | | { |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void updateError(ChangeNumber changeNumber) |
| | | { |
| | | synchronized (pendingChanges) |
| | | { |
| | | PendingChange change = pendingChanges.get(changeNumber); |
| | | change.setCommitted(true); |
| | | pushCommittedChanges(); |
| | | } |
| | | pendingChanges.commit(changeNumber); |
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | private ChangeNumber generateChangeNumber(Operation operation) |
| | | { |
| | | ChangeNumber changeNumber; |
| | | |
| | | changeNumber = changeNumberGenerator.NewChangeNumber(); |
| | | PendingChange change = new PendingChange(changeNumber, operation, null); |
| | | synchronized(pendingChanges) |
| | | { |
| | | pendingChanges.put(changeNumber, change); |
| | | } |
| | | return changeNumber; |
| | | return pendingChanges.putLocalOperation(operation); |
| | | } |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Push the committed changes found at the head of the pending changes |
| | | * list. As long as the first pending change is committed, it is removed |
| | | * from the list and its ChangeNumber is passed to state.update(). |
| | | * A change is also published through the broker, and counted in |
| | | * numSentUpdates, when it carries an operation that is not a |
| | | * synchronization operation. |
| | | * PRECONDITION : The pendingChanges lock must be held before calling |
| | | * this method. |
| | | */ |
| | | private void pushCommittedChanges() |
| | | { |
| | | while (!pendingChanges.isEmpty()) |
| | | { |
| | | ChangeNumber headNumber = pendingChanges.firstKey(); |
| | | PendingChange headChange = pendingChanges.get(headNumber); |
| | |
| | | // Stop at the first change that is missing or not committed yet, |
| | | // so that changes only leave the list from its head. |
| | | if ((headChange == null) || !headChange.isCommitted()) |
| | | { |
| | | return; |
| | | } |
| | |
| | | boolean mustPublish = (headChange.getOp() != null) |
| | | && !headChange.getOp().isSynchronizationOperation(); |
| | | if (mustPublish) |
| | | { |
| | | numSentUpdates++; |
| | | broker.publish(headChange.getMsg()); |
| | | } |
| | |
| | | state.update(headNumber); |
| | | pendingChanges.remove(headNumber); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |
| | |
| | | for (Attribute a : userAttributes) |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | |
| | | for (Attribute a : operationalAttributes) |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | if (operationalAttributes != null) |
| | | for (Attribute a : operationalAttributes) |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | |
| | | encodedAttributes = ASN1Element.encodeValue(elems); |
| | | } |
| | |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Operation; |
| | | |
| | | /** |
| | |
| | | this.newRDN = newRDN; |
| | | } |
| | | |
| | | /** |
| | | * Check if the new DN carried by this MSG (the new RDN followed by the |
| | | * new superior) is an ancestor of the DN given as a parameter. |
| | | * |
| | | * @param targetDn the DN to test against the new DN built from this MSG. |
| | | * @return A boolean indicating if the new DN built from this MSG is an |
| | | * ancestor of targetDn, as reported by DN.isAncestorOf(). |
| | | * false is also returned when the new DN cannot be decoded. |
| | | */ |
| | | public boolean newDNIsParent(DN targetDn) |
| | | { |
| | | try |
| | | { |
| | | // NOTE(review): if newSuperior can be null here, the decoded string |
| | | // ends with ",null" - confirm that a new superior is always set. |
| | | DN renamedDN = DN.decode(newRDN + "," + newSuperior); |
| | | return renamedDN.isAncestorOf(targetDn); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // The new DN is not a valid DN, and therefore cannot be a parent |
| | | // of the DN given as a parameter. |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check if the new DN carried by this ModifyDNMsg (the new RDN followed |
| | | * by the new superior) is equal to the targetDN given in parameter. |
| | | * |
| | | * @param targetDN The targetDN to use to check for equality. |
| | | * |
| | | * @return A boolean indicating if the targetDN is the same as the new DN |
| | | * of the ModifyDNMsg. false is also returned when the new DN |
| | | * cannot be decoded. |
| | | */ |
| | | public boolean newDNIsEqual(DN targetDN) |
| | | { |
| | | try |
| | | { |
| | | // NOTE(review): if newSuperior can be null here, the decoded string |
| | | // ends with ",null" - confirm that a new superior is always set. |
| | | DN renamedDN = DN.decode(newRDN + "," + newSuperior); |
| | | return renamedDN.equals(targetDN); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // The new DN is not a valid DN, and therefore does not match the |
| | | // DN given as a parameter. |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check if the new superior of this ModifyDNMsg, once decoded as a DN, |
| | | * is equal to the targetDN given in parameter. |
| | | * |
| | | * @param targetDN the targetDN to use when checking equality. |
| | | * |
| | | * @return A boolean indicating if the new parent of the modifyDNMsg is the |
| | | * same as the targetDN. false is also returned when the new |
| | | * superior cannot be decoded. |
| | | */ |
| | | public boolean newParentIsEqual(DN targetDN) |
| | | { |
| | | try |
| | | { |
| | | DN decodedSuperior = DN.decode(newSuperior); |
| | | return decodedSuperior.equals(targetDN); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // The new superior is not a valid DN, and therefore does not match |
| | | // the DN given as a parameter. |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | * @throws DataFormatException if the encoded byte array is not valid. |
| | | * @throws UnsupportedEncodingException if UTF-8 is not supported. |
| | | */ |
| | | public UpdateMessage(byte[] in) throws DataFormatException, |
| | | protected UpdateMessage(byte[] in) throws DataFormatException, |
| | | UnsupportedEncodingException |
| | | { |
| | | /* read the changeNumber */ |
| | |
| | | {} |
| | | } |
| | | |
| | | if ( (recentChangeNumber != null) && |
| | | (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2)) |
| | | if ( (recentChangeNumber != null) && (changeNumber != null)) |
| | | { |
| | | flush(); |
| | | if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) || |
| | | ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20)) |
| | | { |
| | | flush(); |
| | | } |
| | | } |
| | | |
| | | return new ReplicationIterator(serverId, db, changeNumber); |
| | |
| | | newSocket = listenSocket.accept(); |
| | | newSocket.setReceiveBufferSize(1000000); |
| | | newSocket.setTcpNoDelay(true); |
| | | newSocket.setKeepAlive(true); |
| | | ServerHandler handler = new ServerHandler( |
| | | new SocketSession(newSocket), queueSize); |
| | | handler.start(null, serverId, serverURL, rcvWindow, this); |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | | import static org.testng.Assert.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.FileNotFoundException; |
| | | import java.io.FileOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | import java.net.ServerSocket; |
| | | import java.util.LinkedList; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.UUID; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.api.SynchronizationProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.plugin.DomainFakeCfg; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.ReplicationDomain; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.testng.annotations.*; |
| | | |
| | | /** |
| | | * Test that the dependencies are computed correctly when replaying |
| | | * sequences of operations that must be applied in a given order, |
| | | * such as: ADD an entry, then ADD a child of that entry. |
| | | */ |
| | | public class DependencyTest extends ReplicationTestCase |
| | | { |
| | | private static final String BASEDN_STRING = "dc=example,dc=com"; |
| | | |
| | | /** |
| | | * Check that a sequence of dependent adds and mods is correctly ordered. |
| | | * Using a deep dit : |
| | | * dc=example,dc=com |
| | | * | |
| | | * dc=dependency1 |
| | | * | |
| | | * dc=dependency2 |
| | | * | |
| | | * dc=dependency3 |
| | | * | |
| | | * ... |
| | | * | |
| | | * dc=dependencyN |
| | | * This test sends a sequence of interleaved ADD operations and MODIFY |
| | | * operations to build such a dit. The messages are published to the |
| | | * replication server before the replication domain is created. |
| | | * |
| | | * Then test that the sequence of Delete necessary to remove |
| | | * all those entries is also correctly ordered. |
| | | * |
| | | * @throws Exception If the test cannot be set up or run. |
| | | */ |
| | | @SuppressWarnings("unchecked") |
| | | @Test(groups="slow") |
| | | public void addModDelDependencyTest() throws Exception |
| | | { |
| | | ReplicationServer replServer = null; |
| | | ReplicationDomain domain = null; |
| | | DN baseDn = DN.decode(BASEDN_STRING); |
| | | SynchronizationProvider replicationPlugin = null; |
| | | short brokerId = 2; |
| | | short serverId = 1; |
| | | short replServerId = 1; |
| | | int AddSequenceLength = 30; |
| | |
| | |
| | | cleanDB(); |
| | |
| | | try |
| | | { |
| | | /* |
| | | * FIRST PART : |
| | | * Check that a sequence of dependent ADD is correctly ordered. |
| | | * |
| | | * - Create replication server |
| | | * - Send sequence of ADD and MODIFY messages to the replication server |
| | | * - Configure and start the replication domain |
| | | * - check that the last entry has been correctly added |
| | | * - check that every entry of the sequence has been modified |
| | | */ |
| | |
| | | String entryldif = |
| | | "dn:" + BASEDN_STRING + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "entryuuid: " + stringUID(1) + "\n"; |
| | | Entry entry = TestCaseUtils.entryFromLdifString(entryldif); |
| | | AttributeType uidType = |
| | | DirectoryServer.getSchema().getAttributeType("entryuuid"); |
| | |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | int replServerPort = socket.getLocalPort(); |
| | | socket.close(); |
| | |
| | | replicationPlugin = new MultimasterReplication(); |
| | | DirectoryServer.registerSynchronizationProvider(replicationPlugin); |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(replServerPort, "addModDeldependency", |
| | | 0, replServerId, 0, |
| | | AddSequenceLength*5+100, null); |
| | | replServer = new ReplicationServer(conf); |
| | |
| | | ReplicationBroker broker = |
| | | openReplicationSession(baseDn, brokerId, 1000, replServerPort, 1000, |
| | | false); |
| | |
| | | TimeThread.sleep(2000); |
| | | // send an interleaved sequence of add and modify operations |
| | |
| | | String addDn = BASEDN_STRING; |
| | | ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L); |
| | |
| | | int sequence; |
| | | for (sequence = 1; sequence<=AddSequenceLength; sequence ++) |
| | | { |
| | | entry.removeAttribute(uidType); |
| | | entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)), |
| | | new LinkedList<AttributeValue>()); |
| | | addDn = "dc=dependency" + sequence + "," + addDn; |
| | | AddMsg addMsg = |
| | | new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1), |
| | | stringUID(sequence), |
| | | entry.getObjectClassAttribute(), |
| | | entry.getAttributes(), null ); |
| | | broker.publish(addMsg); |
| | |
| | | ModifyMsg modifyMsg = |
| | | new ModifyMsg(gen.NewChangeNumber(), DN.decode(addDn), |
| | | generatemods("description", "test"), |
| | | stringUID(sequence+1)); |
| | | broker.publish(modifyMsg); |
| | | } |
| | |
| | | // configure and start replication of dc=example,dc=com on the server |
| | | SortedSet<String> replServers = new TreeSet<String>(); |
| | | replServers.add("localhost:"+replServerPort); |
| | | DomainFakeCfg domainConf = |
| | | new DomainFakeCfg(baseDn, serverId, replServers); |
| | | domainConf.setHeartbeatInterval(100000); |
| | |
| | | domain = MultimasterReplication.createNewDomain(domainConf); |
| | |
| | | // check that the last (deepest) entry in the sequence got added. |
| | | Entry lastEntry = getEntry(DN.decode(addDn), 30000, true); |
| | | assertNotNull(lastEntry, |
| | | "The last entry of the ADD sequence was not added."); |
| | |
| | | // Check that all the modifies have been replayed |
| | | // (all the entries should have a description). |
| | | addDn = BASEDN_STRING; |
| | | for (sequence = 1; sequence<=AddSequenceLength; sequence ++) |
| | | { |
| | | addDn = "dc=dependency" + sequence + "," + addDn; |
| | |
| | | boolean found = |
| | | checkEntryHasAttribute(DN.decode(addDn), "description", "test", |
| | | 10000, true); |
| | | if (!found) |
| | | { |
| | | fail("The modification was not replayed on entry " + addDn); |
| | | } |
| | | } |
| | |
| | | /* |
| | | * SECOND PART |
| | | * |
| | | * Now check that the dependencies between deletes are correctly |
| | | * managed. |
| | | * |
| | | * Disable the domain while we publish the delete messages to |
| | | * the replication server so that when we enable it again it receives |
| | | * the delete operations in bulk. |
| | | */ |
| | |
| | | domain.disable(); |
| | | Thread.sleep(2000); // necessary because disable does not wait |
| | | // for full termination of all threads. (issue 1571) |
| | |
| | | DN deleteDN = DN.decode(addDn); |
| | | while (sequence-->1) |
| | | { |
| | | DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(), |
| | | gen.NewChangeNumber(), |
| | | stringUID(sequence + 1)); |
| | | broker.publish(delMsg); |
| | | deleteDN = deleteDN.getParent(); |
| | | } |
| | |
| | | domain.enable(); |
| | |
| | | // check that the entry just below the base entry was deleted. |
| | | // (we can't delete the base entry because some other tests might |
| | | // have added other children) |
| | | DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING); |
| | | Entry baseEntry = getEntry(node1, 30000, false); |
| | | assertNull(baseEntry, |
| | | "The last entry of the DEL sequence was not deleted."); |
| | | } |
| | | finally |
| | | { |
| | | if (replServer != null) |
| | | replServer.shutdown(); |
| | |
| | | if (domain != null) |
| | | MultimasterReplication.deleteDomain(baseDn); |
| | |
| | | if (replicationPlugin != null) |
| | | DirectoryServer.deregisterSynchronizationProvider(replicationPlugin); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clean the database and replace with a single entry. |
| | | * |
| | | * The base entry is written as LDIF to a file located under the build |
| | | * root, then an import task is submitted for the userRoot backend with |
| | | * that file as its LDIF source. |
| | | * |
| | | * @throws FileNotFoundException If the LDIF file cannot be created. |
| | | * @throws IOException If the LDIF file cannot be written or closed. |
| | | * @throws Exception If the import task fails. |
| | | */ |
| | | private void cleanDB() throws FileNotFoundException, IOException, Exception |
| | | { |
| | | String baseentryldif = |
| | | "dn:" + BASEDN_STRING + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: domain\n" |
| | | + "dc: example\n" |
| | | + "entryuuid: " + stringUID(1) + "\n"; |
| | |
| | |
| | | // Initialization : |
| | | // Load the database with a single entry : |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = buildRoot + File.separator + "build" + |
| | | File.separator + "unit-tests" + File.separator + |
| | | "package"+ File.separator + "addModDelDependencyTest"; |
| | | OutputStream out = new FileOutputStream(new File(path)); |
| | | try |
| | | { |
| | | out.write(baseentryldif.getBytes()); |
| | | } |
| | | finally |
| | | { |
| | | // Always release the file handle, and do it before the import task |
| | | // below reads the file. |
| | | out.close(); |
| | | } |
| | |
| | | task("dn: ds-task-id=" + UUID.randomUUID() |
| | | + ",cn=Scheduled Tasks,cn=Tasks\n" |
| | | + "objectclass: top\n" |
| | | + "objectclass: ds-task\n" |
| | | + "objectclass: ds-task-import\n" |
| | | + "ds-task-class-name: org.opends.server.tasks.ImportTask\n" |
| | | + "ds-task-import-backend-id: userRoot\n" |
| | | + "ds-task-import-ldif-file: " + path + "\n" |
| | | + "ds-task-import-reject-file: " + path + "reject\n"); |
| | | } |
| | | |
| | |   /**
| | |    * Check that after a sequence of add/del/add done on the same DN
| | |    * the second entry is in the database.
| | |    * The unique id of the entry is used to check that the correct entry
| | |    * has been added.
| | |    * To increase the risks of failures a loop of add/del/add is done.
| | |    * <p>
| | |    * The entries form a chain: each iteration prepends a new
| | |    * "dc=dependencyN" RDN to the DN used by the previous iteration, so every
| | |    * entry is a child of the one re-added just before it.
| | |    * All messages are published to the broker before the replication domain
| | |    * is created.
| | |    *
| | |    * @throws Exception If the test setup or one of the checks raises an
| | |    *                   exception.
| | |    */
| | |   @SuppressWarnings("unchecked")
| | |   @Test(groups="slow")
| | |   public void addDelAddDependencyTest() throws Exception
| | |   {
| | |     ReplicationServer replServer = null;
| | |     ReplicationDomain domain = null;
| | |     DN baseDn = DN.decode(BASEDN_STRING);
| | |     SynchronizationProvider replicationPlugin = null;
| | |     short brokerId = 2;
| | |     short serverId = 1;
| | |     short replServerId = 1;
| | |     int AddSequenceLength = 30;
| | | 
| | |     // start from a database that only contains the base entry
| | |     cleanDB();
| | | 
| | |     try
| | |     {
| | |       // template entry; its entryuuid attribute is replaced before each add
| | |       String entryldif = "dn:" + BASEDN_STRING + "\n"
| | |       + "objectClass: top\n"
| | |       + "objectClass: domain\n";
| | |       Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
| | |       AttributeType uidType =
| | |         DirectoryServer.getSchema().getAttributeType("entryuuid");
| | | 
| | |       // find a  free port for the replicationServer
| | |       ServerSocket socket = TestCaseUtils.bindFreePort();
| | |       int replServerPort = socket.getLocalPort();
| | |       socket.close();
| | | 
| | |       replicationPlugin = new MultimasterReplication();
| | |       DirectoryServer.registerSynchronizationProvider(replicationPlugin);
| | |       ReplServerFakeConfiguration conf =
| | |         new ReplServerFakeConfiguration(replServerPort, "addDelAdddependency", 0,
| | |                                         replServerId,
| | |                                         0, 5*AddSequenceLength+100, null);
| | |       replServer = new ReplicationServer(conf);
| | | 
| | |       ReplicationBroker broker =
| | |         openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
| | |                                false);
| | | 
| | |       // send a sequence of add/del/add operations
| | | 
| | |       String addDn = BASEDN_STRING;
| | |       ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
| | | 
| | |       int sequence;
| | |       for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
| | |       {
| | |         // add the entry a first time
| | |         entry.removeAttribute(uidType);
| | |         entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
| | |                            new LinkedList<AttributeValue>());
| | |         // nest the new entry below the entry handled by the previous iteration
| | |         addDn = "dc=dependency" + sequence + "," + addDn;
| | |         // The 4th argument is apparently the unique id of the parent entry:
| | |         // the base entry (uid 1) for the first level, otherwise the uid
| | |         // given to the re-added entry of the previous iteration
| | |         // ((sequence-1)+1001 == sequence+1000).
| | |         AddMsg addMsg =
| | |           new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
| | |                      stringUID(sequence == 1 ? sequence : sequence +1000),
| | |                      entry.getObjectClassAttribute(),
| | |                      entry.getAttributes(), null );
| | |         broker.publish(addMsg);
| | | 
| | |         // delete the entry
| | |         DeleteMsg delMsg = new DeleteMsg(addDn, gen.NewChangeNumber(),
| | |                                          stringUID(sequence+1));
| | |         broker.publish(delMsg);
| | | 
| | |         // add again the entry with a new entryuuid.
| | |         entry.removeAttribute(uidType);
| | |         entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1001)),
| | |                            new LinkedList<AttributeValue>());
| | |         addMsg =
| | |           new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1001),
| | |                      stringUID(sequence == 1 ? sequence : sequence +1000),
| | |                      entry.getObjectClassAttribute(),
| | |                      entry.getAttributes(), null );
| | |         broker.publish(addMsg);
| | |       }
| | | 
| | |       // configure and start replication of dc=example,dc=com on the server
| | |       SortedSet<String> replServers = new TreeSet<String>();
| | |       replServers.add("localhost:"+replServerPort);
| | |       DomainFakeCfg domainConf =
| | |         new DomainFakeCfg(baseDn, serverId, replServers);
| | | 
| | |       domain = MultimasterReplication.createNewDomain(domainConf);
| | | 
| | |       // check that all entries have been deleted and added
| | |       // again by checking that they do have the correct entryuuid
| | |       addDn = BASEDN_STRING;
| | |       for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
| | |       {
| | |         addDn = "dc=dependency" + sequence + "," + addDn;
| | | 
| | |         boolean found =
| | |           checkEntryHasAttribute(DN.decode(addDn), "entryuuid",
| | |                                  stringUID(sequence+1001),
| | |                                  30000, true);
| | |         if (!found)
| | |         {
| | |           fail("The second add was not replayed on entry " + addDn);
| | |         }
| | |       }
| | | 
| | |       // Clean up: at this point sequence == AddSequenceLength+1 and addDn is
| | |       // the deepest entry. The post-decrement walks back up the chain, so
| | |       // each delete carries the uid of the re-added entry at that level,
| | |       // ending with dc=dependency1.
| | |       DN deleteDN = DN.decode(addDn);
| | |       while (sequence-->1)
| | |       {
| | |         DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
| | |                                          gen.NewChangeNumber(),
| | |                                          stringUID(sequence + 1001));
| | |         broker.publish(delMsg);
| | |         deleteDN = deleteDN.getParent();
| | |       }
| | | 
| | |       // check that the database was cleaned successfully
| | |       DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
| | |       Entry baseEntry = getEntry(node1, 30000, false);
| | |       assertNull(baseEntry,
| | |         "The entry were not removed succesfully after test completion.");
| | | 
| | |     }
| | |     finally
| | |     {
| | |       // release whatever was successfully created, even when a check failed
| | |       if (replServer != null)
| | |         replServer.shutdown();
| | | 
| | |       if (domain != null)
| | |         MultimasterReplication.deleteDomain(baseDn);
| | | 
| | |       if (replicationPlugin != null)
| | |         DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
| | |     }
| | |   }
| | | |
| | |   /**
| | |    * Check that the dependency of moddn operation are working by
| | |    * issuing a set of Add operation followed by a modrdn of the added entry.
| | |    * <p>
| | |    * Each entry "dc=dependencyN" is added directly below the base entry and
| | |    * immediately renamed to "dc=new_depN". All messages are published to the
| | |    * broker before the replication domain is created. The test then checks
| | |    * that every renamed entry exists, and finally publishes deletes to clean
| | |    * the database.
| | |    *
| | |    * @throws Exception If the test setup or one of the checks raises an
| | |    *                   exception.
| | |    */
| | |   @SuppressWarnings("unchecked")
| | |   @Test(groups="slow")
| | |   public void addModdnDependencyTest() throws Exception
| | |   {
| | |     ReplicationServer replServer = null;
| | |     ReplicationDomain domain = null;
| | |     DN baseDn = DN.decode(BASEDN_STRING);
| | |     SynchronizationProvider replicationPlugin = null;
| | |     short brokerId = 2;
| | |     short serverId = 1;
| | |     short replServerId = 1;
| | |     int AddSequenceLength = 30;
| | | 
| | |     // start from a database that only contains the base entry
| | |     cleanDB();
| | | 
| | |     try
| | |     {
| | |       // template entry; its entryuuid attribute is replaced before each add
| | |       String entryldif = "dn:" + BASEDN_STRING + "\n"
| | |       + "objectClass: top\n"
| | |       + "objectClass: domain\n";
| | |       Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
| | |       AttributeType uidType =
| | |         DirectoryServer.getSchema().getAttributeType("entryuuid");
| | | 
| | |       // find a  free port for the replicationServer
| | |       ServerSocket socket = TestCaseUtils.bindFreePort();
| | |       int replServerPort = socket.getLocalPort();
| | |       socket.close();
| | | 
| | |       replicationPlugin = new MultimasterReplication();
| | |       DirectoryServer.registerSynchronizationProvider(replicationPlugin);
| | |       ReplServerFakeConfiguration conf =
| | |         new ReplServerFakeConfiguration(replServerPort, "addModdndependency", 0,
| | |                                         replServerId,
| | |                                         0, 5*AddSequenceLength+100, null);
| | |       replServer = new ReplicationServer(conf);
| | | 
| | |       ReplicationBroker broker =
| | |         openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
| | |                                false);
| | | 
| | | 
| | |       String addDn = BASEDN_STRING;
| | |       ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
| | | 
| | |       // send a sequence of add/modrdn operations
| | |       int sequence;
| | |       for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
| | |       {
| | |         // add the entry
| | |         entry.removeAttribute(uidType);
| | |         entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
| | |                            new LinkedList<AttributeValue>());
| | |         addDn = "dc=dependency" + sequence + "," + BASEDN_STRING;
| | |         AddMsg addMsg =
| | |           new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
| | |                      stringUID(1),
| | |                      entry.getObjectClassAttribute(),
| | |                      entry.getAttributes(), null );
| | |         broker.publish(addMsg);
| | | 
| | |         // rename the entry
| | |         ModifyDNMsg moddnMsg =
| | |           new ModifyDNMsg(addDn, gen.NewChangeNumber(), stringUID(sequence+1),
| | |                           stringUID(1), true, null, "dc=new_dep" + sequence);
| | |         broker.publish(moddnMsg);
| | |       }
| | | 
| | |       // configure and start replication of dc=example,dc=com on the server
| | |       SortedSet<String> replServers = new TreeSet<String>();
| | |       replServers.add("localhost:"+replServerPort);
| | |       DomainFakeCfg domainConf =
| | |         new DomainFakeCfg(baseDn, serverId, replServers);
| | | 
| | |       domain = MultimasterReplication.createNewDomain(domainConf);
| | | 
| | |       // check that all entries have been renamed
| | |       for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
| | |       {
| | |         addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
| | | 
| | |         Entry baseEntry = getEntry(DN.decode(addDn), 30000, true);
| | |         assertNotNull(baseEntry,
| | |           "The rename was not applied correctly on :" + addDn);
| | |       }
| | | 
| | |       // delete the entries to clean the database.
| | |       // The delete must carry the unique id the entry was created with
| | |       // (sequence+1); the rename above does not change that id.
| | |       for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
| | |       {
| | |         addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
| | |         DeleteMsg delMsg = new DeleteMsg(addDn,
| | |                                          gen.NewChangeNumber(),
| | |                                          stringUID(sequence + 1));
| | |         broker.publish(delMsg);
| | |       }
| | |     }
| | |     finally
| | |     {
| | |       // release whatever was successfully created, even when a check failed
| | |       if (replServer != null)
| | |         replServer.shutdown();
| | | 
| | |       if (domain != null)
| | |         MultimasterReplication.deleteDomain(baseDn);
| | | 
| | |       if (replicationPlugin != null)
| | |         DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
| | |     }
| | |   }
| | | |
| | | /** |
| | | * Builds and return a uuid from an integer. |
| | | * This methods assume that unique integers are used and does not make any |
| | | * unicity checks. It is only responsible for generating a uid with a |
| | | * correct syntax. |
| | | */ |
| | | private String stringUID(int i) |
| | | { |
| | | return String.format("11111111-1111-1111-1111-%012x", i); |
| | | } |
| | | |
| | | } |
| | |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // Disable schema check |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | DirectoryServer.setCheckSchema(false); |
| | | |
| | | baseDn = DN.decode("dc=example,dc=com"); |
| | | |
| | | updatedEntries = newLDIFEntries(); |
| | |
| | | |
| | | import java.net.ServerSocket; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.LDAPException; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.OperationType; |
| | | import org.opends.server.types.ResultCode; |
| | |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // Disable schema check |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | DirectoryServer.setCheckSchema(false); |
| | | |
| | | // Create an internal connection |
| | | connection = InternalClientConnection.getRootConnection(); |
| | | |
| | |
| | | configureReplication(); |
| | | } |
| | | |
| | |   /**
| | |    * Builds a list holding a single REPLACE modification that sets the
| | |    * given attribute to the given value.
| | |    *
| | |    * @param attrName  The name of the attribute to replace.
| | |    * @param attrValue The new value of the attribute.
| | |    * @return The list containing the replace modification.
| | |    */
| | |   private List<Modification> generatemods(String attrName, String attrValue)
| | |   {
| | |     // the attribute type is looked up by its lower-cased name
| | |     AttributeType type =
| | |       DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
| | | 
| | |     LinkedHashSet<AttributeValue> valueSet =
| | |       new LinkedHashSet<AttributeValue>();
| | |     valueSet.add(new AttributeValue(type, attrValue));
| | | 
| | |     List<Modification> result = new ArrayList<Modification>();
| | |     result.add(new Modification(ModificationType.REPLACE,
| | |                                 new Attribute(type, attrName, valueSet)));
| | |     return result;
| | |   }
| | | |
| | | private void processModify(int count) |
| | | { |
| | | while (count>0) |
| | |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import java.io.File; |
| | |
| | | import java.util.UUID; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.schema.DirectoryStringSyntax; |
| | | import org.opends.server.types.AttributeType; |
| | | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | |
| | | |
| | | configureReplication(); |
| | | |
| | | // Give some time to the replication to setup |
| | | // Give some time to the replication to setup |
| | | Thread.sleep(1000); |
| | | |
| | | |
| | | // Create a dummy entry |
| | | addEntry("dn: dc=dummy, dc=example,dc=com\n" |
| | | + "objectClass: top\n" + "objectClass: domain\n"); |
| | |
| | | String path = buildRoot + File.separator + "build" + |
| | | File.separator + "unit-tests" + File.separator + |
| | | "package"+ File.separator + "ReSynchTest"; |
| | | |
| | | |
| | | task("dn: ds-task-id=" + UUID.randomUUID() |
| | | + ",cn=Scheduled Tasks,cn=Tasks\n" |
| | | + "objectclass: top\n" |
| | |
| | | } |
| | | |
| | | |
| | |   /**
| | |    * Utility method to create, run a task and check its result.
| | |    * <p>
| | |    * The task entry is added through an internal root connection, then read
| | |    * back repeatedly until it carries a completion time. The wait is bounded
| | |    * to 30 seconds, whether the entry is missing from the search result or
| | |    * simply not completed yet.
| | |    *
| | |    * @param task The LDIF definition of the task entry.
| | |    * @throws Exception If building the entry or polling the task raises an
| | |    *                   exception.
| | |    */
| | |   private void task(String task) throws Exception
| | |   {
| | |     Entry taskEntry = TestCaseUtils.makeEntry(task);
| | | 
| | |     InternalClientConnection connection =
| | |          InternalClientConnection.getRootConnection();
| | | 
| | |     // Add the task.
| | |     AddOperation addOperation =
| | |          connection.processAdd(taskEntry.getDN(),
| | |                                taskEntry.getObjectClasses(),
| | |                                taskEntry.getUserAttributes(),
| | |                                taskEntry.getOperationalAttributes());
| | |     assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
| | |                  "Add of the task definition was not successful");
| | | 
| | |     // Wait until the task completes.
| | |     AttributeType completionTimeType = DirectoryServer.getAttributeType(
| | |          ATTR_TASK_COMPLETION_TIME.toLowerCase());
| | |     SearchFilter filter =
| | |          SearchFilter.createFilterFromString("(objectclass=*)");
| | |     Entry resultEntry = null;
| | |     String completionTime = null;
| | |     long startMillisecs = System.currentTimeMillis();
| | |     do
| | |     {
| | |       InternalSearchOperation searchOperation =
| | |            connection.processSearch(taskEntry.getDN(),
| | |                                     SearchScope.BASE_OBJECT,
| | |                                     filter);
| | |       Entry found = null;
| | |       try
| | |       {
| | |         found = searchOperation.getSearchEntries().getFirst();
| | |       } catch (Exception e)
| | |       {
| | |         // No entry returned by this search: fall through to the timeout
| | |         // check and the sleep below instead of spinning without limit.
| | |       }
| | | 
| | |       if (found != null)
| | |       {
| | |         resultEntry = found;
| | |         completionTime =
| | |              resultEntry.getAttributeValue(completionTimeType,
| | |                                            DirectoryStringSyntax.DECODER);
| | |       }
| | | 
| | |       if (completionTime == null)
| | |       {
| | |         if (System.currentTimeMillis() - startMillisecs > 1000*30)
| | |         {
| | |           break;
| | |         }
| | |         Thread.sleep(10);
| | |       }
| | |     } while (completionTime == null);
| | | 
| | |     if (completionTime == null)
| | |     {
| | |       fail("The task has not completed after 30 seconds.");
| | |     }
| | | 
| | |     // Check that the task state is as expected.
| | |     // (resultEntry is non-null here: completionTime is only ever set from it)
| | |     AttributeType taskStateType =
| | |          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
| | |     String stateString =
| | |          resultEntry.getAttributeValue(taskStateType,
| | |                                        DirectoryStringSyntax.DECODER);
| | |     TaskState taskState = TaskState.fromString(stateString);
| | |     assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
| | |                  "The task completed in an unexpected state");
| | |   }
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; |
| | | import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertNotNull; |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import java.net.SocketException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.List; |
| | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.ReplicationBroker; |
| | | import org.opends.server.replication.plugin.PersistentServerState; |
| | | import org.opends.server.schema.DirectoryStringSyntax; |
| | | import org.opends.server.schema.IntegerSyntax; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | |
| | | import org.opends.server.types.ByteStringFactory; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.opends.server.types.SearchResultEntry; |
| | | import org.opends.server.types.AttributeType; |
| | |
| | | protected Entry replServerEntry; |
| | | |
| | | /** |
| | | * schema check flag |
| | | */ |
| | | protected boolean schemaCheck; |
| | | |
| | | /** |
| | | * The replication plugin entry |
| | | */ |
| | | protected String synchroPluginStringDN = |
| | |
| | | { |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | |
| | | // Create an internal connection |
| | | connection = InternalClientConnection.getRootConnection(); |
| | |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | { |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1); |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "cleaning config entry " + dn, 1); |
| | | |
| | | |
| | | op = new DeleteOperation(connection, InternalClientConnection |
| | | .nextOperationID(), InternalClientConnection.nextMessageID(), null, |
| | | dn); |
| | |
| | | // done |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * suppress all the real entries created by the tests in this class |
| | | */ |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "ReplicationTestCase/Cleaning entries" , 1); |
| | | |
| | | |
| | | DeleteOperation op; |
| | | // Delete entries |
| | | try |
| | |
| | |   // Runs once after the tests of the class: puts back the schema-check
| | |   // setting held in schemaCheck, then calls the two cleaning helpers
| | |   // (cleanConfigEntries, then cleanRealEntries).
| | |   @AfterClass
| | |   public void classCleanUp() throws Exception
| | |   {
| | |     // restore the schema-check flag saved in schemaCheck
| | |     DirectoryServer.setCheckSchema(schemaCheck);
| | | 
| | |     cleanConfigEntries();
| | |     cleanRealEntries();
| | |   }
| | |
| | | assertNotNull(DirectoryServer.getConfigEntry(DN |
| | | .decode(synchroPluginStringDN)), |
| | | "Unable to add the Multimaster replication plugin"); |
| | | |
| | | |
| | | // domains container entry. |
| | | String domainsLdif = "dn: " |
| | | + "cn=domains," + synchroPluginStringDN + "\n" |
| | |
| | | assertNotNull(DirectoryServer.getConfigEntry( |
| | | DN.decode(synchroPluginStringDN)), |
| | | "Unable to add the Multimaster replication plugin"); |
| | | |
| | | |
| | | |
| | | // Add the replication server |
| | | DirectoryServer.getConfigHandler().addEntry(replServerEntry, null); |
| | |
| | | protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr, |
| | | String valueString, int timeout, boolean hasAttribute) throws Exception |
| | | { |
| | | boolean found; |
| | | boolean found = false; |
| | | int count = timeout/100; |
| | | if (count<1) |
| | | count=1; |
| | |
| | | newEntry = DirectoryServer.getEntry(dn); |
| | | |
| | | |
| | | if (newEntry == null) |
| | | fail("The entry " + dn + |
| | | " has incorrectly been deleted from the database."); |
| | | List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr); |
| | | Attribute tmpAttr = tmpAttrList.get(0); |
| | | if (newEntry != null) |
| | | { |
| | | List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr); |
| | | Attribute tmpAttr = tmpAttrList.get(0); |
| | | |
| | | AttributeType attrType = |
| | | DirectoryServer.getAttributeType(attrTypeStr, true); |
| | | found = tmpAttr.hasValue(new AttributeValue(attrType, valueString)); |
| | | AttributeType attrType = |
| | | DirectoryServer.getAttributeType(attrTypeStr, true); |
| | | found = tmpAttr.hasValue(new AttributeValue(attrType, valueString)); |
| | | } |
| | | |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | |
| | |   /**
| | |    * Builds a list holding a single REPLACE modification that sets the
| | |    * given attribute to the given value.
| | |    *
| | |    * @param attrName  The name of the attribute to replace.
| | |    * @param attrValue The new value of the attribute.
| | |    *
| | |    * @return The list containing the replace modification.
| | |    */
| | |   protected List<Modification> generatemods(String attrName, String attrValue)
| | |   {
| | |     // the attribute type is looked up by its lower-cased name
| | |     AttributeType type =
| | |       DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
| | | 
| | |     LinkedHashSet<AttributeValue> valueSet =
| | |       new LinkedHashSet<AttributeValue>();
| | |     valueSet.add(new AttributeValue(type, attrValue));
| | | 
| | |     List<Modification> result = new ArrayList<Modification>();
| | |     result.add(new Modification(ModificationType.REPLACE,
| | |                                 new Attribute(type, attrName, valueSet)));
| | |     return result;
| | |   }
| | | |
| | |   /**
| | |    * Utility method to create, run a task and check its result.
| | |    * <p>
| | |    * The task entry is added through an internal root connection, then read
| | |    * back repeatedly until it carries a completion time. The wait is bounded
| | |    * to 30 seconds, whether the entry is missing from the search result or
| | |    * simply not completed yet.
| | |    *
| | |    * @param task The LDIF definition of the task entry.
| | |    * @throws Exception If building the entry or polling the task raises an
| | |    *                   exception.
| | |    */
| | |   protected void task(String task) throws Exception
| | |   {
| | |     Entry taskEntry = TestCaseUtils.makeEntry(task);
| | | 
| | |     InternalClientConnection connection =
| | |          InternalClientConnection.getRootConnection();
| | | 
| | |     // Add the task.
| | |     AddOperation addOperation =
| | |          connection.processAdd(taskEntry.getDN(),
| | |                                taskEntry.getObjectClasses(),
| | |                                taskEntry.getUserAttributes(),
| | |                                taskEntry.getOperationalAttributes());
| | |     assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
| | |                  "Add of the task definition was not successful");
| | | 
| | |     // Wait until the task completes.
| | |     AttributeType completionTimeType = DirectoryServer.getAttributeType(
| | |          ATTR_TASK_COMPLETION_TIME.toLowerCase());
| | |     SearchFilter filter =
| | |          SearchFilter.createFilterFromString("(objectclass=*)");
| | |     Entry resultEntry = null;
| | |     String completionTime = null;
| | |     long startMillisecs = System.currentTimeMillis();
| | |     do
| | |     {
| | |       InternalSearchOperation searchOperation =
| | |            connection.processSearch(taskEntry.getDN(),
| | |                                     SearchScope.BASE_OBJECT,
| | |                                     filter);
| | |       Entry found = null;
| | |       try
| | |       {
| | |         found = searchOperation.getSearchEntries().getFirst();
| | |       } catch (Exception e)
| | |       {
| | |         // No entry returned by this search: fall through to the timeout
| | |         // check and the sleep below instead of spinning without limit.
| | |       }
| | | 
| | |       if (found != null)
| | |       {
| | |         resultEntry = found;
| | |         completionTime =
| | |              resultEntry.getAttributeValue(completionTimeType,
| | |                                            DirectoryStringSyntax.DECODER);
| | |       }
| | | 
| | |       if (completionTime == null)
| | |       {
| | |         if (System.currentTimeMillis() - startMillisecs > 1000*30)
| | |         {
| | |           break;
| | |         }
| | |         Thread.sleep(10);
| | |       }
| | |     } while (completionTime == null);
| | | 
| | |     if (completionTime == null)
| | |     {
| | |       fail("The task has not completed after 30 seconds.");
| | |     }
| | | 
| | |     // Check that the task state is as expected.
| | |     // (resultEntry is non-null here: completionTime is only ever set from it)
| | |     AttributeType taskStateType =
| | |          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
| | |     String stateString =
| | |          resultEntry.getAttributeValue(taskStateType,
| | |                                        DirectoryStringSyntax.DECODER);
| | |     TaskState taskState = TaskState.fromString(stateString);
| | |     assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
| | |                  "The task completed in an unexpected state");
| | |   }
| | | |
| | | } |
| | |
| | | { |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | |
| | | import static org.testng.Assert.fail; |
| | | |
| | | import java.net.ServerSocket; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | |
| | |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.OperationType; |
| | | import org.opends.server.types.ResultCode; |
| | |
| | | // Create an internal connection |
| | | connection = InternalClientConnection.getRootConnection(); |
| | | |
| | | // Disable schema check |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | DirectoryServer.setCheckSchema(false); |
| | | |
| | | // Create backend top level entries |
| | | String[] topEntries = new String[2]; |
| | | topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n" |
| | |
| | | configureReplication(); |
| | | } |
| | | |
| | |   /**
| | |    * Builds a list holding a single REPLACE modification that sets the
| | |    * given attribute to the given value.
| | |    *
| | |    * @param attrName  The name of the attribute to replace.
| | |    * @param attrValue The new value of the attribute.
| | |    * @return The list containing the replace modification.
| | |    */
| | |   private List<Modification> generatemods(String attrName, String attrValue)
| | |   {
| | |     // the attribute type is looked up by its lower-cased name
| | |     AttributeType type =
| | |       DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
| | | 
| | |     LinkedHashSet<AttributeValue> valueSet =
| | |       new LinkedHashSet<AttributeValue>();
| | |     valueSet.add(new AttributeValue(type, attrValue));
| | | 
| | |     List<Modification> result = new ArrayList<Modification>();
| | |     result.add(new Modification(ModificationType.REPLACE,
| | |                                 new Attribute(type, attrName, valueSet)));
| | |     return result;
| | |   }
| | | |
| | | private class BrokerWriter extends Thread |
| | | { |
| | | int count; |
| | |
| | | // This test suite depends on having the schema available. |
| | | TestCaseUtils.startServer(); |
| | | |
| | | // Disable schema check |
| | | schemaCheck = DirectoryServer.checkSchema(); |
| | | DirectoryServer.setCheckSchema(false); |
| | | |
| | | // Create an internal connection |
| | | connection = InternalClientConnection.getRootConnection(); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | |   private List<Modification> generatemods(String attrName, String attrValue)
| | |   {
| | |     // Builds a one-element list: a REPLACE modification setting attrName
| | |     // to attrValue. The attribute type is looked up by its lower-cased name.
| | |     AttributeType type =
| | |       DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
| | | 
| | |     LinkedHashSet<AttributeValue> valueSet =
| | |       new LinkedHashSet<AttributeValue>();
| | |     valueSet.add(new AttributeValue(type, attrValue));
| | | 
| | |     List<Modification> result = new ArrayList<Modification>();
| | |     result.add(new Modification(ModificationType.REPLACE,
| | |                                 new Attribute(type, attrName, valueSet)));
| | |     return result;
| | |   }
| | | |
| | | /** |
| | | * Get the entryUUID for a given DN. |
| | | * |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.SortedSet; |
| | | |
| | | import org.opends.server.admin.ManagedObjectDefinition; |
| | | import org.opends.server.admin.PropertyProvider; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.client.MultimasterDomainCfgClient; |
| | | import org.opends.server.admin.std.server.MultimasterDomainCfg; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |  * This class implements a configuration object for the MultimasterDomain
| | |  * that can be used in unit tests to instantiate a ReplicationDomain.
| | | */ |
| | | public class DomainFakeCfg implements MultimasterDomainCfg |
| | | { |
| | | private DN baseDn; |
| | | private int serverId; |
| | | private SortedSet<String> replicationServers; |
| | | private long heartbeatInterval = 1000; |
| | | |
| | | /** |
| | | * Creates a new Domain with the provided information |
| | | */ |
| | | public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverId = serverId; |
| | | this.replicationServers = replServers; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void addChangeListener( |
| | | ConfigurationChangeListener<MultimasterDomainCfg> listener) |
| | | { |
| | | |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public ManagedObjectDefinition<? extends MultimasterDomainCfgClient, |
| | | ? extends MultimasterDomainCfg> definition() |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getHeartbeatInterval() |
| | | { |
| | | return heartbeatInterval ; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getMaxReceiveDelay() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getMaxReceiveQueue() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getMaxSendDelay() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getMaxSendQueue() |
| | | { |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public DN getReplicationDN() |
| | | { |
| | | return baseDn; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public SortedSet<String> getReplicationServer() |
| | | { |
| | | return replicationServers; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int getWindowSize() |
| | | { |
| | | return 100; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void removeChangeListener( |
| | | ConfigurationChangeListener<MultimasterDomainCfg> listener) |
| | | { |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public DN dn() |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public PropertyProvider properties() |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Set the heartbeat interval. |
| | | * |
| | | * @param interval |
| | | */ |
| | | public void setHeartbeatInterval(long interval) |
| | | { |
| | | heartbeatInterval = interval; |
| | | } |
| | | |
| | | } |
| | |
| | | /** |
| | | * The port of the replicationServer. |
| | | */ |
| | | private int changelogPort; |
| | | private int replicationServerPort; |
| | | |
| | | private ChangeNumber firstChangeNumberServer1 = null; |
| | | private ChangeNumber secondChangeNumberServer1 = null; |
| | |
| | | |
| | | // find a free port for the replicationServer |
| | | ServerSocket socket = TestCaseUtils.bindFreePort(); |
| | | changelogPort = socket.getLocalPort(); |
| | | replicationServerPort = socket.getLocalPort(); |
| | | socket.close(); |
| | | |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null); |
| | | new ReplServerFakeConfiguration(replicationServerPort, null, 0, 1, 0, 0, null); |
| | | replicationServer = new ReplicationServer(conf); |
| | | } |
| | | |
| | |
| | | * Open a sender session and a receiver session to the replicationServer |
| | | */ |
| | | server1 = openReplicationSession( |
| | | DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort, |
| | | DN.decode("dc=example,dc=com"), (short) 1, 100, replicationServerPort, |
| | | 1000, true); |
| | | server2 = openReplicationSession( |
| | | DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort, |
| | | DN.decode("dc=example,dc=com"), (short) 2, 100, replicationServerPort, |
| | | 1000, true); |
| | | |
| | | /* |
| | |
| | | try { |
| | | broker = |
| | | openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3, |
| | | 100, changelogPort, 1000, false); |
| | | 100, replicationServerPort, 1000, false); |
| | | |
| | | ReplicationMessage msg2 = broker.receive(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | |
| | | try { |
| | | broker = |
| | | openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3, |
| | | 100, changelogPort, 1000, state); |
| | | 100, replicationServerPort, 1000, state); |
| | | |
| | | ReplicationMessage msg2 = broker.receive(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | |
| | | * Open a sender session |
| | | */ |
| | | server = openReplicationSession( |
| | | DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort, |
| | | DN.decode("dc=example,dc=com"), (short) 5, 100, replicationServerPort, |
| | | 1000, 1000, 0, true); |
| | | |
| | | BrokerReader reader = new BrokerReader(server); |
| | |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | { |
| | | clientBroker[i] = openReplicationSession( |
| | | DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort, |
| | | DN.decode("dc=example,dc=com"), (short) (100+i), 100, replicationServerPort, |
| | | 1000, true); |
| | | client[i] = new BrokerReader(clientBroker[i]); |
| | | } |
| | |
| | | new ChangeNumberGenerator(serverId , (long) 0); |
| | | ReplicationBroker broker = |
| | | openReplicationSession( DN.decode("dc=example,dc=com"), serverId, |
| | | 100, changelogPort, 1000, 1000, 0, true); |
| | | 100, replicationServerPort, 1000, 1000, 0, true); |
| | | |
| | | producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS); |
| | | reader[i] = new BrokerReader(broker); |