mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
09.24.2009 f5efc93e858375f6b8e44eb1c04918372ae93f1b
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -132,6 +132,7 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.util.LDIFWriter;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -171,6 +172,8 @@
  List<Control> NO_CONTROL = null;
  private int brokerSessionTimeout = 5000;
  private int maxWindow = 100;
  /**
   * Set up the environment for performing the tests in this Class.
   * Replication
@@ -200,7 +203,7 @@
    ReplServerFakeConfiguration conf1 =
      new ReplServerFakeConfiguration(
          replicationServerPort, "ExternalChangeLogTestDb",
          0, 71, 0, 100, null);
          0, 71, 0, maxWindow, null);
    replicationServer = new ReplicationServer(conf1);;
    debugInfo("configure", "ReplicationServer created"+replicationServer);
@@ -250,7 +253,7 @@
    ts = ECLCompatWriteReadAllOps(5);replicationServer.clearDb();
    ECLIncludeAttributes();replicationServer.clearDb();
    ChangeTimeHeartbeatTest();replicationServer.clearDb();
  }
@@ -3313,8 +3316,11 @@
      // search on 'cn=changelog'
      LinkedHashSet<String> attributes = new LinkedHashSet<String>();
      attributes.add("*");
      attributes.add("+");
      if (expectedFirst>0)
        attributes.add("firstchangenumber");
      attributes.add("lastchangenumber");
      attributes.add("changelog");
      attributes.add("lastExternalChangelogCookie");
      debugInfo(tn, " Search: rootDSE");
      InternalSearchOperation searchOp =
@@ -3344,8 +3350,9 @@
          ldifWriter.writeEntry(resultEntry);
          if (eclEnabled)
          {
            checkValue(resultEntry,"firstchangenumber",
              String.valueOf(expectedFirst));
            if (expectedFirst>0)
              checkValue(resultEntry,"firstchangenumber",
                String.valueOf(expectedFirst));
            checkValue(resultEntry,"lastchangenumber",
              String.valueOf(expectedLast));
            checkValue(resultEntry,"changelog",
@@ -3353,7 +3360,8 @@
          }
          else
          {
            assertEquals(getAttributeValue(resultEntry, "firstchangenumber"),
            if (expectedFirst>0)
              assertEquals(getAttributeValue(resultEntry, "firstchangenumber"),
                null);
            assertEquals(getAttributeValue(resultEntry, "lastchangenumber"),
                null);
@@ -3419,9 +3427,10 @@
    String user1entryUUID = "11111111-1112-1113-1114-111111111115";
    try
    {
      // The replication changelog is empty
      ReplicationServerDomain rsdtest =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      // The replication changelog is empty
      long count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, 1201));
@@ -3430,10 +3439,10 @@
      // Creates broker on o=test
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING),  1201,
          100, replicationServerPort,
          1000, replicationServerPort,
          brokerSessionTimeout, true);
      // Publish 1 message
      // Publish one first message
      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, 1201);
      DeleteMsg delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
@@ -3442,12 +3451,13 @@
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      // From begin to now : 1 change
      count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, 1201));
      assertEquals(count, 1);
      // Publish 1 message
      // Publish one second message
      ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2,
@@ -3456,29 +3466,36 @@
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      // From begin to now : 2 changes
      count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, 1201));
      assertEquals(count, 2);
      // From begin to first change (inclusive) : 1 change = cn1
      count = rsdtest.getEligibleCount(
          new ServerState(),  cn1);
      assertEquals(count, 1);
      ServerState ss = new ServerState();
      ss.update(cn1);
      // From state/cn1(exclusive) to cn1 (inclusive) : 0 change
      count = rsdtest.getEligibleCount(ss, cn1);
      assertEquals(count, 0);
      // From state/cn1(exclusive) to cn2 (inclusive) : 1 change = cn2
      count = rsdtest.getEligibleCount(ss, cn2);
      assertEquals(count, 1);
      ss.update(cn2);
      // From state/cn2(exclusive) to now (inclusive) : 0 change
      count = rsdtest.getEligibleCount(ss,
          new ChangeNumber(TimeThread.getTime(), 4, 1201));
      assertEquals(count, 0);
      // Publish 1 message
      // Publish one third message
      ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3,
@@ -3488,11 +3505,129 @@
      sleep(300);
      ss.update(cn2);
      // From state/cn2(exclusive) to now : 1 change = cn3
      count = rsdtest.getEligibleCount(ss,
          new ChangeNumber(TimeThread.getTime(), 4, 1201));
      assertEquals(count, 1);
      boolean perfs=false;
      if (perfs)
      {
      // number of msgs used by the test
      int maxMsg = 999999;
      // We need an RS configured with a window size bigger than the number
      // of msg used by the test.
      assertTrue(maxMsg<maxWindow);
      debugInfo(tn, "Perf test in compat mode - will generate " + maxMsg + " msgs.");
      for (int i=4; i<=maxMsg; i++)
      {
        ChangeNumber cnx = new ChangeNumber(TimeThread.getTime(), i, 1201);
        delMsg =
          new DeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, cnx,
              user1entryUUID);
        server01.publish(delMsg);
      }
      sleep(1000);
      debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
      ArrayList<String> excludedDomains =
        MultimasterReplication.getECLDisabledDomains();
      if (!excludedDomains.contains(
          ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
        excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
      ECLWorkflowElement eclwe = (ECLWorkflowElement)
      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
      ReplicationServer rs = eclwe.getReplicationServer();
      rs.disableEligibility(excludedDomains);
      long t1 = TimeThread.getTime();
      int[] limitss = replicationServer.getECLDraftCNLimits(
          replicationServer.getEligibleCN(), excludedDomains);
      assertEquals(limitss[1], maxMsg);
      long t2 = TimeThread.getTime();
      debugInfo(tn, "Perfs - " + maxMsg + " counted in (ms):" + (t2 - t1));
      try
      {
        // search on 'cn=changelog'
        LinkedHashSet<String> attributes = new LinkedHashSet<String>();
        attributes.add("+");
        attributes.add("*");
        String filter = "(changenumber>="+maxMsg+")";
        debugInfo(tn, " Search: " + filter);
        InternalSearchOperation searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        waitOpResult(searchOp, ResultCode.SUCCESS);
        long t3 = TimeThread.getTime();
        assertEquals(searchOp.getSearchEntries().size(), 1);
        debugInfo(tn, "Perfs - last change searched in (ms):" + (t3 - t2));
        filter = "(changenumber>="+maxMsg+")";
        debugInfo(tn, " Search: " + filter);
        searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        waitOpResult(searchOp, ResultCode.SUCCESS);
        long t4 = TimeThread.getTime();
        assertEquals(searchOp.getSearchEntries().size(), 1);
        debugInfo(tn, "Perfs - last change searched in (ms):" + (t4 - t3));
        filter = "(changenumber>="+(maxMsg-2)+")";
        debugInfo(tn, " Search: " + filter);
        searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        waitOpResult(searchOp, ResultCode.SUCCESS);
        long t5 = TimeThread.getTime();
        assertEquals(searchOp.getSearchEntries().size(), 3);
        debugInfo(tn, "Perfs - last 3 changes searched in (ms):" + (t5 - t4));
        if (searchOp.getSearchEntries() != null)
        {
          int i=0;
          for (SearchResultEntry resultEntry : searchOp.getSearchEntries())
          {
            i++;
            debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString());
          }
        }
      }
      catch(Exception e)
      {
        fail("Ending test "+tn+" with exception:\n"
            +  stackTraceToSingleLineString(e));
      }
      }
      server01.stop();
    }
@@ -3539,6 +3674,10 @@
      ExternalChangelogDomainFakeCfg eclCfg = 
        new ExternalChangelogDomainFakeCfg(true, eclInclude);
      domainConf.setExternalChangelogDomain(eclCfg);
      // Set a Changetime heartbeat interval low enough (less than default
      // value that is 1000 ms) for the test to be sure to consider all changes
      // as eligible.
      domainConf.setChangetimeHeartbeatInterval(10);
      domain2 = MultimasterReplication.createNewDomain(domainConf);
      domain2.start();
@@ -3552,6 +3691,10 @@
      eclCfg = 
        new ExternalChangelogDomainFakeCfg(true, eclInclude);
      domainConf.setExternalChangelogDomain(eclCfg);
      // Set a Changetime heartbeat interval low enough (less than default
      // value that is 1000 ms) for the test to be sure to consider all changes
      // as eligible.
      domainConf.setChangetimeHeartbeatInterval(10);
      domain3 = MultimasterReplication.createNewDomain(domainConf);
      domain3.start();
@@ -3562,6 +3705,10 @@
      eclCfg = 
        new ExternalChangelogDomainFakeCfg(true, eclInclude);
      domainConf.setExternalChangelogDomain(eclCfg);
      // Set a Changetime heartbeat interval low enough (less than default
      // value that is 1000 ms) for the test to be sure to consider all changes
      // as eligible.
      domainConf.setChangetimeHeartbeatInterval(10);
      domain21 = MultimasterReplication.createNewDomain(domainConf);
      domain21.start();