mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.18.2013 ff99696311668f339200080a826bbc6efc708291
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -222,9 +222,10 @@
  public static void deleteDomain(DN dn)
  {
    LDAPReplicationDomain domain = domains.remove(dn);
    if (domain != null)
    {
      domain.delete();
    }
    // No replay threads running if no replication need
    if (domains.size() == 0) {
@@ -257,8 +258,7 @@
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    {
      ReplicationDomainCfg domain = configuration.getReplicationDomain(name);
      createNewDomain(domain);
      createNewDomain(configuration.getReplicationDomain(name));
    }
    /*
@@ -458,9 +458,7 @@
      modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
          historicalInformation);
    }
    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
    historicalInformation.setHistoricalAttrToOperation(modifyOperation);
    if (modifyOperation.getModifications().isEmpty())
@@ -509,7 +507,6 @@
      modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL,
          historicalInformation);
    }
    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
    // Add to the operation the historical attribute : "dn:changeNumber:moddn"
@@ -798,18 +795,20 @@
  }
  /**
   * Returns whether the provided baseDN is disabled for the external changelog.
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   *
   * @param baseDN
   *          the domain to check
   * @return true if the provided baseDN is disabled for the external changelog,
   *         false otherwise
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   */
  public static boolean isECLDisabledDomain(DN baseDN)
  public static boolean isECLEnabledDomain(DN baseDN)
  {
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (!domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
      if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
      {
        return true;
      }
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -176,7 +176,7 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    if (MultimasterReplication.isECLDisabledDomain(baseDN))
    if (!isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -198,7 +198,7 @@
  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  {
    if (MultimasterReplication.isECLDisabledDomain(baseDN))
    if (!isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -211,6 +211,23 @@
  }
  /**
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   * <p>
   * This method is a test seam that break the dependency on a static method.
   *
   * @param baseDN
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   */
  protected boolean isECLEnabledDomain(DN baseDN)
  {
    return MultimasterReplication.isECLEnabledDomain(baseDN);
  }
  /**
   * Returns the last time each serverId was seen alive for the specified
   * replication domain.
   *
@@ -288,6 +305,11 @@
        : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (!isECLEnabledDomain(baseDN))
      {
        continue;
      }
      for (Integer serverId : entry.getValue())
      {
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -93,8 +93,10 @@
  }
  private static DN BASE_DN;
  private static DN ADMIN_DATA_DN;
  private static final int serverId1 = 101;
  private static final int serverId2 = 102;
  private static final int serverId3 = 103;
  private ChangelogDB changelogDB;
  private ChangeNumberIndexDB cnIndexDB;
@@ -102,7 +104,7 @@
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
  private ChangelogState initialState;
  private ChangeNumberIndexer indexer;
  private ChangeNumberIndexer cnIndexer;
  private MultiDomainServerState initialCookie;
  @BeforeClass
@@ -110,6 +112,7 @@
  {
    TestCaseUtils.startFakeServer();
    BASE_DN = DN.decode("dc=example,dc=com");
    ADMIN_DATA_DN = DN.decode("cn=admin data");
  }
  @AfterClass
@@ -134,7 +137,7 @@
  @AfterMethod
  public void tearDown() throws Exception
  {
    stopIndexer();
    stopCNIndexer();
  }
  private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
@@ -142,19 +145,19 @@
  @Test
  public void emptyDBNoDS() throws Exception
  {
    startIndexer();
    verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
    startCNIndexer();
    assertExternalChangelogContent();
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDS() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    publishUpdateMsg(msg1);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -163,11 +166,11 @@
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    addReplica(BASE_DN, serverId1);
    setDBInitialRecords(msg1);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
    publishUpdateMsg(msg2);
    assertAddedRecords(msg2);
    assertExternalChangelogContent(msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -175,12 +178,12 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2, msg1);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -191,20 +194,20 @@
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    setDBInitialRecords(msg1, msg2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    publishUpdateMsg(msg3, msg4);
    assertAddedRecords(msg3);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN, serverId1, 5);
    publishUpdateMsg(msg5);
    assertAddedRecords(msg3);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN, serverId2, 6);
    publishUpdateMsg(msg6);
    assertAddedRecords(msg3, msg4, msg5);
    assertExternalChangelogContent(msg3, msg4, msg5);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -212,7 +215,7 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
@@ -220,14 +223,34 @@
    final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
    // simulate no messages received during some time for replica 2
    publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
    assertAddedRecords(msg1Sid2, msg2Sid1);
    assertExternalChangelogContent(msg1Sid2, msg2Sid1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
  {
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    addReplica(BASE_DN, serverId3);
    startCNIndexer();
    // cn=admin data will does not participate in the external changelog
    // so it cannot add to it
    final ReplicatedUpdateMsg msg1 = msg(ADMIN_DATA_DN, serverId1, 1);
    publishUpdateMsg(msg1);
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId3, 3);
    publishUpdateMsg(msg2, msg3);
    assertExternalChangelogContent(msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  {
    addReplica(BASE_DN, serverId1);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    publishUpdateMsg(msg1);
@@ -235,11 +258,11 @@
    addReplica(BASE_DN, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg2);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId1, 3);
    publishUpdateMsg(msg3);
    assertAddedRecords(msg1, msg2);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -247,15 +270,15 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    sendHeartbeat(BASE_DN, serverId1, 3);
    assertAddedRecords(msg1, msg2);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
@@ -263,22 +286,22 @@
  {
    addReplica(BASE_DN, serverId1);
    addReplica(BASE_DN, serverId2);
    startIndexer();
    startCNIndexer();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
    publishUpdateMsg(msg1, msg2);
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    replicaOffline(BASE_DN, serverId2, 3);
    // MCP cannot move forward since no new updates from serverId1
    assertAddedRecords(msg1);
    assertExternalChangelogContent(msg1);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
    publishUpdateMsg(msg4);
    // MCP moved forward after receiving update from serverId1
    // (last replica in the domain)
    assertAddedRecords(msg1, msg2, msg4);
    assertExternalChangelogContent(msg1, msg2, msg4);
  }
@@ -292,16 +315,23 @@
    initialState.addServerIdToDomain(serverId, baseDN);
  }
  private void startIndexer()
  private void startCNIndexer()
  {
    indexer = new ChangeNumberIndexer(changelogDB, initialState);
    indexer.start();
    waitForWaitingState(indexer);
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
    {
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      {
        return BASE_DN.equals(baseDN);
      }
    };
    cnIndexer.start();
    waitForWaitingState(cnIndexer);
  }
  private void stopIndexer()
  private void stopCNIndexer()
  {
    indexer.initiateShutdown();
    cnIndexer.initiateShutdown();
  }
  private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
@@ -356,22 +386,22 @@
    {
      if (!msg.isEmptyCursor())
      {
        indexer.publishUpdateMsg(msg.getBaseDN(), msg);
        cnIndexer.publishUpdateMsg(msg.getBaseDN(), msg);
      }
    }
    waitForWaitingState(indexer);
    waitForWaitingState(cnIndexer);
  }
  private void sendHeartbeat(DN baseDN, int serverId, int time) throws Exception
  {
    indexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(indexer);
    cnIndexer.publishHeartbeat(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(cnIndexer);
  }
  private void replicaOffline(DN baseDN, int serverId, int time) throws Exception
  {
    indexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(indexer);
    cnIndexer.replicaOffline(baseDN, new CSN(time, 0, serverId));
    waitForWaitingState(cnIndexer);
  }
  private void waitForWaitingState(final Thread t)
@@ -391,8 +421,15 @@
   * Asserts which records have been added to the CNIndexDB since starting the
   * {@link ChangeNumberIndexer} thread.
   */
  private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
  private void assertExternalChangelogContent(ReplicatedUpdateMsg... msgs)
      throws Exception
  {
    if (msgs.length == 0)
    {
      verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
      return;
    }
    final ArgumentCaptor<ChangeNumberIndexRecord> arg =
        ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
@@ -410,7 +447,7 @@
      final ChangeNumberIndexRecord record = allValues.get(i);
      if (previousCookie.isEmpty())
      {
        // ugly hack to go round strange legacy code
        // ugly hack to go round strange legacy code @see OPENDJ-67
        previousCookie.replace(record.getBaseDN(), new ServerState());
      }
      // check content in order
opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
@@ -47,6 +47,12 @@
      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
      false, false, false, false);
  AttributeType COMMON_NAME = new AttributeType(
      "( 2.5.4.3 NAME ( 'cn' 'commonName' ) SUP name X-ORIGIN 'RFC 4519' )",
      "commonName", Arrays.asList("cn", "commonName"), "2.5.4.3",
      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
      false, false, false, false);
  AttributeType ORGANIZATION_NAME = new AttributeType(
      "( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )",
      "organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10",
@@ -67,7 +73,7 @@
      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
      false, false, false, false);
  AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME,
  AttributeType[] ALL = { OBJECT_CLASS, COMMON_NAME, ORGANIZATION_NAME,
    ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, };
}