From 8c4cf3b40f27ad043b961512a507a05bf8c1c566 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 23 Dec 2013 10:18:05 +0000
Subject: [PATCH] OPENDJ-1263 Changenumber does not progress on the second replication server of a topology
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 26 ++++++++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java | 8 ++
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 23 +++----
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 117 +++++++++++++++++++++++++-------------
4 files changed, 119 insertions(+), 55 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index a031d43..1a2e8c8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -222,9 +222,10 @@
public static void deleteDomain(DN dn)
{
LDAPReplicationDomain domain = domains.remove(dn);
-
if (domain != null)
+ {
domain.delete();
+ }
// No replay threads running if no replication need
if (domains.size() == 0) {
@@ -257,8 +258,7 @@
// Create the list of domains that are already defined.
for (String name : configuration.listReplicationDomains())
{
- ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
- createNewDomain(domain);
+ createNewDomain(configuration.getReplicationDomain(name));
}
/*
@@ -458,9 +458,7 @@
modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
historicalInformation);
}
-
historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
-
historicalInformation.setHistoricalAttrToOperation(modifyOperation);
if (modifyOperation.getModifications().isEmpty())
@@ -509,7 +507,6 @@
modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL,
historicalInformation);
}
-
historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
// Add to the operation the historical attribute : "dn:changeNumber:moddn"
@@ -798,18 +795,20 @@
}
/**
- * Returns whether the provided baseDN is disabled for the external changelog.
+ * Returns whether the provided baseDN represents a replication domain enabled
+ * for the external changelog.
*
* @param baseDN
- * the domain to check
- * @return true if the provided baseDN is disabled for the external changelog,
- * false otherwise
+ * the replication domain to check
+ * @return true if the provided baseDN is enabled for the external changelog,
+ * false if the provided baseDN is disabled for the external changelog
+ * or unknown to multimaster replication.
*/
- public static boolean isECLDisabledDomain(DN baseDN)
+ public static boolean isECLEnabledDomain(DN baseDN)
{
for (LDAPReplicationDomain domain : domains.values())
{
- if (!domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
+ if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
{
return true;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 3b2c115..a9320a4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -176,7 +176,7 @@
*/
public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
{
- if (MultimasterReplication.isECLDisabledDomain(baseDN))
+ if (!isECLEnabledDomain(baseDN))
{
return;
}
@@ -198,7 +198,7 @@
public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
throws ChangelogException
{
- if (MultimasterReplication.isECLDisabledDomain(baseDN))
+ if (!isECLEnabledDomain(baseDN))
{
return;
}
@@ -211,6 +211,23 @@
}
/**
+ * Returns whether the provided baseDN represents a replication domain enabled
+ * for the external changelog.
+ * <p>
+ * This method is a test seam that break the dependency on a static method.
+ *
+ * @param baseDN
+ * the replication domain to check
+ * @return true if the provided baseDN is enabled for the external changelog,
+ * false if the provided baseDN is disabled for the external changelog
+ * or unknown to multimaster replication.
+ */
+ protected boolean isECLEnabledDomain(DN baseDN)
+ {
+ return MultimasterReplication.isECLEnabledDomain(baseDN);
+ }
+
+ /**
* Returns the last time each serverId was seen alive for the specified
* replication domain.
*
@@ -288,6 +305,11 @@
: changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
+ if (!isECLEnabledDomain(baseDN))
+ {
+ continue;
+ }
+
for (Integer serverId : entry.getValue())
{
final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index b6cd624..8af100b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -93,8 +93,10 @@
}
private static DN BASE_DN;
+ private static DN ADMIN_DATA_DN;
private static final int serverId1 = 101;
private static final int serverId2 = 102;
+ private static final int serverId3 = 103;
private ChangelogDB changelogDB;
private ChangeNumberIndexDB cnIndexDB;
@@ -102,7 +104,7 @@
private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
private ChangelogState initialState;
- private ChangeNumberIndexer indexer;
+ private ChangeNumberIndexer cnIndexer;
private MultiDomainServerState initialCookie;
@BeforeClass
@@ -110,6 +112,7 @@
{
TestCaseUtils.startFakeServer();
BASE_DN = DN.decode("dc=example,dc=com");
+ ADMIN_DATA_DN = DN.decode("cn=admin data");
}
@AfterClass
@@ -134,7 +137,7 @@
@AfterMethod
public void tearDown() throws Exception
{
- stopIndexer();
+ stopCNIndexer();
}
private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
@@ -142,19 +145,19 @@
@Test
public void emptyDBNoDS() throws Exception
{
- startIndexer();
- verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
+ startCNIndexer();
+ assertExternalChangelogContent();
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDS() throws Exception
{
addReplica(BASE_DN, serverId1);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
publishUpdateMsg(msg1);
- assertAddedRecords(msg1);
+ assertExternalChangelogContent(msg1);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -163,11 +166,11 @@
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
addReplica(BASE_DN, serverId1);
setDBInitialRecords(msg1);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
publishUpdateMsg(msg2);
- assertAddedRecords(msg2);
+ assertExternalChangelogContent(msg2);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -175,12 +178,12 @@
{
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg2, msg1);
- assertAddedRecords(msg1);
+ assertExternalChangelogContent(msg1);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -191,20 +194,20 @@
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
setDBInitialRecords(msg1, msg2);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
publishUpdateMsg(msg3, msg4);
- assertAddedRecords(msg3);
+ assertExternalChangelogContent(msg3);
final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
publishUpdateMsg(msg5);
- assertAddedRecords(msg3);
+ assertExternalChangelogContent(msg3);
final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
publishUpdateMsg(msg6);
- assertAddedRecords(msg3, msg4, msg5);
+ assertExternalChangelogContent(msg3, msg4, msg5);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -212,7 +215,7 @@
{
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
@@ -220,14 +223,34 @@
final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
// simulate no messages received during some time for replica 2
publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
- assertAddedRecords(msg1Sid2, msg2Sid1);
+ assertExternalChangelogContent(msg1Sid2, msg2Sid1);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
+ {
+ addReplica(ADMIN_DATA_DN, serverId1);
+ addReplica(BASE_DN, serverId2);
+ addReplica(BASE_DN, serverId3);
+ startCNIndexer();
+
+ // cn=admin data will does not participate in the external changelog
+ // so it cannot add to it
+ final ReplicatedUpdateMsg msg1 = msg(ADMIN_DATA_DN, serverId1, 1);
+ publishUpdateMsg(msg1);
+ assertExternalChangelogContent();
+
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3);
+ publishUpdateMsg(msg2, msg3);
+ assertExternalChangelogContent(msg2);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
{
addReplica(BASE_DN, serverId1);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
publishUpdateMsg(msg1);
@@ -235,11 +258,11 @@
addReplica(BASE_DN, serverId2);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg2);
- assertAddedRecords(msg1);
+ assertExternalChangelogContent(msg1);
final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
publishUpdateMsg(msg3);
- assertAddedRecords(msg1, msg2);
+ assertExternalChangelogContent(msg1, msg2);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -247,15 +270,15 @@
{
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg1, msg2);
- assertAddedRecords(msg1);
+ assertExternalChangelogContent(msg1);
sendHeartbeat(BASE_DN, serverId1, 3);
- assertAddedRecords(msg1, msg2);
+ assertExternalChangelogContent(msg1, msg2);
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -263,22 +286,22 @@
{
addReplica(BASE_DN, serverId1);
addReplica(BASE_DN, serverId2);
- startIndexer();
+ startCNIndexer();
final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
publishUpdateMsg(msg1, msg2);
- assertAddedRecords(msg1);
+ assertExternalChangelogContent(msg1);
replicaOffline(BASE_DN, serverId2, 3);
// MCP cannot move forward since no new updates from serverId1
- assertAddedRecords(msg1);
+ assertExternalChangelogContent(msg1);
final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
publishUpdateMsg(msg4);
// MCP moved forward after receiving update from serverId1
// (last replica in the domain)
- assertAddedRecords(msg1, msg2, msg4);
+ assertExternalChangelogContent(msg1, msg2, msg4);
}
@@ -292,16 +315,23 @@
initialState.addServerIdToDomain(serverId, baseDN);
}
- private void startIndexer()
+ private void startCNIndexer()
{
- indexer = new ChangeNumberIndexer(changelogDB, initialState);
- indexer.start();
- waitForWaitingState(indexer);
+ cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
+ {
+ @Override
+ protected boolean isECLEnabledDomain(DN baseDN)
+ {
+ return BASE_DN.equals(baseDN);
+ }
+ };
+ cnIndexer.start();
+ waitForWaitingState(cnIndexer);
}
- private void stopIndexer()
+ private void stopCNIndexer()
{
- indexer.initiateShutdown();
+ cnIndexer.initiateShutdown();
}
private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
@@ -356,22 +386,22 @@
{
if (!msg.isEmptyCursor())
{
- indexer.publishUpdateMsg(msg.getBaseDN(), msg);
+ cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
}
}
- waitForWaitingState(indexer);
+ waitForWaitingState(cnIndexer);
}
private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
{
- indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
- waitForWaitingState(indexer);
+ cnIndexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
+ waitForWaitingState(cnIndexer);
}
private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
{
- indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
- waitForWaitingState(indexer);
+ cnIndexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
+ waitForWaitingState(cnIndexer);
}
private void waitForWaitingState(final Thread t)
@@ -391,8 +421,15 @@
* Asserts which records have been added to the CNIndexDB since starting the
* {@link ChangeNumberIndexer} thread.
*/
- private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
+ private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs)
+ throws Exception
{
+ if (msgs.length == 0)
+ {
+ verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
+ return;
+ }
+
final ArgumentCaptor<ChangeNumberIndexRecord> arg =
ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
@@ -410,7 +447,7 @@
final ChangeNumberIndexRecord record = allValues.get(i);
if (previousCookie.isEmpty())
{
- // ugly hack to go round strange legacy code
+ // ugly hack to go round strange legacy code @see OPENDJ-67
previousCookie.replace(record.getBaseDN(), new ServerState());
}
// check content in order
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
index 46376e3..bd8c613 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
@@ -47,6 +47,12 @@
null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
false, false, false, false);
+ AttributeType COMMON_NAME = new AttributeType(
+ "( 2.5.4.3 NAME ( 'cn' 'commonName' ) SUP name X-ORIGIN 'RFC 4519' )",
+ "commonName", Arrays.asList("cn", "commonName"), "2.5.4.3",
+ null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+ false, false, false, false);
+
AttributeType ORGANIZATION_NAME = new AttributeType(
"( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )",
"organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10",
@@ -67,7 +73,7 @@
null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
false, false, false, false);
- AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME,
+ AttributeType[] ALL = { OBJECT_CLASS, COMMON_NAME, ORGANIZATION_NAME,
ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, };
}
--
Gitblit v1.10.0