From 6727fbc1a159686d355716af0f6d6815bf4054f0 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 29 May 2007 08:54:18 +0000
Subject: [PATCH]
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java | 6 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 63 +++++++++++-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java | 160 +++++++++++++++++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java | 12 ++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 42 ++++++++
5 files changed, 270 insertions(+), 13 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
index 40a27dc..03d50b5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/Historical.java
@@ -129,10 +129,12 @@
*
* @param modifyOperation the operation to be processed
* @param modifiedEntry the entry that is being modified (before modification)
+ * @return true if the replayed operation was in conflict
*/
- public void replayOperation(ModifyOperation modifyOperation,
+ public boolean replayOperation(ModifyOperation modifyOperation,
Entry modifiedEntry)
{
+ boolean bConflict = false;
List<Modification> mods = modifyOperation.getModifications();
ChangeNumber changeNumber =
OperationContext.getChangeNumber(modifyOperation);
@@ -212,6 +214,7 @@
// TODO : FILL ME
break;
}
+ bConflict = true;
}
else
{
@@ -226,6 +229,7 @@
{
moreRecentChangenumber = changeNumber;
}
+ return bConflict;
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 7d2566a..985d2f0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -137,6 +137,9 @@
private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
private AtomicInteger numSentUpdates = new AtomicInteger(0);
private AtomicInteger numProcessedUpdates = new AtomicInteger();
+ private AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
+ private AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
+ private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
private int debugCount = 0;
private PersistentServerState state;
private int numReplayedPostOpCalled = 0;
@@ -618,7 +621,10 @@
Historical historicalInformation = Historical.load(modifiedEntry);
modifyOperation.setAttachment(HISTORICAL, historicalInformation);
- historicalInformation.replayOperation(modifyOperation, modifiedEntry);
+ if (historicalInformation.replayOperation(modifyOperation, modifiedEntry))
+ {
+ numResolvedModifyConflicts.incrementAndGet();
+ }
if (modifyOperation.getModifications().isEmpty())
{
@@ -1150,9 +1156,14 @@
String message = getMessage(msgID, op.toString());
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ numUnresolvedNamingConflicts.incrementAndGet();
updateError(changeNumber);
}
+ else
+ {
+ numResolvedNamingConflicts.incrementAndGet();
+ }
}
catch (ASN1Exception e)
{
@@ -1557,6 +1568,7 @@
ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
msg.setDn(currentDN.toString());
modifyDnMsg.setNewSuperior(newSuperior.toString());
+ numUnresolvedNamingConflicts.incrementAndGet();
return false;
}
else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
@@ -1572,6 +1584,7 @@
modifyDnMsg.setNewRDN(generateConflictDn(entryUid,
modifyDnMsg.getNewRDN()));
modifyDnMsg.setNewSuperior(newSuperior.toString());
+ numUnresolvedNamingConflicts.incrementAndGet();
return false;
}
return true;
@@ -1677,6 +1690,33 @@
}
/**
+ * Get the number of modify conflicts successfully resolved.
+ * @return The number of modify conflicts successfully resolved.
+ */
+ public int getNumResolvedModifyConflicts()
+ {
+ return numResolvedModifyConflicts.get();
+ }
+
+ /**
+ * Get the number of namign conflicts successfully resolved.
+ * @return The number of naming conflicts successfully resolved.
+ */
+ public int getNumResolvedNamingConflicts()
+ {
+ return numResolvedNamingConflicts.get();
+ }
+
+ /**
+ * Get the number of unresolved conflicts.
+ * @return The number of unresolved conflicts.
+ */
+ public int getNumUnresolvedNamingConflicts()
+ {
+ return numUnresolvedNamingConflicts.get();
+ }
+
+ /**
* Check if the domain solve conflicts.
*
* @return a boolean indicating if the domain should sove conflicts.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
index 0618150..e6a45a0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -119,6 +119,18 @@
addMonitorData(attributes, "replayed-updates-ok",
domain.getNumReplayedPostOpCalled());
+ /* get number of modify conflicts */
+ addMonitorData(attributes, "resolved-modify-conflicts",
+ domain.getNumResolvedModifyConflicts());
+
+ /* get number of naming conflicts */
+ addMonitorData(attributes, "resolved-naming-conflicts",
+ domain.getNumResolvedNamingConflicts());
+
+ /* get number of unresolved naming conflicts */
+ addMonitorData(attributes, "unresolved-naming-conflicts",
+ domain.getNumUnresolvedNamingConflicts());
+
/* get window information */
addMonitorData(attributes, "max-rcv-window", domain.getMaxRcvWindow());
addMonitorData(attributes, "current-rcv-window",
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index d37f9cc..dfaa5eb 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -32,6 +32,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
+import static org.testng.Assert.assertTrue;
import java.net.SocketException;
import java.util.ArrayList;
@@ -45,6 +46,7 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
@@ -99,6 +101,18 @@
protected Entry replServerEntry;
/**
+ * Replication monitor stats
+ */
+ private DN monitorDn;
+ private String monitorAttr;
+ private long lastCount;
+
+ /**
+ * schema check flag
+ */
+ protected boolean schemaCheck;
+
+ /**
* The replication plugin entry
*/
protected String synchroPluginStringDN =
@@ -359,16 +373,17 @@
}
}
+
/**
- * Retrieve the number of replayed updates for a given replication
+ * Get the value of the specified attribute for a given replication
* domain from the monitor entry.
- * @return The number of replayed updates.
+ * @return The monitor value
* @throws Exception If an error occurs.
*/
- protected long getReplayedUpdatesCount(DN syncDN) throws Exception
+ protected long getMonitorAttrValue(DN baseDn, String attr) throws Exception
{
String monitorFilter =
- "(&(cn=replication*)(base-dn=" + syncDN + "))";
+ "(&(cn=replication plugin*)(base-dn=" + baseDn + "))";
InternalSearchOperation op;
int count = 0;
@@ -386,9 +401,8 @@
throw new Exception("Could not read monitoring information");
SearchResultEntry entry = op.getSearchEntries().getFirst();
-
AttributeType attrType =
- DirectoryServer.getDefaultAttributeType("replayed-updates");
+ DirectoryServer.getDefaultAttributeType(attr);
return entry.getAttributeValue(attrType, IntegerSyntax.DECODER).longValue();
}
@@ -499,7 +513,42 @@
LockManager.unlock(dn, lock);
}
}
-
+
+ /**
+ * Update the monitor count for the specified monitor attribute.
+ */
+ protected void updateMonitorCount(DN baseDn, String attr) {
+ monitorDn = baseDn;
+ monitorAttr = attr;
+ try
+ {
+ Thread.sleep(2000);
+ lastCount = getMonitorAttrValue(baseDn, attr);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ assertTrue(false);
+ }
+ }
+
+ /**
+ * Get the delta between the current / last monitor counts.
+ * @return The delta between the current and last monitor count.
+ */
+ protected long getMonitorDelta() {
+ long delta = 0;
+ try {
+ Thread.sleep(2000);
+ long currentCount = getMonitorAttrValue(monitorDn, monitorAttr);
+ delta = (currentCount - lastCount);
+ lastCount = currentCount;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ assertTrue(false);
+ }
+ return delta;
+ }
/**
* Generate a new modification replace with the given information.
*
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index 38fa771..a3d33e4 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -38,6 +38,7 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.plugins.ShortCircuitPlugin;
+import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
@@ -399,6 +400,134 @@
}
/**
+ * Tests the modify conflict resolution code.
+ * In this test, the local server acts both as an LDAP server and
+ * a replicationServer that are inter-connected.
+ *
+ * The test creates an other session to the replicationServer using
+ * directly the ReplicationBroker API.
+ * It then uses this session to simulate conflicts and therefore
+ * test the modify conflict resolution code.
+ */
+ @Test(enabled=true, groups="slow")
+ public void modifyConflicts()
+ throws Exception
+ {
+ final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+ final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
+ final AttributeType attrType =
+ DirectoryServer.getAttributeType("displayname");
+ final AttributeType entryuuidType =
+ DirectoryServer.getAttributeType("entryuuid");
+ String monitorAttr = "resolved-modify-conflicts";
+
+ /*
+ * Open a session to the replicationServer using the broker API.
+ * This must use a different serverId to that of the directory server.
+ */
+ ReplicationBroker broker =
+ openReplicationSession(baseDn, (short)2, 100, replServerPort, 1000, true);
+
+ // Add the first test entry.
+ TestCaseUtils.addEntry(
+ "dn: cn=test1," + baseDn.toString(),
+ "displayname: Test1",
+ "objectClass: top",
+ "objectClass: person",
+ "objectClass: organizationalPerson",
+ "objectClass: inetOrgPerson",
+ "cn: test1",
+ "sn: test"
+ );
+
+ // Read the entry back to get its UUID.
+ Entry entry = DirectoryServer.getEntry(dn1);
+ List<Attribute> attrs = entry.getAttribute(entryuuidType);
+ String entryuuid =
+ attrs.get(0).getValues().iterator().next().getStringValue();
+
+ // A change on a first server.
+ ChangeNumber t1 = new ChangeNumber(1, (short) 0, (short) 3);
+
+ // A change on a second server.
+ ChangeNumber t2 = new ChangeNumber(2, (short) 0, (short) 4);
+
+ // Simulate the ordering t2:replace:B followed by t1:add:A that
+ updateMonitorCount(baseDn, monitorAttr);
+
+ // Replay a replace of a value B at time t2 on a second server.
+ Attribute attr = new Attribute(attrType.getNormalizedPrimaryName(), "B");
+ Modification mod = new Modification(ModificationType.REPLACE, attr);
+ List<Modification> mods = new ArrayList<Modification>(1);
+ mods.add(mod);
+ ModifyMsg modMsg = new ModifyMsg(t2, dn1, mods, entryuuid);
+ broker.publish(modMsg);
+
+ Thread.sleep(2000);
+
+ // Replay an add of a value A at time t1 on a first server.
+ attr = new Attribute(attrType.getNormalizedPrimaryName(), "A");
+ mod = new Modification(ModificationType.ADD, attr);
+ mods = new ArrayList<Modification>(1);
+ mods.add(mod);
+ modMsg = new ModifyMsg(t1, dn1, mods, entryuuid);
+ broker.publish(modMsg);
+
+ Thread.sleep(2000);
+
+ // Read the entry to see how the conflict was resolved.
+ entry = DirectoryServer.getEntry(dn1);
+ attrs = entry.getAttribute(attrType);
+ String attrValue1 =
+ attrs.get(0).getValues().iterator().next().getStringValue();
+
+ // the value should be the last (time t2) value added
+ assertEquals(attrValue1, "B");
+ assertEquals(getMonitorDelta(), 1);
+
+ // Simulate the ordering t2:delete:displayname followed by
+ // t1:replace:displayname
+ // A change on a first server.
+ t1 = new ChangeNumber(3, (short) 0, (short) 3);
+
+ // A change on a second server.
+ t2 = new ChangeNumber(4, (short) 0, (short) 4);
+
+ // Simulate the ordering t2:delete:displayname followed by t1:replace:A
+ updateMonitorCount(baseDn, monitorAttr);
+
+ // Replay an delete of attribute displayname at time t2 on a second server.
+ attr = new Attribute(attrType);
+ mod = new Modification(ModificationType.DELETE, attr);
+ mods = new ArrayList<Modification>(1);
+ mods.add(mod);
+ modMsg = new ModifyMsg(t2, dn1, mods, entryuuid);
+ broker.publish(modMsg);
+
+ Thread.sleep(2000);
+
+ // Replay a replace of a value A at time t1 on a first server.
+ attr = new Attribute(attrType.getNormalizedPrimaryName(), "A");
+ mod = new Modification(ModificationType.REPLACE, attr);
+ mods = new ArrayList<Modification>(1);
+ mods.add(mod);
+ modMsg = new ModifyMsg(t1, dn1, mods, entryuuid);
+ broker.publish(modMsg);
+
+ Thread.sleep(2000);
+
+ // Read the entry to see how the conflict was resolved.
+ entry = DirectoryServer.getEntry(dn1);
+ attrs = entry.getAttribute(attrType);
+
+ // there should not be a value (delete at time t1)
+ assertNull(attrs);
+ assertEquals(getMonitorDelta(), 1);
+
+ broker.stop();
+ }
+
+ /**
* Tests the naming conflict resolution code.
* In this test, the local server act both as an LDAP server and
* a replicationServer that are inter-connected.
@@ -408,7 +537,7 @@
* It then uses this session to siomulate conflicts and therefore
* test the naming conflict resolution code.
*/
- @Test(enabled=true)
+ @Test(enabled=true, groups="slow")
public void namingConflicts() throws Exception
{
logError(ErrorLogCategory.SYNCHRONIZATION,
@@ -416,6 +545,7 @@
"Starting replication test : namingConflicts" , 1);
final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
+ String monitorAttr = "resolved-naming-conflicts";
/*
* Open a session to the replicationServer using the ReplicationServer broker API.
@@ -458,6 +588,7 @@
ModifyMsg modMsg = new ModifyMsg(gen.newChangeNumber(),
DN.decode("cn=something,ou=People,dc=example,dc=com"), mods,
user1entryUUID);
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(modMsg);
// check that the modify has been applied as if the entry had been renamed.
@@ -465,6 +596,7 @@
"telephonenumber", "01 02 45", 10000, true);
if (found == false)
fail("The modification has not been correctly replayed.");
+ assertEquals(getMonitorDelta(), 1);
/*
* Test that the conflict resolution code is able to detect
@@ -494,6 +626,7 @@
mods = generatemods("telephonenumber", "02 01 03 05");
modMsg = new ModifyMsg(gen.newChangeNumber(),
DN.decode(user1dn), mods, "10000000-9abc-def0-1234-1234567890ab");
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(modMsg);
// check that the modify has not been applied
@@ -501,6 +634,7 @@
"telephonenumber", "02 01 03 05", 10000, false);
if (found == true)
fail("The modification has been replayed while it should not.");
+ assertEquals(getMonitorDelta(), 1);
/*
@@ -515,6 +649,7 @@
DeleteMsg delMsg =
new DeleteMsg("cn=anotherdn,ou=People,dc=example,dc=com",
gen.newChangeNumber(), user1entryUUID);
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(delMsg);
// check that the delete operation has been applied
@@ -522,6 +657,7 @@
assertNull(resultEntry,
"The DELETE replication message was not replayed");
+ assertEquals(getMonitorDelta(), 1);
/*
* Test that two adds with the same DN but a different unique ID result
@@ -548,6 +684,7 @@
user1entrysecondUUID, baseUUID,
personWithSecondUniqueID.getObjectClassAttribute(),
personWithSecondUniqueID.getAttributes(), new ArrayList<Attribute>());
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(addMsg);
// Check that the entry has been renamed and created in the local DS.
@@ -556,6 +693,7 @@
10000, true);
assertNotNull(resultEntry,
"The ADD replication message was not applied");
+ assertEquals(getMonitorDelta(), 1);
// delete the entries to clean the database.
delMsg =
@@ -584,6 +722,7 @@
baseUUID,
personWithUUIDEntry.getObjectClassAttribute(),
personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(addMsg);
// Check that the entry has been renamed and created in the local DS.
@@ -591,6 +730,7 @@
DN.decode("uid=new person,ou=People,dc=example,dc=com"), 10000, true);
assertNotNull(resultEntry,
"The ADD replication message was not applied");
+ assertEquals(getMonitorDelta(), 1);
/*
* Check that when replaying delete the naming conflict code
@@ -604,6 +744,7 @@
delMsg =
new DeleteMsg("uid=new person,ou=People,dc=example,dc=com",
gen.newChangeNumber(), "11111111-9abc-def0-1234-1234567890ab");
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(delMsg);
resultEntry = getEntry(
DN.decode("uid=new person,ou=People,dc=example,dc=com"), 10000, true);
@@ -611,6 +752,7 @@
// check that the delete operation has not been applied
assertNotNull(resultEntry,
"The DELETE replication message was replayed when it should not");
+ assertEquals(getMonitorDelta(), 1);
/*
@@ -627,6 +769,7 @@
user1entryUUID, baseUUID, false,
"uid=wrong, ou=people,dc=example,dc=com",
"uid=newrdn");
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(modDnMsg);
resultEntry = getEntry(
@@ -635,6 +778,7 @@
// check that the operation has been correctly relayed
assertNotNull(resultEntry,
"The modify dn was not or badly replayed");
+ assertEquals(getMonitorDelta(), 1);
/*
* same test but by giving a bad entry DN
@@ -643,6 +787,7 @@
modDnMsg = new ModifyDNMsg(
"uid=wrong,ou=People,dc=example,dc=com", gen.newChangeNumber(),
user1entryUUID, baseUUID, false, null, "uid=reallynewrdn");
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(modDnMsg);
resultEntry = getEntry(
@@ -651,6 +796,7 @@
// check that the operation has been correctly relayed
assertNotNull(resultEntry,
"The modify dn was not or badly replayed");
+ assertEquals(getMonitorDelta(), 1);
/*
* Check that conflicting entries are renamed when a
@@ -664,6 +810,7 @@
baseUUID,
personWithSecondUniqueID.getObjectClassAttribute(),
personWithSecondUniqueID.getAttributes(), new ArrayList<Attribute>());
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(addMsg);
// check that the second entry has been added
@@ -671,11 +818,13 @@
// check that the add operation has been applied
assertNotNull(resultEntry, "The add operation was not replayed");
+ assertEquals(getMonitorDelta(), 1);
// try to rename the first entry
modDnMsg = new ModifyDNMsg(user1dn, gen.newChangeNumber(),
user1entrysecondUUID, baseUUID, false,
baseDn.toString(), "uid=reallynewrdn");
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(modDnMsg);
// check that the second entry has been renamed
@@ -685,6 +834,7 @@
// check that the delete operation has been applied
assertNotNull(resultEntry, "The modifyDN was not or incorrectly replayed");
+ assertEquals(getMonitorDelta(), 1);
// delete the entries to clean the database
delMsg =
@@ -790,6 +940,7 @@
// - publish msg
+ updateMonitorCount(baseDn, monitorAttr);
broker.publish(addMsg);
// - check that the Dn has been changed to baseDn2
@@ -803,6 +954,7 @@
assertNotNull(resultEntry,
"The ADD replication message was NOT applied under ou=baseDn2,"+baseDn);
entryList.add(resultEntry.getDN());
+ assertEquals(getMonitorDelta(), 1);
broker.stop();
@@ -1133,7 +1285,7 @@
assertEquals(addOp.getResultCode(), ResultCode.SUCCESS);
entryList.add(tmp.getDN());
- long initialCount = getReplayedUpdatesCount(baseDn);
+ long initialCount = getMonitorAttrValue(baseDn, "replayed-updates");
// Get the UUID of the test entry.
Entry resultEntry = getEntry(tmp.getDN(), 1, true);
@@ -1156,7 +1308,7 @@
// Wait for the operation to be replayed.
long endTime = System.currentTimeMillis() + 5000;
- while (getReplayedUpdatesCount(baseDn) == initialCount &&
+ while (getMonitorAttrValue(baseDn, "replayed-updates") == initialCount &&
System.currentTimeMillis() < endTime)
{
Thread.sleep(100);
@@ -1170,7 +1322,7 @@
// If the replication replay loop was detected and broken then the
// counter will still be updated even though the replay was unsuccessful.
- if (getReplayedUpdatesCount(baseDn) == initialCount)
+ if (getMonitorAttrValue(baseDn, "replayed-updates") == initialCount)
{
fail("Operation was not replayed");
}
--
Gitblit v1.10.0