From 8c4cf3b40f27ad043b961512a507a05bf8c1c566 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 23 Dec 2013 10:18:05 +0000
Subject: [PATCH] OPENDJ-1263 Changenumber does not progress on the second replication server of a topology

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   26 ++++++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java                            |    8 ++
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java                                       |   23 +++----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |  117 +++++++++++++++++++++++++-------------
 4 files changed, 119 insertions(+), 55 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index a031d43..1a2e8c8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -222,9 +222,10 @@
   public static void deleteDomain(DN dn)
   {
     LDAPReplicationDomain domain = domains.remove(dn);
-
     if (domain != null)
+    {
       domain.delete();
+    }
 
     // No replay threads running if no replication need
     if (domains.size() == 0) {
@@ -257,8 +258,7 @@
     //  Create the list of domains that are already defined.
     for (String name : configuration.listReplicationDomains())
     {
-      ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
-      createNewDomain(domain);
+      createNewDomain(configuration.getReplicationDomain(name));
     }
 
     /*
@@ -458,9 +458,7 @@
       modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
           historicalInformation);
     }
-
     historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
-
     historicalInformation.setHistoricalAttrToOperation(modifyOperation);
 
     if (modifyOperation.getModifications().isEmpty())
@@ -509,7 +507,6 @@
       modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL,
           historicalInformation);
     }
-
     historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
 
     // Add to the operation the historical attribute : "dn:changeNumber:moddn"
@@ -798,18 +795,20 @@
   }
 
   /**
-   * Returns whether the provided baseDN is disabled for the external changelog.
+   * Returns whether the provided baseDN represents a replication domain enabled
+   * for the external changelog.
    *
    * @param baseDN
-   *          the domain to check
-   * @return true if the provided baseDN is disabled for the external changelog,
-   *         false otherwise
+   *          the replication domain to check
+   * @return true if the provided baseDN is enabled for the external changelog,
+   *         false if the provided baseDN is disabled for the external changelog
+   *         or unknown to multimaster replication.
    */
-  public static boolean isECLDisabledDomain(DN baseDN)
+  public static boolean isECLEnabledDomain(DN baseDN)
   {
     for (LDAPReplicationDomain domain : domains.values())
     {
-      if (!domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
+      if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
       {
         return true;
       }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 3b2c115..a9320a4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -176,7 +176,7 @@
    */
   public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
   {
-    if (MultimasterReplication.isECLDisabledDomain(baseDN))
+    if (!isECLEnabledDomain(baseDN))
     {
       return;
     }
@@ -198,7 +198,7 @@
   public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
       throws ChangelogException
   {
-    if (MultimasterReplication.isECLDisabledDomain(baseDN))
+    if (!isECLEnabledDomain(baseDN))
     {
       return;
     }
@@ -211,6 +211,23 @@
   }
 
   /**
+   * Returns whether the provided baseDN represents a replication domain enabled
+   * for the external changelog.
+   * <p>
+   * This method is a test seam that break the dependency on a static method.
+   *
+   * @param baseDN
+   *          the replication domain to check
+   * @return true if the provided baseDN is enabled for the external changelog,
+   *         false if the provided baseDN is disabled for the external changelog
+   *         or unknown to multimaster replication.
+   */
+  protected boolean isECLEnabledDomain(DN baseDN)
+  {
+    return MultimasterReplication.isECLEnabledDomain(baseDN);
+  }
+
+  /**
    * Returns the last time each serverId was seen alive for the specified
    * replication domain.
    *
@@ -288,6 +305,11 @@
         : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
+      if (!isECLEnabledDomain(baseDN))
+      {
+        continue;
+      }
+
       for (Integer serverId : entry.getValue())
       {
         final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index b6cd624..8af100b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -93,8 +93,10 @@
   }
 
   private static DN BASE_DN;
+  private static DN ADMIN_DATA_DN;
   private static final int serverId1 = 101;
   private static final int serverId2 = 102;
+  private static final int serverId3 = 103;
 
   private ChangelogDB changelogDB;
   private ChangeNumberIndexDB cnIndexDB;
@@ -102,7 +104,7 @@
   private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
       new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
   private ChangelogState initialState;
-  private ChangeNumberIndexer indexer;
+  private ChangeNumberIndexer cnIndexer;
   private MultiDomainServerState initialCookie;
 
   @BeforeClass
@@ -110,6 +112,7 @@
   {
     TestCaseUtils.startFakeServer();
     BASE_DN = DN.decode("dc=example,dc=com");
+    ADMIN_DATA_DN = DN.decode("cn=admin data");
   }
 
   @AfterClass
@@ -134,7 +137,7 @@
   @AfterMethod
   public void tearDown() throws Exception
   {
-    stopIndexer();
+    stopCNIndexer();
   }
 
   private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
@@ -142,19 +145,19 @@
   @Test
   public void emptyDBNoDS() throws Exception
   {
-    startIndexer();
-    verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
+    startCNIndexer();
+    assertExternalChangelogContent();
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneInitialDS() throws Exception
   {
     addReplica(BASE_DN, serverId1);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     publishUpdateMsg(msg1);
-    assertAddedRecords(msg1);
+    assertExternalChangelogContent(msg1);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -163,11 +166,11 @@
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     addReplica(BASE_DN, serverId1);
     setDBInitialRecords(msg1);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
     publishUpdateMsg(msg2);
-    assertAddedRecords(msg2);
+    assertExternalChangelogContent(msg2);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -175,12 +178,12 @@
   {
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg2, msg1);
-    assertAddedRecords(msg1);
+    assertExternalChangelogContent(msg1);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -191,20 +194,20 @@
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
     setDBInitialRecords(msg1, msg2);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
     publishUpdateMsg(msg3, msg4);
-    assertAddedRecords(msg3);
+    assertExternalChangelogContent(msg3);
 
     final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
     publishUpdateMsg(msg5);
-    assertAddedRecords(msg3);
+    assertExternalChangelogContent(msg3);
 
     final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
     publishUpdateMsg(msg6);
-    assertAddedRecords(msg3, msg4, msg5);
+    assertExternalChangelogContent(msg3, msg4, msg5);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -212,7 +215,7 @@
   {
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
     final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
@@ -220,14 +223,34 @@
     final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
     // simulate no messages received during some time for replica 2
     publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
-    assertAddedRecords(msg1Sid2, msg2Sid1);
+    assertExternalChangelogContent(msg1Sid2, msg2Sid1);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
+  {
+    addReplica(ADMIN_DATA_DN, serverId1);
+    addReplica(BASE_DN, serverId2);
+    addReplica(BASE_DN, serverId3);
+    startCNIndexer();
+
+    // cn=admin data will does not participate in the external changelog
+    // so it cannot add to it
+    final ReplicatedUpdateMsg msg1 = msg(ADMIN_DATA_DN, serverId1, 1);
+    publishUpdateMsg(msg1);
+    assertExternalChangelogContent();
+
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3);
+    publishUpdateMsg(msg2, msg3);
+    assertExternalChangelogContent(msg2);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
   {
     addReplica(BASE_DN, serverId1);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     publishUpdateMsg(msg1);
@@ -235,11 +258,11 @@
     addReplica(BASE_DN, serverId2);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg2);
-    assertAddedRecords(msg1);
+    assertExternalChangelogContent(msg1);
 
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
     publishUpdateMsg(msg3);
-    assertAddedRecords(msg1, msg2);
+    assertExternalChangelogContent(msg1, msg2);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -247,15 +270,15 @@
   {
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg1, msg2);
-    assertAddedRecords(msg1);
+    assertExternalChangelogContent(msg1);
 
     sendHeartbeat(BASE_DN, serverId1, 3);
-    assertAddedRecords(msg1, msg2);
+    assertExternalChangelogContent(msg1, msg2);
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -263,22 +286,22 @@
   {
     addReplica(BASE_DN, serverId1);
     addReplica(BASE_DN, serverId2);
-    startIndexer();
+    startCNIndexer();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
     publishUpdateMsg(msg1, msg2);
-    assertAddedRecords(msg1);
+    assertExternalChangelogContent(msg1);
 
     replicaOffline(BASE_DN, serverId2, 3);
     // MCP cannot move forward since no new updates from serverId1
-    assertAddedRecords(msg1);
+    assertExternalChangelogContent(msg1);
 
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
     publishUpdateMsg(msg4);
     // MCP moved forward after receiving update from serverId1
     // (last replica in the domain)
-    assertAddedRecords(msg1, msg2, msg4);
+    assertExternalChangelogContent(msg1, msg2, msg4);
   }
 
 
@@ -292,16 +315,23 @@
     initialState.addServerIdToDomain(serverId, baseDN);
   }
 
-  private void startIndexer()
+  private void startCNIndexer()
   {
-    indexer = new ChangeNumberIndexer(changelogDB, initialState);
-    indexer.start();
-    waitForWaitingState(indexer);
+    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
+    {
+      @Override
+      protected boolean isECLEnabledDomain(DN baseDN)
+      {
+        return BASE_DN.equals(baseDN);
+      }
+    };
+    cnIndexer.start();
+    waitForWaitingState(cnIndexer);
   }
 
-  private void stopIndexer()
+  private void stopCNIndexer()
   {
-    indexer.initiateShutdown();
+    cnIndexer.initiateShutdown();
   }
 
   private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
@@ -356,22 +386,22 @@
     {
       if (!msg.isEmptyCursor())
       {
-        indexer.publishUpdateMsg(msg.getBaseDN(), msg);
+        cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
       }
     }
-    waitForWaitingState(indexer);
+    waitForWaitingState(cnIndexer);
   }
 
   private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
   {
-    indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
-    waitForWaitingState(indexer);
+    cnIndexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
+    waitForWaitingState(cnIndexer);
   }
 
   private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
   {
-    indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
-    waitForWaitingState(indexer);
+    cnIndexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
+    waitForWaitingState(cnIndexer);
   }
 
   private void waitForWaitingState(final Thread t)
@@ -391,8 +421,15 @@
    * Asserts which records have been added to the CNIndexDB since starting the
    * {@link ChangeNumberIndexer} thread.
    */
-  private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
+  private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs)
+      throws Exception
   {
+    if (msgs.length == 0)
+    {
+      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
+      return;
+    }
+
     final ArgumentCaptor<ChangeNumberIndexRecord> arg =
         ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
     verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
@@ -410,7 +447,7 @@
       final ChangeNumberIndexRecord record = allValues.get(i);
       if (previousCookie.isEmpty())
       {
-        // ugly hack to go round strange legacy code
+        // ugly hack to go round strange legacy code @see OPENDJ-67
         previousCookie.replace(record.getBaseDN(), new ServerState());
       }
       // check content in order
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
index 46376e3..bd8c613 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
@@ -47,6 +47,12 @@
       null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
       false, false, false, false);
 
+  AttributeType COMMON_NAME = new AttributeType(
+      "( 2.5.4.3 NAME ( 'cn' 'commonName' ) SUP name X-ORIGIN 'RFC 4519' )",
+      "commonName", Arrays.asList("cn", "commonName"), "2.5.4.3",
+      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+      false, false, false, false);
+
   AttributeType ORGANIZATION_NAME = new AttributeType(
       "( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )",
       "organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10",
@@ -67,7 +73,7 @@
       null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
       false, false, false, false);
 
-  AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME,
+  AttributeType[] ALL = { OBJECT_CLASS, COMMON_NAME, ORGANIZATION_NAME,
     ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, };
 
 }

--
Gitblit v1.10.0