mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.18.2013 ff99696311668f339200080a826bbc6efc708291
OPENDJ-1263 Changenumber does not progress on the second replication server of a topology


The medium consistency point was not progressing on DSRS 2 because the ChangeNumberIndexer mediumConsistencyCSN had been polluted by CSN from "cn=admin data".
Problem is that ChangeNumberIndexer's publishUpdateMsg() was filtering out updates from ECL disabled domains, but not from domains unknown to ECL.
On start up, update messages can be received from replication domains have not been been configured yet on the current replication server.
The fix consists in only updating the medium consistency point with update messages or replica heartbeats coming from explicitly enabled replication domains.
After this fix, changes from ECL disabled domains will still be stored in dedicated replicaDBs, there is no change in this functionality.


MultimasterReplication.java
Renamed isECLDisabledDomain() to isECLEnabledDomain() so it caters better for the 3 states of the domains: enabled, disabled and unknown
Code cleanup.

ChangeNumberIndexer.java:
Extracted method isECLEnabledDomain() to use as a seam for unit testing which calls to MultimasterReplication.isECLEnabledDomain().

ChangeNumberIndexerTest.java
Added one test emptyDBThreeInitialDSsOneIsNotECLEnabledDomain().
Renamed assertAddedRecords() to assertExternalChangelogContent(), startIndexer() to startCNIndexer(), stopIndexer() to stopCNIndexer() and indexer to cnIndexer.
In startCNIndexer(), overrode ChangeNumberIndexer.isNotECLEnabledDomain().

AttributeTypeConstants.java:
Added support for "cn"
4 files modified
174 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 26 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 117 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -222,9 +222,10 @@
  public static void deleteDomain(DN dn)
  {
    LDAPReplicationDomain domain = domains.remove(dn);
    if (domain != null)
    {
      domain.delete();
    }
    // No replay threads running if no replication need
    if (domains.size() == 0) {
@@ -257,8 +258,7 @@
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    {
      ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
      createNewDomain(domain);
      createNewDomain(configuration.getReplicationDomain(name));
    }
    /*
@@ -458,9 +458,7 @@
      modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
          historicalInformation);
    }
    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
    historicalInformation.setHistoricalAttrToOperation(modifyOperation);
    if (modifyOperation.getModifications().isEmpty())
@@ -509,7 +507,6 @@
      modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL,
          historicalInformation);
    }
    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
    // Add to the operation the historical attribute : "dn:changeNumber:moddn"
@@ -798,18 +795,20 @@
  }
  /**
   * Returns whether the provided baseDN is disabled for the external changelog.
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   *
   * @param baseDN
   *          the domain to check
   * @return true if the provided baseDN is disabled for the external changelog,
   *         false otherwise
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   */
  public static boolean isECLDisabledDomain(DN baseDN)
  public static boolean isECLEnabledDomain(DN baseDN)
  {
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (!domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
      if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
      {
        return true;
      }
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -176,7 +176,7 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    if (MultimasterReplication.isECLDisabledDomain(baseDN))
    if (!isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -198,7 +198,7 @@
  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  {
    if (MultimasterReplication.isECLDisabledDomain(baseDN))
    if (!isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -211,6 +211,23 @@
  }
  /**
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   * <p>
   * This method is a test seam that break the dependency on a static method.
   *
   * @param baseDN
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   */
  protected boolean isECLEnabledDomain(DN baseDN)
  {
    return MultimasterReplication.isECLEnabledDomain(baseDN);
  }
  /**
   * Returns the last time each serverId was seen alive for the specified
   * replication domain.
   *
@@ -288,6 +305,11 @@
        : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (!isECLEnabledDomain(baseDN))
      {
        continue;
      }
      for (Integer serverId : entry.getValue())
      {
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -93,8 +93,10 @@
  }
  private static DN BASE_DN;
  private static DN ADMIN_DATA_DN;
  private static final int serverId1 = 101;
  private static final int serverId2 = 102;
  private static final int serverId3 = 103;
  private ChangelogDB changelogDB;
  private ChangeNumberIndexDB cnIndexDB;
@@ -102,7 +104,7 @@
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
  private ChangelogState initialState;
  private ChangeNumberIndexer indexer;
  private ChangeNumberIndexer cnIndexer;
  private MultiDomainServerState initialCookie;
  @BeforeClass
@@ -110,6 +112,7 @@
  {
    TestCaseUtils.startFakeServer();
    BASE_DN = DN.decode("dc=example,dc=com");
    ADMIN_DATA_DN = DN.decode("cn=admin data");
  }
  @AfterClass
@@ -134,7 +137,7 @@
  @AfterMethod
  public void tearDown() throws Exception
  {
    stopIndexer();
    stopCNIndexer();
  }
  private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
@@ -142,19 +145,19 @@
  @Test
  public void emptyDBNoDS() throws Exception
  {
    startIndexer();
    verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
    startCNIndexer();
    assertExternalChangelogContent();
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDS() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    publishUpdateMsg(msg1);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -163,11 +166,11 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    addReplica(BASE_DN, serverId1);
    setDBInitialRecords(msg1);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
    publishUpdateMsg(msg2);
    assertAddedRecords(msg2);
    assertExternalChangelogContent(msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -175,12 +178,12 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -191,20 +194,20 @@
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    setDBInitialRecords(msg1, msg2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    publishUpdateMsg(msg3, msg4);
    assertAddedRecords(msg3);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
    publishUpdateMsg(msg5);
    assertAddedRecords(msg3);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
    publishUpdateMsg(msg6);
    assertAddedRecords(msg3, msg4, msg5);
    assertExternalChangelogContent(msg3, msg4, msg5);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -212,7 +215,7 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
@@ -220,14 +223,34 @@
    final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
    // simulate no messages received during some time for replica 2
    publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
    assertAddedRecords(msg1Sid2, msg2Sid1);
    assertExternalChangelogContent(msg1Sid2, msg2Sid1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
  {
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN, serverId3);
    startCNIndexer();
    // cn=admin data will does not participate in the external changelog
    // so it cannot add to it
    final ReplicatedUpdateMsg msg1 = msg(ADMIN_DATA_DN, serverId1, 1);
    publishUpdateMsg(msg1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3);
    publishUpdateMsg(msg2, msg3);
    assertExternalChangelogContent(msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -235,11 +258,11 @@
    addReplica(BASE_DN, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
    publishUpdateMsg(msg3);
    assertAddedRecords(msg1, msg2);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -247,15 +270,15 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    sendHeartbeat(BASE_DN, serverId1, 3);
    assertAddedRecords(msg1, msg2);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -263,22 +286,22 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    replicaOffline(BASE_DN, serverId2, 3);
    // MCP cannot move forward since no new updates from serverId1
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    publishUpdateMsg(msg4);
    // MCP moved forward after receiving update from serverId1
    // (last replica in the domain)
    assertAddedRecords(msg1, msg2, msg4);
    assertExternalChangelogContent(msg1, msg2, msg4);
  }
@@ -292,16 +315,23 @@
    initialState.addServerIdToDomain(serverId, baseDN);
  }
  private void startIndexer()
  private void startCNIndexer()
  {
    indexer = new ChangeNumberIndexer(changelogDB, initialState);
    indexer.start();
    waitForWaitingState(indexer);
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
    {
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      {
        return BASE_DN.equals(baseDN);
      }
    };
    cnIndexer.start();
    waitForWaitingState(cnIndexer);
  }
  private void stopIndexer()
  private void stopCNIndexer()
  {
    indexer.initiateShutdown();
    cnIndexer.initiateShutdown();
  }
  private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
@@ -356,22 +386,22 @@
    {
      if (!msg.isEmptyCursor())
      {
        indexer.publishUpdateMsg(msg.getBaseDN(), msg);
        cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
      }
    }
    waitForWaitingState(indexer);
    waitForWaitingState(cnIndexer);
  }
  private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
  {
    indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(indexer);
    cnIndexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(cnIndexer);
  }
  private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
  {
    indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(indexer);
    cnIndexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(cnIndexer);
  }
  private void waitForWaitingState(final Thread t)
@@ -391,8 +421,15 @@
   * Asserts which records have been added to the CNIndexDB since starting the
   * {@link ChangeNumberIndexer} thread.
   */
  private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
  private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs)
      throws Exception
  {
    if (msgs.length == 0)
    {
      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
      return;
    }
    final ArgumentCaptor<ChangeNumberIndexRecord> arg =
        ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
@@ -410,7 +447,7 @@
      final ChangeNumberIndexRecord record = allValues.get(i);
      if (previousCookie.isEmpty())
      {
        // ugly hack to go round strange legacy code
        // ugly hack to go round strange legacy code @see OPENDJ-67
        previousCookie.replace(record.getBaseDN(), new ServerState());
      }
      // check content in order
opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
@@ -47,6 +47,12 @@
      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
      false, false, false, false);
  AttributeType COMMON_NAME = new AttributeType(
      "( 2.5.4.3 NAME ( 'cn' 'commonName' ) SUP name X-ORIGIN 'RFC 4519' )",
      "commonName", Arrays.asList("cn", "commonName"), "2.5.4.3",
      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
      false, false, false, false);
  AttributeType ORGANIZATION_NAME = new AttributeType(
      "( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )",
      "organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10",
@@ -67,7 +73,7 @@
      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
      false, false, false, false);
  AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME,
  AttributeType[] ALL = { OBJECT_CLASS, COMMON_NAME, ORGANIZATION_NAME,
    ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, };
}