From 88e5620001d65afa8d0d8e07d1361fa44705743e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 11 May 2007 13:19:28 +0000
Subject: [PATCH] This code allows the replication code to replay operation in the correct order when operation have dependencies (like adding child entry after parent)
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java | 26
opends/src/server/org/opends/server/replication/protocol/AddMsg.java | 5
opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java | 2
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 150 ++-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java | 183 ++++
opends/src/server/org/opends/server/replication/server/DbHandler.java | 9
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java | 84 ++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java | 17
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java | 553 +++++++++++++
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 559 +++++++++++++
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 1
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 140 ++
opends/src/server/org/opends/server/replication/plugin/PendingChange.java | 71 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java | 25
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java | 1
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java | 86 --
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 20
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 428 +++++-----
opends/src/server/org/opends/server/replication/common/ServerState.java | 21
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 4
20 files changed, 1,900 insertions(+), 485 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index b7e64d0..38fd50d 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -304,4 +304,25 @@
{
return list.keySet().iterator();
}
+
+ /**
+ * Check that all the ChangeNumbers in the covered serverState are also in
+ * this serverState.
+ *
+ * @param covered The ServerState that needs to be checked.
+ * @return A boolean indicating if this ServerState covers the ServerState
+ * given in parameter.
+ */
+ public boolean cover(ServerState covered)
+ {
+ for (ChangeNumber coveredChange : covered.list.values())
+ {
+ ChangeNumber change = this.list.get(coveredChange.getServerId());
+ if ((change == null) || (change.older(coveredChange)))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 6313387..27a2466 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -82,6 +82,68 @@
/**
+ * Finds the domain for a given DN.
+ *
+ * @param dn The DN for which the domain must be returned.
+ * @param op An optional operation for which the check is done.
+ * Can be null is the request has no associated operation.
+ * @return The domain for this DN.
+ */
+ public static ReplicationDomain findDomain(DN dn, Operation op)
+ {
+ /*
+ * Don't run the special replication code on Operation that are
+ * specifically marked as don't synchronize.
+ */
+ if ((op != null) && op.dontSynchronize())
+ return null;
+
+ ReplicationDomain domain = null;
+ DN temp = dn;
+ do
+ {
+ domain = domains.get(temp);
+ temp = temp.getParentDNInSuffix();
+ if (temp == null)
+ {
+ break;
+ }
+ } while (domain == null);
+
+ return domain;
+ }
+
+ /**
+ * Creates a new domain from its configEntry, do the
+ * necessary initialization and starts it so that it is
+ * fully operational when this method returns.
+ * @param configuration The entry whith the configuration of this domain.
+ * @return The domain created.
+ * @throws ConfigException When the configuration is not valid.
+ */
+ public static ReplicationDomain createNewDomain(
+ MultimasterDomainCfg configuration)
+ throws ConfigException
+ {
+ ReplicationDomain domain;
+ domain = new ReplicationDomain(configuration);
+ domains.put(domain.getBaseDN(), domain);
+ domain.start();
+ return domain;
+ }
+
+ /**
+ * Deletes a domain.
+ * @param dn : the base DN of the domain to delete.
+ */
+ public static void deleteDomain(DN dn)
+ {
+ ReplicationDomain domain = domains.remove(dn);
+ if (domain != null)
+ domain.shutdown();
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
@@ -149,23 +211,6 @@
}
/**
- * Creates a new domain from its configEntry, do the
- * necessary initialization and starts it so that it is
- * fully operational when this method returns.
- * @param configuration The entry whith the configuration of this domain.
- * @throws ConfigException When the configuration is not valid.
- */
- private void createNewDomain(
- MultimasterDomainCfg configuration)
- throws ConfigException
- {
- ReplicationDomain domain;
- domain = new ReplicationDomain(configuration);
- domains.put(domain.getBaseDN(), domain);
- domain.start();
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -354,55 +399,6 @@
}
/**
- * Finds the domain for a given DN.
- *
- * @param dn The DN for which the domain must be returned.
- * @param op An optional operation for which the check is done.
- * Can be null is the request has no associated operation.
- * @return The domain for this DN.
- */
- public static ReplicationDomain findDomain(DN dn, Operation op)
- {
- /*
- * Don't run the special replication code on Operation that are
- * specifically marked as don't synchronize.
- */
- if ((op != null) && op.dontSynchronize())
- return null;
-
- ReplicationDomain domain = null;
- DN temp = dn;
- do
- {
- domain = domains.get(temp);
- temp = temp.getParentDNInSuffix();
- if (temp == null)
- {
- break;
- }
- } while (domain == null);
-
- return domain;
- }
-
- /**
- * Generic code for all the postOperation entry point.
- *
- * @param operation The Operation for which the post-operation is called.
- * @param dn The Dn for which the post-operation is called.
- */
- private void genericPostOperation(Operation operation, DN dn)
- {
- ReplicationDomain domain = findDomain(dn, operation);
- if (domain == null)
- return;
-
- domain.synchronize(operation);
-
- return;
- }
-
- /**
* This method is called whenever the server detects a modification
* of the schema done by directly modifying the backing files
* of the schema backend.
@@ -535,10 +531,7 @@
public ConfigChangeResult applyConfigurationDelete(
MultimasterDomainCfg configuration)
{
- DN dn = configuration.getReplicationDN();
- ReplicationDomain domain = domains.remove(dn);
- if (domain != null)
- domain.shutdown();
+ deleteDomain(configuration.getReplicationDN());
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
@@ -551,6 +544,23 @@
{
return true;
}
+
+ /**
+ * Generic code for all the postOperation entry point.
+ *
+ * @param operation The Operation for which the post-operation is called.
+ * @param dn The Dn for which the post-operation is called.
+ */
+ private void genericPostOperation(Operation operation, DN dn)
+ {
+ ReplicationDomain domain = findDomain(dn, operation);
+ if (domain == null)
+ return;
+
+ domain.synchronize(operation);
+
+ return;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChange.java b/opends/src/server/org/opends/server/replication/plugin/PendingChange.java
index bb42a60..a73abe0 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChange.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -27,19 +27,24 @@
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Operation;
/**
- * This class is use to store the list of operations currently
+ * This class is use to store an operation currently
* in progress and not yet committed in the database.
*/
-public class PendingChange
+public class PendingChange implements Comparable<PendingChange>
{
private ChangeNumber changeNumber;
private boolean committed;
private UpdateMessage msg;
private Operation op;
+ private ServerState dependencyState = null;
+ private DN targetDN = null;
/**
* Construct a new PendingChange.
@@ -121,4 +126,66 @@
this.op = op;
}
+ /**
+ * Add the given ChangeNumber in the list of dependencies of this
+ * PendingChange.
+ *
+ * @param changeNumber The ChangeNumber to add in the list of dependencies
+ * of this PendingChange.
+ */
+ public void addDependency(ChangeNumber changeNumber)
+ {
+ if (dependencyState == null)
+ {
+ dependencyState = new ServerState();
+ }
+ dependencyState.update(changeNumber);
+ }
+
+ /**
+ * Check if the given ServerState covers the dependencies of this
+ * PendingChange.
+ *
+ * @param state The ServerState for which dependencies must be checked,
+ *
+ * @return A boolean indicating if the given ServerState covers the
+ * dependencies of this PendingChange.
+ */
+ public boolean dependenciesIsCovered(ServerState state)
+ {
+ return state.cover(dependencyState);
+ }
+
+ /**
+ * Get the Target DN of this message.
+ *
+ * @return The target DN of this message.
+ */
+ public DN getTargetDN()
+ {
+ synchronized (this)
+ {
+ if (targetDN != null)
+ return targetDN;
+ else
+ {
+ try
+ {
+ targetDN = DN.decode(msg.getDn());
+ }
+ catch (DirectoryException e)
+ {
+ }
+ }
+ return targetDN;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int compareTo(PendingChange o)
+ {
+ return this.getChangeNumber().compareTo(o.getChangeNumber());
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
new file mode 100644
index 0000000..8f5f295
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -0,0 +1,559 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.OperationContext;
+import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+import org.opends.server.types.Operation;
+
+/**
+ *
+ * This class is use to store the list of operations currently
+ * in progress and not yet committed in the database.
+ */
+public class PendingChanges
+{
+ /**
+ * A map used to store the pending changes.
+ */
+ private SortedMap<ChangeNumber, PendingChange> pendingChanges =
+ new TreeMap<ChangeNumber, PendingChange>();
+
+ /**
+ * A sorted set containing the list of PendingChanges that have
+ * not been replayed correctly because they are dependent on
+ * another change to be completed.
+ */
+ private SortedSet<PendingChange> dependentChanges =
+ new TreeSet<PendingChange>();
+
+ /**
+ * The ChangeNumberGenerator to use to create new unique ChangeNumbers
+ * for each operation done on the replication domain.
+ */
+ private ChangeNumberGenerator changeNumberGenerator;
+
+ /**
+ * The Replicationbroker that will be used to send UpdateMessage.
+ */
+ private ReplicationBroker broker;
+
+ /**
+ * The ServerState that will be updated when UpdateMessage are committed.
+ */
+ private ServerState state;
+
+ /**
+ * Creates a new PendingChanges using the provided ChangeNumberGenerator.
+ *
+ * @param changeNumberGenerator The ChangeNumberGenerator to use to create
+ * new unique ChangeNumbers.
+ * @param broker The Replicationbroker that will be used to send
+ * UpdateMessage.
+ * @param state The ServerState that will be updated when UpdateMessage
+ * are committed.
+ */
+ public PendingChanges(
+ ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
+ ServerState state)
+ {
+ this.changeNumberGenerator = changeNumberGenerator;
+ this.broker = broker;
+ this.state = state;
+ }
+
+ /**
+ * Remove and return an update form the pending changes list.
+ *
+ * @param changeNumber The ChangeNumber of the update to remove.
+ *
+ * @return The UpdateMessage that was just removed.
+ */
+ public synchronized UpdateMessage remove(ChangeNumber changeNumber)
+ {
+ return pendingChanges.remove(changeNumber).getMsg();
+ }
+
+ /**
+ * Returns the number of update currently in the list.
+ *
+ * @return The number of update currently in the list.
+ */
+ public synchronized int size()
+ {
+ return pendingChanges.size();
+ }
+
+ /**
+ * Mark an update message as committed.
+ *
+ * @param changeNumber The ChangeNumber of the update message that must be
+ * set as committed.
+ * @param op The Operation associated of the update when the
+ * update is a local update.
+ * @param msg The message associated to the update when the update
+ * was received from a replication server.
+ */
+ public synchronized void commit(ChangeNumber changeNumber,
+ Operation op, UpdateMessage msg)
+ {
+ PendingChange curChange = pendingChanges.get(changeNumber);
+ if (curChange == null)
+ {
+ throw new NoSuchElementException();
+ }
+ curChange.setCommitted(true);
+
+ if (op.isSynchronizationOperation())
+ curChange.setOp(op);
+ else
+ curChange.setMsg(msg);
+ }
+
+ /**
+ * Mark an update message as committed.
+ *
+ * @param changeNumber The ChangeNumber of the update message that must be
+ * set as committed.
+ */
+ public synchronized void commit(ChangeNumber changeNumber)
+ {
+ PendingChange curChange = pendingChanges.get(changeNumber);
+ if (curChange == null)
+ {
+ throw new NoSuchElementException();
+ }
+ curChange.setCommitted(true);
+ }
+
+ /**
+ * Add a new UpdateMessage to the pending list from the provided local
+ * operation.
+ *
+ * @param operation The local operation for which an UpdateMessage mus
+ * be added in the pending list.
+ * @return The ChangeNumber now associated to the operation.
+ */
+ public synchronized ChangeNumber putLocalOperation(Operation operation)
+ {
+ ChangeNumber changeNumber;
+
+ changeNumber = changeNumberGenerator.NewChangeNumber();
+ PendingChange change = new PendingChange(changeNumber, operation, null);
+ pendingChanges.put(changeNumber, change);
+ return changeNumber;
+
+ }
+
+ /**
+ * Add a new UpdateMessage that was received from the replication server
+ * to the pendingList.
+ *
+ * @param update The UpdateMessage that was received from the replication
+ * server and that will be added to the pending list.
+ */
+ public synchronized void putRemoteUpdate(UpdateMessage update)
+ {
+ ChangeNumber changeNumber = update.getChangeNumber();
+ changeNumberGenerator.adjust(changeNumber);
+ pendingChanges.put(changeNumber, new PendingChange(changeNumber, null,
+ update));
+ }
+
+ /**
+ * Push all committed local changes to the replicationServer service.
+ *
+ * @return The number of pushed updates.
+ */
+ public synchronized int pushCommittedChanges()
+ {
+ int numSentUpdates = 0;
+ if (pendingChanges.isEmpty())
+ return numSentUpdates;
+
+ ChangeNumber firstChangeNumber = pendingChanges.firstKey();
+ PendingChange firstChange = pendingChanges.get(firstChangeNumber);
+
+ while ((firstChange != null) && firstChange.isCommitted())
+ {
+ if ((firstChange.getOp() != null ) &&
+ (firstChange.getOp().isSynchronizationOperation() == false))
+ {
+ numSentUpdates++;
+ broker.publish(firstChange.getMsg());
+ }
+ state.update(firstChangeNumber);
+ pendingChanges.remove(firstChangeNumber);
+
+ if (pendingChanges.isEmpty())
+ {
+ firstChange = null;
+ }
+ else
+ {
+ firstChangeNumber = pendingChanges.firstKey();
+ firstChange = pendingChanges.get(firstChangeNumber);
+ }
+ }
+ return numSentUpdates;
+ }
+
+ /**
+ * Get the first update in the list that have some dependencies cleared.
+ *
+ * @return The UpdateMessage to be handled.
+ */
+ public synchronized UpdateMessage getNextUpdate()
+ {
+ /*
+ * Parse the list of Update with dependencies and check if the dependencies
+ * are now cleared until an Update withour dependencies is found.
+ */
+ for (PendingChange change : dependentChanges)
+ {
+ if (change.dependenciesIsCovered(state))
+ {
+ dependentChanges.remove(change);
+ return change.getMsg();
+ }
+ }
+ return null;
+ }
+
+
+ /**
+ * Check if the given AddOperation has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ * AddOperation depends on
+ *
+ * - DeleteOperation done on the same DN
+ * - ModifyDnOperation with the same target DN as the ADD DN
+ * - ModifyDnOperation with new DN equals to the ADD DN parent
+ * - AddOperation done on the parent DN of the ADD DN
+ *
+ * @param op The AddOperation to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(AddOperation op)
+ {
+ boolean hasDependencies = false;
+ DN targetDn = op.getEntryDN();
+ ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check is the operation to be run is a deleteOperation on the
+ * same DN.
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof AddMsg)
+ {
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current AddOperation.
+ */
+ if (pendingChange.getTargetDN().isAncestorOf(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ /*
+ * Check if the operation to be run is ModifyDnOperation with
+ * the same target DN as the ADD DN
+ * or a ModifyDnOperation with new DN equals to the ADD DN parent
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ else
+ {
+ ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
+ if (pendingModDn.newDNIsParent(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+
+ /**
+ * Check if the given ModifyOperation has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ *
+ * ModifyOperation depends on
+ * - AddOperation done on the same DN
+ *
+ * @param op The ModifyOperation to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(ModifyOperation op)
+ {
+ boolean hasDependencies = false;
+ DN targetDn = op.getEntryDN();
+ ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof AddMsg)
+ {
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * same DN.
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+
+ /**
+ * Mark the first pendingChange as dependent on the second PendingChange.
+ * @param dependentChange The PendingChange that depend on the second
+ * PendingChange.
+ * @param pendingChange The PendingChange on which the first PendingChange
+ * is dependent.
+ */
+ private void addDependency(
+ PendingChange dependentChange, PendingChange pendingChange)
+ {
+ dependentChange.addDependency(pendingChange.getChangeNumber());
+ dependentChanges.add(dependentChange);
+ }
+
+ /**
+ * Check if the given ModifyDNMsg has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ *
+ * Modify DN Operation depends on
+ * - AddOperation done on the same DN as the target DN of the MODDN operation
+ * - AddOperation done on the new parent of the MODDN operation
+ * - DeleteOperation done on the new DN of the MODDN operation
+ * - ModifyDNOperation done from the new DN of the MODDN operation
+ *
+ * @param msg The ModifyDNMsg to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(ModifyDNMsg msg)
+ {
+ boolean hasDependencies = false;
+ ChangeNumber changeNumber = msg.getChangeNumber();
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ DN targetDn = change.getTargetDN();
+
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ // Check if the target of the Delete is the same
+ // as the new DN of this ModifyDN
+ if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof AddMsg)
+ {
+ // Check if the Add Operation was done on the new parent of
+ // the MODDN operation
+ if (msg.newParentIsEqual(pendingChange.getTargetDN()))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ // Check if the AddOperation was done on the same DN as the
+ // target DN of the MODDN operation
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ // Check if the ModifyDNOperation was done from the new DN of
+ // the MODDN operation
+ if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+
+ /**
+ * Check if the given DeleteOperation has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ *
+ * DeleteOperation depends on
+ * - DeleteOperation done on children DN
+ * - ModifyDnOperation with target DN that are children of the DEL DN
+ * - AddOperation done on the same DN
+ *
+ *
+ * @param op The DeleteOperation to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(DeleteOperation op)
+ {
+ boolean hasDependencies = false;
+ DN targetDn = op.getEntryDN();
+ ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check if the operation to be run is a deleteOperation on a
+ * children of the current DeleteOperation.
+ */
+ if (pendingChange.getTargetDN().isDescendantOf(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof AddMsg)
+ {
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current DeleteOperation.
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ /*
+ * Check if the operation to be run is an ModifyDNOperation
+ * on a children of the current DeleteOperation
+ */
+ if (pendingChange.getTargetDN().isDescendantOf(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index e0a9cb7..93d7ba1 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -128,17 +129,14 @@
{
private ReplicationMonitor monitor;
- private ChangeNumberGenerator changeNumberGenerator;
private ReplicationBroker broker;
private List<ListenerThread> synchroThreads =
new ArrayList<ListenerThread>();
- private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
- new TreeMap<ChangeNumber, PendingChange>();
private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
new TreeMap<ChangeNumber, UpdateMessage>();
- private int numRcvdUpdates = 0;
- private int numSentUpdates = 0;
+ private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
+ private AtomicInteger numSentUpdates = new AtomicInteger(0);
private AtomicInteger numProcessedUpdates = new AtomicInteger();
private int debugCount = 0;
private PersistentServerState state;
@@ -150,6 +148,19 @@
private int maxSendDelay = 0;
/**
+ * This object is used to store the list of update currently being
+ * done on the local database.
+ * It contain both the update that are done directly on this server
+ * and the updates that was done on another server, transmitted
+ * by the replication server and that are currently replayed.
+ * It is usefull to make sure that dependencies between operations
+ * are correctly fullfilled, that the local operations are sent in a
+ * correct order to the replication server and that the ServerState
+ * is not updated too early.
+ */
+ private PendingChanges pendingChanges;
+
+ /**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
@@ -245,7 +256,7 @@
private ConfigEntry backendConfigEntry;
private List<DN> branches = new ArrayList<DN>(0);
- private int listenerThreadNumber = 1;
+ private int listenerThreadNumber = 10;
private boolean receiveStatus = true;
private Collection<String> replicationServers;
@@ -317,12 +328,6 @@
DirectoryServer.registerMonitorProvider(monitor);
/*
- * ChangeNumberGenerator is used to create new unique ChangeNumbers
- * for each operation done on the replication domain.
- */
- changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
-
- /*
* create the broker object used to publish and receive changes
*/
try
@@ -348,6 +353,14 @@
*/
}
+ /*
+ * ChangeNumberGenerator is used to create new unique ChangeNumbers
+ * for each operation done on the replication domain.
+ */
+ pendingChanges =
+ new PendingChanges(new ChangeNumberGenerator(serverId, state),
+ broker, state);
+
// listen for changes on the configuration
configuration.addChangeListener(this);
}
@@ -444,23 +457,37 @@
* of the parent entry
*/
- // There is a potential of perfs improvement here
- // if we could avoid the following parent entry retrieval
- DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
-
- if (parentDnFromCtx != null)
+ String parentUid = ctx.getParentUid();
+ // root entry have no parent,
+ // there is no need to check for it.
+ if (parentUid != null)
{
- DN entryDN = addOperation.getEntryDN();
- DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
- if ((parentDnFromEntryDn != null)
- && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
+ // There is a potential of perfs improvement here
+ // if we could avoid the following parent entry retrieval
+ DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
+
+ if (parentDnFromCtx == null)
{
- // parentEntry has been renamed
- // replication name conflict resolution is expected to fix that
- // later in the flow
+ // The parent does not exist with the specified unique id
+ // stop the operation with NO_SUCH_OBJECT and let the
+ // conflict resolution or the dependency resolution solve this.
addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
return new SynchronizationProviderResult(false);
}
+ else
+ {
+ DN entryDN = addOperation.getEntryDN();
+ DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
+ if ((parentDnFromEntryDn != null)
+ && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
+ {
+ // parentEntry has been renamed
+ // replication name conflict resolution is expected to fix that
+ // later in the flow
+ addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ }
}
}
return new SynchronizationProviderResult(true);
@@ -633,89 +660,96 @@
*/
public UpdateMessage receive()
{
- synchronized (broker)
+ UpdateMessage update = pendingChanges.getNextUpdate();
+
+ if (update == null)
{
- UpdateMessage update = null;
- while (update == null)
+ synchronized (broker)
{
- ReplicationMessage msg;
- try
+ while (update == null)
{
- msg = broker.receive();
- if (msg == null)
+ ReplicationMessage msg;
+ try
{
- // The server is in the shutdown process
- return null;
- }
- log("Broker received message :" + msg);
- if (msg instanceof AckMessage)
- {
- AckMessage ack = (AckMessage) msg;
- receiveAck(ack);
- }
- else if (msg instanceof UpdateMessage)
- {
- update = (UpdateMessage) msg;
- receiveUpdate(update);
- }
- else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
- InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
- try
+ msg = broker.receive();
+ if (msg == null)
{
- initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
- null);
+ // The server is in the shutdown process
+ return null;
}
- catch(DirectoryException de)
+ log("Broker received message :" + msg);
+ if (msg instanceof AckMessage)
{
- // Returns an error message to notify the sender
- int msgID = de.getMessageID();
- ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
- msgID, de.getMessage());
- broker.publish(errorMsg);
+ AckMessage ack = (AckMessage) msg;
+ receiveAck(ack);
}
- }
- else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ // Another server requests us to provide entries
+ // for a total update
+ InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
+ try
+ {
+ initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ null);
+ }
+ catch(DirectoryException de)
+ {
+ // Returns an error message to notify the sender
+ int msgID = de.getMessageID();
+ ErrorMessage errorMsg =
+ new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
- try
- {
- importBackend(initMsg);
+ try
+ {
+ importBackend(initMsg);
+ }
+ catch(DirectoryException de)
+ {
+ // Return an error message to notify the sender
+ int msgID = de.getMessageID();
+ ErrorMessage errorMsg =
+ new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ log(getMessage(msgID,
+ backend.getBackendID()) + de.getMessage());
+ broker.publish(errorMsg);
+ }
}
- catch(DirectoryException de)
+ else if (msg instanceof ErrorMessage)
{
- // Return an error message to notify the sender
- int msgID = de.getMessageID();
- ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
- msgID, de.getMessage());
- log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
- broker.publish(errorMsg);
+ if (ieContext != null)
+ {
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // replicationServer did not find any import source.
+ abandonImportExport((ErrorMessage)msg);
+ }
+ }
+ else if (msg instanceof UpdateMessage)
+ {
+ update = (UpdateMessage) msg;
+ receiveUpdate(update);
}
}
- else if (msg instanceof ErrorMessage)
+ catch (SocketTimeoutException e)
{
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
- abandonImportExport((ErrorMessage)msg);
- }
+ // just retry
}
- } catch (SocketTimeoutException e)
- {
- // just retry
}
-
}
- return update;
}
+ return update;
}
/**
@@ -725,21 +759,8 @@
*/
public void receiveUpdate(UpdateMessage update)
{
- ChangeNumber changeNumber = update.getChangeNumber();
-
- synchronized (pendingChanges)
- {
- if (pendingChanges.containsKey(changeNumber))
- {
- /*
- * This should never happen,
- * TODO log error and throw exception
- */
- }
- pendingChanges.put(changeNumber,
- new PendingChange(changeNumber, null, update));
- numRcvdUpdates++;
- }
+ pendingChanges.putRemoteUpdate(update);
+ numRcvdUpdates.incrementAndGet();
}
/**
@@ -752,10 +773,9 @@
UpdateMessage update;
ChangeNumber changeNumber = ack.getChangeNumber();
- synchronized (pendingChanges)
+ synchronized (waitingAckMsgs)
{
- update = waitingAckMsgs.get(changeNumber);
- waitingAckMsgs.remove(changeNumber);
+ update = waitingAckMsgs.remove(changeNumber);
}
if (update != null)
{
@@ -798,61 +818,55 @@
* This is an operation type that we do not know about
* It should never happen.
*/
- synchronized (pendingChanges)
- {
- pendingChanges.remove(curChangeNumber);
- int msgID = MSGID_UNKNOWN_TYPE;
- String message = getMessage(msgID, op.getOperationType().toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- return;
- }
+ pendingChanges.remove(curChangeNumber);
+ int msgID = MSGID_UNKNOWN_TYPE;
+ String message = getMessage(msgID, op.getOperationType().toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
}
}
- synchronized(pendingChanges)
+ if (result == ResultCode.SUCCESS)
{
- if (result == ResultCode.SUCCESS)
+ try
{
- PendingChange curChange = pendingChanges.get(curChangeNumber);
- if (curChange == null)
- {
- // This should never happen
- int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
- String message = getMessage(msgID, curChangeNumber.toString(),
- op.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- return;
- }
- curChange.setCommitted(true);
+ pendingChanges.commit(curChangeNumber, op, msg);
+ }
+ catch (NoSuchElementException e)
+ {
+ int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
+ String message = getMessage(msgID, curChangeNumber.toString(),
+ op.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
+ }
- if (op.isSynchronizationOperation())
- curChange.setOp(op);
- else
- curChange.setMsg(msg);
-
- if (msg != null && isAssured)
+ if (msg != null && isAssured)
+ {
+ synchronized (waitingAckMsgs)
{
- // Add the assured message to the list of those whose acknowledgements
- // we are awaiting.
+ // Add the assured message to the list of update that are
+ // waiting acknowledgements
waitingAckMsgs.put(curChangeNumber, msg);
}
}
- else if (!op.isSynchronizationOperation())
- {
- // Remove an unsuccessful non-replication operation from the pending
- // changes list.
- if (curChangeNumber != null)
- {
- pendingChanges.remove(curChangeNumber);
- }
- }
-
- pushCommittedChanges();
}
+ else if (!op.isSynchronizationOperation())
+ {
+ // Remove an unsuccessful non-replication operation from the pending
+ // changes list.
+ if (curChangeNumber != null)
+ {
+ pendingChanges.remove(curChangeNumber);
+ }
+ }
+
+ int pushedChanges = pendingChanges.pushCommittedChanges();
+ numSentUpdates.addAndGet(pushedChanges);
// Wait for acknowledgement of an assured message.
if (msg != null && isAssured)
@@ -880,7 +894,7 @@
*/
public int getNumRcvdUpdates()
{
- return numRcvdUpdates;
+ return numRcvdUpdates.get();
}
/**
@@ -890,11 +904,11 @@
*/
public int getNumSentUpdates()
{
- return numSentUpdates;
+ return numSentUpdates.get();
}
/**
- * get the number of updates in the pending list.
+ * Get the number of updates in the pending list.
*
* @return The number of updates in the pending list
*/
@@ -1052,45 +1066,63 @@
{
Operation op = null;
boolean done = false;
+ boolean dependency = false;
ChangeNumber changeNumber = null;
int retryCount = 10;
+ boolean firstTry = true;
try
{
- while (!done && retryCount-- > 0)
+ while ((!dependency) && (!done) && (retryCount-- > 0))
{
op = msg.createOperation(conn);
op.setInternalOperation(true);
op.setSynchronizationOperation(true);
changeNumber = OperationContext.getChangeNumber(op);
- if (changeNumber != null)
- changeNumberGenerator.adjust(changeNumber);
op.run();
ResultCode result = op.getResultCode();
+
if (result != ResultCode.SUCCESS)
{
if (op instanceof ModifyOperation)
{
ModifyOperation newOp = (ModifyOperation) op;
- done = solveNamingConflict(newOp, msg);
+ dependency = pendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
}
else if (op instanceof DeleteOperation)
{
DeleteOperation newOp = (DeleteOperation) op;
- done = solveNamingConflict(newOp, msg);
+ dependency = pendingChanges.checkDependencies(newOp);
+ if ((!dependency) && (!firstTry))
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
}
else if (op instanceof AddOperation)
{
AddOperation newOp = (AddOperation) op;
- done = solveNamingConflict(newOp, msg);
-
- } else if (op instanceof ModifyDNOperation)
+ dependency = pendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
+ }
+ else if (op instanceof ModifyDNOperation)
{
- ModifyDNOperation newOp = (ModifyDNOperation) op;
- done = solveNamingConflict(newOp, msg);
+ ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+ dependency = pendingChanges.checkDependencies(newMsg);
+ if (!dependency)
+ {
+ ModifyDNOperation newOp = (ModifyDNOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
}
else
{
@@ -1108,9 +1140,10 @@
{
done = true;
}
+ firstTry = false;
}
- if (!done)
+ if (!done && !dependency)
{
// Continue with the next change but the servers could now become
// inconsistent.
@@ -1119,6 +1152,7 @@
String message = getMessage(msgID, op.toString());
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+
updateError(changeNumber);
}
}
@@ -1177,9 +1211,12 @@
}
finally
{
- if (msg.isAssured())
- ack(msg.getChangeNumber());
- incProcessedUpdates();
+ if (!dependency)
+ {
+ if (msg.isAssured())
+ ack(msg.getChangeNumber());
+ incProcessedUpdates();
+ }
}
}
@@ -1193,12 +1230,9 @@
*/
public void updateError(ChangeNumber changeNumber)
{
- synchronized (pendingChanges)
- {
- PendingChange change = pendingChanges.get(changeNumber);
- change.setCommitted(true);
- pushCommittedChanges();
- }
+ pendingChanges.commit(changeNumber);
+ int pushedChanges = pendingChanges.pushCommittedChanges();
+ numSentUpdates.addAndGet(pushedChanges);
}
/**
@@ -1210,15 +1244,7 @@
*/
private ChangeNumber generateChangeNumber(Operation operation)
{
- ChangeNumber changeNumber;
-
- changeNumber = changeNumberGenerator.NewChangeNumber();
- PendingChange change = new PendingChange(changeNumber, operation, null);
- synchronized(pendingChanges)
- {
- pendingChanges.put(changeNumber, change);
- }
- return changeNumber;
+ return pendingChanges.putLocalOperation(operation);
}
@@ -1604,42 +1630,6 @@
}
/**
- * Push all committed local changes to the replicationServer service.
- * PRECONDITION : The pendingChanges lock must be held before calling
- * this method.
- */
- private void pushCommittedChanges()
- {
- if (pendingChanges.isEmpty())
- return;
-
- ChangeNumber firstChangeNumber = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstChangeNumber);
-
- while ((firstChange != null) && firstChange.isCommitted())
- {
- if ((firstChange.getOp() != null ) &&
- (firstChange.getOp().isSynchronizationOperation() == false))
- {
- numSentUpdates++;
- broker.publish(firstChange.getMsg());
- }
- state.update(firstChangeNumber);
- pendingChanges.remove(firstChangeNumber);
-
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
- else
- {
- firstChangeNumber = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstChangeNumber);
- }
- }
- }
-
- /**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
diff --git a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index b135225..f93bb98 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -139,8 +139,9 @@
for (Attribute a : userAttributes)
elems.add(new LDAPAttribute(a).encode());
- for (Attribute a : operationalAttributes)
- elems.add(new LDAPAttribute(a).encode());
+ if (operationalAttributes != null)
+ for (Attribute a : operationalAttributes)
+ elems.add(new LDAPAttribute(a).encode());
encodedAttributes = ASN1Element.encodeValue(elems);
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index e7fde71..7a3ebd3 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -35,6 +35,8 @@
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Operation;
/**
@@ -267,4 +269,86 @@
this.newRDN = newRDN;
}
+ /**
+ * Check if this MSG will change the DN of the target entry to be
+ * the same as the dn given as a parameter.
+ * @param targetDn the DN to use when checking if this MSG will change
+ * the DN of the entry to a given DN.
+ * @return A boolean indicating if the modify DN MSG will change the DN of
+ * the target entry to be the same as the dn given as a parameter.
+ */
+ public boolean newDNIsParent(DN targetDn)
+ {
+ try
+ {
+ String newStringDN = newRDN + "," + newSuperior;
+ DN newDN = DN.decode(newStringDN);
+
+ if (newDN.isAncestorOf(targetDn))
+ return true;
+ else
+ return false;
+ } catch (DirectoryException e)
+ {
+ // The DN was not a correct DN, and therefore does not a parent of the
+ // DN given as a parameter.
+ return false;
+ }
+ }
+
+ /**
+ * Check if the new dn of this ModifyDNMsg is the same as the targetDN
+ * given in parameter.
+ *
+ * @param targetDN The targetDN to use to check for equality.
+ *
+ * @return A boolean indicating if the targetDN if the same as the new DN of
+ * the ModifyDNMsg.
+ */
+ public boolean newDNIsEqual(DN targetDN)
+ {
+ try
+ {
+ String newStringDN = newRDN + "," + newSuperior;
+ DN newDN = DN.decode(newStringDN);
+
+ if (newDN.equals(targetDN))
+ return true;
+ else
+ return false;
+ } catch (DirectoryException e)
+ {
+ // The DN was not a correct DN, and therefore does not match the
+ // DN given as a parameter.
+ return false;
+ }
+ }
+
+ /**
+ * Check if the new parent of the modifyDNMsg is the same as the targetDN
+ * given in parameter.
+ *
+ * @param targetDN the targetDN to use when checking equality.
+ *
+ * @return A boolean indicating if the new parent of the modifyDNMsg is the
+ * same as the targetDN.
+ */
+ public boolean newParentIsEqual(DN targetDN)
+ {
+ try
+ {
+ DN newSuperiorDN = DN.decode(newSuperior);
+
+ if (newSuperiorDN.equals(targetDN))
+ return true;
+ else
+ return false;
+ } catch (DirectoryException e)
+ {
+ // The newsuperior was not a correct DN, and therefore does not match the
+ // DN given as a parameter.
+ return false;
+ }
+ }
+
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
index 805bcaa..6a1a305 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -90,7 +90,7 @@
* @throws DataFormatException if the encoded byte array is not valid.
* @throws UnsupportedEncodingException if UTF-8 is not supprted.
*/
- public UpdateMessage(byte[] in) throws DataFormatException,
+ protected UpdateMessage(byte[] in) throws DataFormatException,
UnsupportedEncodingException
{
/* read the changeNumber */
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 9102301..4a562c5 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -238,10 +238,13 @@
{}
}
- if ( (recentChangeNumber != null) &&
- (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
+ if ( (recentChangeNumber != null) && (changeNumber != null))
{
- flush();
+ if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
+ ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
+ {
+ flush();
+ }
}
return new ReplicationIterator(serverId, db, changeNumber);
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 3441a68..fd7a037 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -221,6 +221,7 @@
newSocket = listenSocket.accept();
newSocket.setReceiveBufferSize(1000000);
newSocket.setTcpNoDelay(true);
+ newSocket.setKeepAlive(true);
ServerHandler handler = new ServerHandler(
new SocketSession(newSocket), queueSize);
handler.start(null, serverId, serverURL, rcvWindow, this);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
new file mode 100644
index 0000000..e06ac23
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -0,0 +1,553 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication;
+
+import static org.testng.Assert.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.util.LinkedList;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.plugin.DomainFakeCfg;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.opends.server.util.TimeThread;
+import org.testng.annotations.*;
+
+/**
+ * Test that the dependencies are computed correctly when replaying
+ * sequences of operations that requires to follow a given order
+ * such as : ADD an entry, ADD a children entry.
+ */
+public class DependencyTest extends ReplicationTestCase
+{
+ private static final String BASEDN_STRING = "dc=example,dc=com";
+
+ /**
+ * Check that a sequence of dependents adds and mods is correctly ordered:
+ * Using a deep dit :
+ * dc=example,dc=com
+ * |
+ * dc=dependency1
+ * |
+ * dc=dependency2
+ * |
+ * dc=dependency2
+ * |
+ *
+ * |
+ * dc=dependencyN
+ * This test sends a sequence of interleaved ADD operations and MODIFY
+ * operations to build such a dit.
+ *
+ * Then test that the sequence of Delete necessary to remove
+ * all those entries is also correctly ordered.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(groups="slow")
+ public void addModDelDependencyTest() throws Exception
+ {
+ ReplicationServer replServer = null;
+ ReplicationDomain domain = null;
+ DN baseDn = DN.decode(BASEDN_STRING);
+ SynchronizationProvider replicationPlugin = null;
+ short brokerId = 2;
+ short serverId = 1;
+ short replServerId = 1;
+ int AddSequenceLength = 30;
+
+
+ cleanDB();
+
+ try
+ {
+ /*
+ * FIRST PART :
+ * Check that a sequence of dependent ADD is correctly ordered.
+ *
+ * - Create replication server
+ * - Send sequence of ADD messages to the replication server
+ * - Configure replication server
+ * - check that the last entry has been correctly added
+ */
+
+ String entryldif =
+ "dn:" + BASEDN_STRING + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "entryuuid: " + stringUID(1) + "\n";
+ Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
+ AttributeType uidType =
+ DirectoryServer.getSchema().getAttributeType("entryuuid");
+
+ // find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ int replServerPort = socket.getLocalPort();
+ socket.close();
+
+ replicationPlugin = new MultimasterReplication();
+ DirectoryServer.registerSynchronizationProvider(replicationPlugin);
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(replServerPort, "addModDeldependency",
+ 0, replServerId, 0,
+ AddSequenceLength*5+100, null);
+ replServer = new ReplicationServer(conf);
+
+ ReplicationBroker broker =
+ openReplicationSession(baseDn, brokerId, 1000, replServerPort, 1000,
+ false);
+
+ TimeThread.sleep(2000);
+ // send a sequence of add operation
+
+ String addDn = BASEDN_STRING;
+ ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
+
+ int sequence;
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ entry.removeAttribute(uidType);
+ entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
+ new LinkedList<AttributeValue>());
+ addDn = "dc=dependency" + sequence + "," + addDn;
+ AddMsg addMsg =
+ new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
+ stringUID(sequence),
+ entry.getObjectClassAttribute(),
+ entry.getAttributes(), null );
+ broker.publish(addMsg);
+
+ ModifyMsg modifyMsg =
+ new ModifyMsg(gen.NewChangeNumber(), DN.decode(addDn),
+ generatemods("description", "test"),
+ stringUID(sequence+1));
+ broker.publish(modifyMsg);
+ }
+
+ // configure and start replication of dc=example,dc=com on the server
+ SortedSet<String> replServers = new TreeSet<String>();
+ replServers.add("localhost:"+replServerPort);
+ DomainFakeCfg domainConf =
+ new DomainFakeCfg(baseDn, serverId, replServers);
+ domainConf.setHeartbeatInterval(100000);
+
+ domain = MultimasterReplication.createNewDomain(domainConf);
+
+ // check that last entry in sequence got added.
+ Entry lastEntry = getEntry(DN.decode(addDn), 30000, true);
+ assertNotNull(lastEntry,
+ "The last entry of the ADD sequence was not added.");
+
+ // Check that all the modify have been replayed
+ // (all the entries should have a description).
+ addDn = BASEDN_STRING;
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ addDn = "dc=dependency" + sequence + "," + addDn;
+
+ boolean found =
+ checkEntryHasAttribute(DN.decode(addDn), "description", "test",
+ 10000, true);
+ if (!found)
+ {
+ fail("The modification was not replayed on entry " + addDn);
+ }
+ }
+
+ /*
+ * SECOND PART
+ *
+ * Now check that the dependencies between delete are correctly
+ * managed.
+ *
+ * disable the domain while we publish the delete message to
+ * to replication server so that when we enable it it receives the
+ * delete operation in bulk.
+ */
+
+ domain.disable();
+ Thread.sleep(2000); // necesary because disable does not wait
+ // for full termination of all threads. (issue 1571)
+
+ DN deleteDN = DN.decode(addDn);
+ while (sequence-->1)
+ {
+ DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
+ gen.NewChangeNumber(),
+ stringUID(sequence + 1));
+ broker.publish(delMsg);
+ deleteDN = deleteDN.getParent();
+ }
+
+ domain.enable();
+
+ // check that entry just below the base entry was deleted.
+ // (we can't delete the base entry because some other tests might
+ // have added other children)
+ DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
+ Entry baseEntry = getEntry(node1, 30000, false);
+ assertNull(baseEntry,
+ "The last entry of the DEL sequence was not deleted.");
+ }
+ finally
+ {
+ if (replServer != null)
+ replServer.shutdown();
+
+ if (domain != null)
+ MultimasterReplication.deleteDomain(baseDn);
+
+ if (replicationPlugin != null)
+ DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
+ }
+ }
+
+ /**
+ * Clean the database and replace with a single entry.
+ *
+ * @throws FileNotFoundException
+ * @throws IOException
+ * @throws Exception
+ */
+ private void cleanDB() throws FileNotFoundException, IOException, Exception
+ {
+ String baseentryldif =
+ "dn:" + BASEDN_STRING + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "dc: example\n"
+ + "entryuuid: " + stringUID(1) + "\n";
+
+
+ // Initialization :
+ // Load the database with a single entry :
+ String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+ String path = buildRoot + File.separator + "build" +
+ File.separator + "unit-tests" + File.separator +
+ "package"+ File.separator + "addModDelDependencyTest";
+ OutputStream out = new FileOutputStream(new File(path));
+ out.write(baseentryldif.getBytes());
+
+ task("dn: ds-task-id=" + UUID.randomUUID()
+ + ",cn=Scheduled Tasks,cn=Tasks\n"
+ + "objectclass: top\n"
+ + "objectclass: ds-task\n"
+ + "objectclass: ds-task-import\n"
+ + "ds-task-class-name: org.opends.server.tasks.ImportTask\n"
+ + "ds-task-import-backend-id: userRoot\n"
+ + "ds-task-import-ldif-file: " + path + "\n"
+ + "ds-task-import-reject-file: " + path + "reject\n");
+ }
+
+ /**
+ * Check that after a sequence of add/del/add done on the same DN
+ * the second entry is in the database.
+ * The unique id of the entry is used to check that the correct entry
+ * has been added.
+ * To increase the risks of failures a loop of add/del/add is done.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(groups="slow")
+ public void addDelAddDependencyTest() throws Exception
+ {
+ ReplicationServer replServer = null;
+ ReplicationDomain domain = null;
+ DN baseDn = DN.decode(BASEDN_STRING);
+ SynchronizationProvider replicationPlugin = null;
+ short brokerId = 2;
+ short serverId = 1;
+ short replServerId = 1;
+ int AddSequenceLength = 30;
+
+ cleanDB();
+
+ try
+ {
+ String entryldif = "dn:" + BASEDN_STRING + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n";
+ Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
+ AttributeType uidType =
+ DirectoryServer.getSchema().getAttributeType("entryuuid");
+
+ // find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ int replServerPort = socket.getLocalPort();
+ socket.close();
+
+ replicationPlugin = new MultimasterReplication();
+ DirectoryServer.registerSynchronizationProvider(replicationPlugin);
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(replServerPort, "addDelAdddependency", 0,
+ replServerId,
+ 0, 5*AddSequenceLength+100, null);
+ replServer = new ReplicationServer(conf);
+
+ ReplicationBroker broker =
+ openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
+ false);
+
+ // send a sequence of add/del/add operations
+
+ String addDn = BASEDN_STRING;
+ ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
+
+ int sequence;
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ // add the entry a first time
+ entry.removeAttribute(uidType);
+ entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
+ new LinkedList<AttributeValue>());
+ addDn = "dc=dependency" + sequence + "," + addDn;
+ AddMsg addMsg =
+ new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
+ stringUID(sequence == 1 ? sequence : sequence +1000),
+ entry.getObjectClassAttribute(),
+ entry.getAttributes(), null );
+ broker.publish(addMsg);
+
+ // delete the entry
+ DeleteMsg delMsg = new DeleteMsg(addDn, gen.NewChangeNumber(),
+ stringUID(sequence+1));
+ broker.publish(delMsg);
+
+ // add again the entry with a new entryuuid.
+ entry.removeAttribute(uidType);
+ entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1001)),
+ new LinkedList<AttributeValue>());
+ addMsg =
+ new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1001),
+ stringUID(sequence == 1 ? sequence : sequence +1000),
+ entry.getObjectClassAttribute(),
+ entry.getAttributes(), null );
+ broker.publish(addMsg);
+ }
+
+ // configure and start replication of dc=example,dc=com on the server
+ SortedSet<String> replServers = new TreeSet<String>();
+ replServers.add("localhost:"+replServerPort);
+ DomainFakeCfg domainConf =
+ new DomainFakeCfg(baseDn, serverId, replServers);
+
+ domain = MultimasterReplication.createNewDomain(domainConf);
+
+ // check that all entries have been deleted and added
+ // again by checking that they do have the correct entryuuid
+ addDn = BASEDN_STRING;
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ addDn = "dc=dependency" + sequence + "," + addDn;
+
+ boolean found =
+ checkEntryHasAttribute(DN.decode(addDn), "entryuuid",
+ stringUID(sequence+1001),
+ 30000, true);
+ if (!found)
+ {
+ fail("The second add was not replayed on entry " + addDn);
+ }
+ }
+
+ DN deleteDN = DN.decode(addDn);
+ while (sequence-->1)
+ {
+ DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
+ gen.NewChangeNumber(),
+ stringUID(sequence + 1001));
+ broker.publish(delMsg);
+ deleteDN = deleteDN.getParent();
+ }
+
+ // check that the database was cleaned successfully
+ DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
+ Entry baseEntry = getEntry(node1, 30000, false);
+ assertNull(baseEntry,
+ "The entry were not removed succesfully after test completion.");
+
+ }
+ finally
+ {
+ if (replServer != null)
+ replServer.shutdown();
+
+ if (domain != null)
+ MultimasterReplication.deleteDomain(baseDn);
+
+ if (replicationPlugin != null)
+ DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
+ }
+ }
+
+ /**
+ * Check that the dependency of moddn operation are working by
+ * issuing a set of Add operation followed by a modrdn of the added entry.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(groups="slow")
+ public void addModdnDependencyTest() throws Exception
+ {
+ ReplicationServer replServer = null;
+ ReplicationDomain domain = null;
+ DN baseDn = DN.decode(BASEDN_STRING);
+ SynchronizationProvider replicationPlugin = null;
+ short brokerId = 2;
+ short serverId = 1;
+ short replServerId = 1;
+ int AddSequenceLength = 30;
+
+ cleanDB();
+
+ try
+ {
+ String entryldif = "dn:" + BASEDN_STRING + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n";
+ Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
+ AttributeType uidType =
+ DirectoryServer.getSchema().getAttributeType("entryuuid");
+
+ // find a free port for the replicationServer
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ int replServerPort = socket.getLocalPort();
+ socket.close();
+
+ replicationPlugin = new MultimasterReplication();
+ DirectoryServer.registerSynchronizationProvider(replicationPlugin);
+ ReplServerFakeConfiguration conf =
+ new ReplServerFakeConfiguration(replServerPort, "addModdndependency", 0,
+ replServerId,
+ 0, 5*AddSequenceLength+100, null);
+ replServer = new ReplicationServer(conf);
+
+ ReplicationBroker broker =
+ openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
+ false);
+
+
+ String addDn = BASEDN_STRING;
+ ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
+
+ // send a sequence of add/modrdn operations
+ int sequence;
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ // add the entry
+ entry.removeAttribute(uidType);
+ entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
+ new LinkedList<AttributeValue>());
+ addDn = "dc=dependency" + sequence + "," + BASEDN_STRING;
+ AddMsg addMsg =
+ new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
+ stringUID(1),
+ entry.getObjectClassAttribute(),
+ entry.getAttributes(), null );
+ broker.publish(addMsg);
+
+ // rename the entry
+ ModifyDNMsg moddnMsg =
+ new ModifyDNMsg(addDn, gen.NewChangeNumber(), stringUID(sequence+1),
+ stringUID(1), true, null, "dc=new_dep" + sequence);
+ broker.publish(moddnMsg);
+ }
+
+ // configure and start replication of dc=example,dc=com on the server
+ SortedSet<String> replServers = new TreeSet<String>();
+ replServers.add("localhost:"+replServerPort);
+ DomainFakeCfg domainConf =
+ new DomainFakeCfg(baseDn, serverId, replServers);
+
+ domain = MultimasterReplication.createNewDomain(domainConf);
+
+ // check that all entries have been renamed
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
+
+ Entry baseEntry = getEntry(DN.decode(addDn), 30000, true);
+ assertNotNull(baseEntry,
+ "The rename was not applied correctly on :" + addDn);
+ }
+
+ // delete the entries to clean the database.
+ for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
+ {
+ addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
+ DeleteMsg delMsg = new DeleteMsg(addDn.toString(),
+ gen.NewChangeNumber(),
+ stringUID(sequence + 1001));
+ broker.publish(delMsg);
+ }
+ }
+ finally
+ {
+ if (replServer != null)
+ replServer.shutdown();
+
+ if (domain != null)
+ MultimasterReplication.deleteDomain(baseDn);
+
+ if (replicationPlugin != null)
+ DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
+ }
+ }
+
+ /**
+ * Builds and return a uuid from an integer.
+ * This methods assume that unique integers are used and does not make any
+ * unicity checks. It is only responsible for generating a uid with a
+ * correct syntax.
+ */
+ private String stringUID(int i)
+ {
+ return String.format("11111111-1111-1111-1111-%012x", i);
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index b14d47e..046cbec 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -153,10 +153,6 @@
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
- // Disable schema check
- schemaCheck = DirectoryServer.checkSchema();
- DirectoryServer.setCheckSchema(false);
-
baseDn = DN.decode("dc=example,dc=com");
updatedEntries = newLDIFEntries();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
index 2aef96e..a6733f7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -32,8 +32,6 @@
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.TestCaseUtils;
@@ -47,16 +45,12 @@
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDAPException;
import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
@@ -242,10 +236,6 @@
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
- // Disable schema check
- schemaCheck = DirectoryServer.checkSchema();
- DirectoryServer.setCheckSchema(false);
-
// Create an internal connection
connection = InternalClientConnection.getRootConnection();
@@ -325,22 +315,6 @@
configureReplication();
}
- /**
- * @return
- */
- private List<Modification> generatemods(String attrName, String attrValue)
- {
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
- LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(attrType, attrValue));
- Attribute attr = new Attribute(attrType, attrName, values);
- List<Modification> mods = new ArrayList<Modification>();
- Modification mod = new Modification(ModificationType.REPLACE, attr);
- mods.add(mod);
- return mods;
- }
-
private void processModify(int count)
{
while (count>0)
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
index afc380c..b6b76e1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -26,9 +26,6 @@
*/
package org.opends.server.replication;
-import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
-import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.io.File;
@@ -36,18 +33,12 @@
import java.util.UUID;
import org.opends.server.TestCaseUtils;
-import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
-import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.internal.InternalSearchOperation;
-import org.opends.server.schema.DirectoryStringSyntax;
-import org.opends.server.types.AttributeType;
+
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchFilter;
-import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -113,9 +104,9 @@
configureReplication();
- // Give some time to the replication to setup
+ // Give some time to the replication to setup
Thread.sleep(1000);
-
+
// Create a dummy entry
addEntry("dn: dc=dummy, dc=example,dc=com\n"
+ "objectClass: top\n" + "objectClass: domain\n");
@@ -209,7 +200,7 @@
String path = buildRoot + File.separator + "build" +
File.separator + "unit-tests" + File.separator +
"package"+ File.separator + "ReSynchTest";
-
+
task("dn: ds-task-id=" + UUID.randomUUID()
+ ",cn=Scheduled Tasks,cn=Tasks\n"
+ "objectclass: top\n"
@@ -237,73 +228,4 @@
}
- /**
- * Utility method to create, run a task and check its result.
- */
- private void task(String task) throws Exception
- {
- Entry taskEntry = TestCaseUtils.makeEntry(task);
-
- InternalClientConnection connection =
- InternalClientConnection.getRootConnection();
-
- // Add the task.
- AddOperation addOperation =
- connection.processAdd(taskEntry.getDN(),
- taskEntry.getObjectClasses(),
- taskEntry.getUserAttributes(),
- taskEntry.getOperationalAttributes());
- assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
- "Add of the task definition was not successful");
-
- // Wait until the task completes.
- AttributeType completionTimeType = DirectoryServer.getAttributeType(
- ATTR_TASK_COMPLETION_TIME.toLowerCase());
- SearchFilter filter =
- SearchFilter.createFilterFromString("(objectclass=*)");
- Entry resultEntry = null;
- String completionTime = null;
- long startMillisecs = System.currentTimeMillis();
- do
- {
- InternalSearchOperation searchOperation =
- connection.processSearch(taskEntry.getDN(),
- SearchScope.BASE_OBJECT,
- filter);
- try
- {
- resultEntry = searchOperation.getSearchEntries().getFirst();
- } catch (Exception e)
- {
- continue;
- }
- completionTime =
- resultEntry.getAttributeValue(completionTimeType,
- DirectoryStringSyntax.DECODER);
-
- if (completionTime == null)
- {
- if (System.currentTimeMillis() - startMillisecs > 1000*30)
- {
- break;
- }
- Thread.sleep(10);
- }
- } while (completionTime == null);
-
- if (completionTime == null)
- {
- fail("The task has not completed after 30 seconds.");
- }
-
- // Check that the task state is as expected.
- AttributeType taskStateType =
- DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
- String stateString =
- resultEntry.getAttributeValue(taskStateType,
- DirectoryStringSyntax.DECODER);
- TaskState taskState = TaskState.fromString(stateString);
- assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
- "The task completed in an unexpected state");
- }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 9b083a6..33b3c3e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,12 +26,16 @@
*/
package org.opends.server.replication;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.List;
@@ -42,7 +46,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.PersistentServerState;
+import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -53,6 +60,10 @@
import org.opends.server.types.ByteStringFactory;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -87,11 +98,6 @@
protected Entry replServerEntry;
/**
- * schema check flag
- */
- protected boolean schemaCheck;
-
- /**
* The replication plugin entry
*/
protected String synchroPluginStringDN =
@@ -108,7 +114,6 @@
{
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
- schemaCheck = DirectoryServer.checkSchema();
// Create an internal connection
connection = InternalClientConnection.getRootConnection();
@@ -152,7 +157,7 @@
}
}
catch (Exception e)
- {
+ {
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
"ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
@@ -244,7 +249,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
"cleaning config entry " + dn, 1);
-
+
op = new DeleteOperation(connection, InternalClientConnection
.nextOperationID(), InternalClientConnection.nextMessageID(), null,
dn);
@@ -256,7 +261,7 @@
// done
}
}
-
+
/**
* suppress all the real entries created by the tests in this class
*/
@@ -265,7 +270,7 @@
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.NOTICE,
"ReplicationTestCase/Cleaning entries" , 1);
-
+
DeleteOperation op;
// Delete entries
try
@@ -298,8 +303,6 @@
@AfterClass
public void classCleanUp() throws Exception
{
- DirectoryServer.setCheckSchema(schemaCheck);
-
cleanConfigEntries();
cleanRealEntries();
}
@@ -323,7 +326,7 @@
assertNotNull(DirectoryServer.getConfigEntry(DN
.decode(synchroPluginStringDN)),
"Unable to add the Multimaster replication plugin");
-
+
// domains container entry.
String domainsLdif = "dn: "
+ "cn=domains," + synchroPluginStringDN + "\n"
@@ -335,7 +338,7 @@
assertNotNull(DirectoryServer.getConfigEntry(
DN.decode(synchroPluginStringDN)),
"Unable to add the Multimaster replication plugin");
-
+
// Add the replication server
DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
@@ -380,7 +383,7 @@
protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
String valueString, int timeout, boolean hasAttribute) throws Exception
{
- boolean found;
+ boolean found = false;
int count = timeout/100;
if (count<1)
count=1;
@@ -408,15 +411,15 @@
newEntry = DirectoryServer.getEntry(dn);
- if (newEntry == null)
- fail("The entry " + dn +
- " has incorrectly been deleted from the database.");
- List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
- Attribute tmpAttr = tmpAttrList.get(0);
+ if (newEntry != null)
+ {
+ List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
+ Attribute tmpAttr = tmpAttrList.get(0);
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrTypeStr, true);
- found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrTypeStr, true);
+ found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
+ }
}
finally
@@ -478,4 +481,95 @@
}
}
+ /**
+ * Generate a new modification replace with the given information.
+ *
+ * @param attrName The attribute to replace.
+ * @param attrValue The new value for the attribute
+ *
+ * @return The modification replace.
+ */
+ protected List<Modification> generatemods(String attrName, String attrValue)
+ {
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
+ LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
+ values.add(new AttributeValue(attrType, attrValue));
+ Attribute attr = new Attribute(attrType, attrName, values);
+ List<Modification> mods = new ArrayList<Modification>();
+ Modification mod = new Modification(ModificationType.REPLACE, attr);
+ mods.add(mod);
+ return mods;
+ }
+
+ /**
+ * Utility method to create, run a task and check its result.
+ */
+ protected void task(String task) throws Exception
+ {
+ Entry taskEntry = TestCaseUtils.makeEntry(task);
+
+ InternalClientConnection connection =
+ InternalClientConnection.getRootConnection();
+
+ // Add the task.
+ AddOperation addOperation =
+ connection.processAdd(taskEntry.getDN(),
+ taskEntry.getObjectClasses(),
+ taskEntry.getUserAttributes(),
+ taskEntry.getOperationalAttributes());
+ assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
+ "Add of the task definition was not successful");
+
+ // Wait until the task completes.
+ AttributeType completionTimeType = DirectoryServer.getAttributeType(
+ ATTR_TASK_COMPLETION_TIME.toLowerCase());
+ SearchFilter filter =
+ SearchFilter.createFilterFromString("(objectclass=*)");
+ Entry resultEntry = null;
+ String completionTime = null;
+ long startMillisecs = System.currentTimeMillis();
+ do
+ {
+ InternalSearchOperation searchOperation =
+ connection.processSearch(taskEntry.getDN(),
+ SearchScope.BASE_OBJECT,
+ filter);
+ try
+ {
+ resultEntry = searchOperation.getSearchEntries().getFirst();
+ } catch (Exception e)
+ {
+ continue;
+ }
+ completionTime =
+ resultEntry.getAttributeValue(completionTimeType,
+ DirectoryStringSyntax.DECODER);
+
+ if (completionTime == null)
+ {
+ if (System.currentTimeMillis() - startMillisecs > 1000*30)
+ {
+ break;
+ }
+ Thread.sleep(10);
+ }
+ } while (completionTime == null);
+
+ if (completionTime == null)
+ {
+ fail("The task has not completed after 30 seconds.");
+ }
+
+ // Check that the task state is as expected.
+ AttributeType taskStateType =
+ DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
+ String stateString =
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
+ TaskState taskState = TaskState.fromString(stateString);
+ assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
+ "The task completed in an unexpected state");
+ }
+
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
index ce43db3..ca032fb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -82,7 +82,6 @@
{
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
- schemaCheck = DirectoryServer.checkSchema();
// find a free port for the replicationServer
ServerSocket socket = TestCaseUtils.bindFreePort();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
index 62cdfca..4068f90 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
@@ -33,8 +33,6 @@
import static org.testng.Assert.fail;
import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -50,15 +48,12 @@
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
@@ -187,10 +182,6 @@
// Create an internal connection
connection = InternalClientConnection.getRootConnection();
- // Disable schema check
- schemaCheck = DirectoryServer.checkSchema();
- DirectoryServer.setCheckSchema(false);
-
// Create backend top level entries
String[] topEntries = new String[2];
topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
@@ -263,22 +254,6 @@
configureReplication();
}
- /**
- * @return
- */
- private List<Modification> generatemods(String attrName, String attrValue)
- {
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
- LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(attrType, attrValue));
- Attribute attr = new Attribute(attrType, attrName, values);
- List<Modification> mods = new ArrayList<Modification>();
- Modification mod = new Modification(ModificationType.REPLACE, attr);
- mods.add(mod);
- return mods;
- }
-
private class BrokerWriter extends Thread
{
int count;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index 7604ffc..774255e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -100,10 +100,6 @@
// This test suite depends on having the schema available.
TestCaseUtils.startServer();
- // Disable schema check
- schemaCheck = DirectoryServer.checkSchema();
- DirectoryServer.setCheckSchema(false);
-
// Create an internal connection
connection = InternalClientConnection.getRootConnection();
@@ -1006,19 +1002,6 @@
}
}
- private List<Modification> generatemods(String attrName, String attrValue)
- {
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
- LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(attrType, attrValue));
- Attribute attr = new Attribute(attrType, attrName, values);
- List<Modification> mods = new ArrayList<Modification>();
- Modification mod = new Modification(ModificationType.REPLACE, attr);
- mods.add(mod);
- return mods;
- }
-
/**
* Get the entryUUID for a given DN.
*
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
new file mode 100644
index 0000000..5922642
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -0,0 +1,183 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.util.SortedSet;
+
+import org.opends.server.admin.ManagedObjectDefinition;
+import org.opends.server.admin.PropertyProvider;
+import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.client.MultimasterDomainCfgClient;
+import org.opends.server.admin.std.server.MultimasterDomainCfg;
+import org.opends.server.types.DN;
+
+/**
+ * This class implement a configuration object for the MultimasterDomain
+ * that can be used in unit tests to instantiate ReplicationDomain.
+ */
+public class DomainFakeCfg implements MultimasterDomainCfg
+{
+ private DN baseDn;
+ private int serverId;
+ private SortedSet<String> replicationServers;
+ private long heartbeatInterval = 1000;
+
+ /**
+ * Creates a new Domain with the provided information
+ */
+ public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers)
+ {
+ this.baseDn = baseDn;
+ this.serverId = serverId;
+ this.replicationServers = replServers;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addChangeListener(
+ ConfigurationChangeListener<MultimasterDomainCfg> listener)
+ {
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ManagedObjectDefinition<? extends MultimasterDomainCfgClient,
+ ? extends MultimasterDomainCfg> definition()
+ {
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public long getHeartbeatInterval()
+ {
+ return heartbeatInterval ;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public long getMaxReceiveDelay()
+ {
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getMaxReceiveQueue()
+ {
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public long getMaxSendDelay()
+ {
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getMaxSendQueue()
+ {
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public DN getReplicationDN()
+ {
+ return baseDn;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public SortedSet<String> getReplicationServer()
+ {
+ return replicationServers;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getServerId()
+ {
+ return serverId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getWindowSize()
+ {
+ return 100;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void removeChangeListener(
+ ConfigurationChangeListener<MultimasterDomainCfg> listener)
+ {
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public DN dn()
+ {
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public PropertyProvider properties()
+ {
+ return null;
+ }
+
+ /**
+ * Set the heartbeat interval.
+ *
+ * @param interval
+ */
+ public void setHeartbeatInterval(long interval)
+ {
+ heartbeatInterval = interval;
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 5cde3e5..9428170 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -76,7 +76,7 @@
/**
* The port of the replicationServer.
*/
- private int changelogPort;
+ private int replicationServerPort;
private ChangeNumber firstChangeNumberServer1 = null;
private ChangeNumber secondChangeNumberServer1 = null;
@@ -97,11 +97,11 @@
// find a free port for the replicationServer
ServerSocket socket = TestCaseUtils.bindFreePort();
- changelogPort = socket.getLocalPort();
+ replicationServerPort = socket.getLocalPort();
socket.close();
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null);
+ new ReplServerFakeConfiguration(replicationServerPort, null, 0, 1, 0, 0, null);
replicationServer = new ReplicationServer(conf);
}
@@ -124,10 +124,10 @@
* Open a sender session and a receiver session to the replicationServer
*/
server1 = openReplicationSession(
- DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
+ DN.decode("dc=example,dc=com"), (short) 1, 100, replicationServerPort,
1000, true);
server2 = openReplicationSession(
- DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
+ DN.decode("dc=example,dc=com"), (short) 2, 100, replicationServerPort,
1000, true);
/*
@@ -240,7 +240,7 @@
try {
broker =
openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
- 100, changelogPort, 1000, false);
+ 100, replicationServerPort, 1000, false);
ReplicationMessage msg2 = broker.receive();
if (!(msg2 instanceof DeleteMsg))
@@ -277,7 +277,7 @@
try {
broker =
openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
- 100, changelogPort, 1000, state);
+ 100, replicationServerPort, 1000, state);
ReplicationMessage msg2 = broker.receive();
if (!(msg2 instanceof DeleteMsg))
@@ -425,7 +425,7 @@
* Open a sender session
*/
server = openReplicationSession(
- DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
+ DN.decode("dc=example,dc=com"), (short) 5, 100, replicationServerPort,
1000, 1000, 0, true);
BrokerReader reader = new BrokerReader(server);
@@ -436,7 +436,7 @@
for (int i =0; i< CLIENT_THREADS; i++)
{
clientBroker[i] = openReplicationSession(
- DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
+ DN.decode("dc=example,dc=com"), (short) (100+i), 100, replicationServerPort,
1000, true);
client[i] = new BrokerReader(clientBroker[i]);
}
@@ -510,7 +510,7 @@
new ChangeNumberGenerator(serverId , (long) 0);
ReplicationBroker broker =
openReplicationSession( DN.decode("dc=example,dc=com"), serverId,
- 100, changelogPort, 1000, 1000, 0, true);
+ 100, replicationServerPort, 1000, 1000, 0, true);
producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
reader[i] = new BrokerReader(broker);
--
Gitblit v1.10.0