mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
09.24.2009 f5efc93e858375f6b8e44eb1c04918372ae93f1b
Fix #4361 ECL - draft mode: temporary fake lastChangeNumber after thousands of updates
7 files modified
338 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 37 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 96 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 1 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 163 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -289,6 +289,43 @@
  }
  /**
   * Return the number of changes between 2 provided change numbers.
   * @param from The lower (older) change number.
   * @param to   The upper (newer) change number.
   * @return The computed number of changes.
   */
  public int getCount(ChangeNumber from, ChangeNumber to)
  {
    int count = 0;
    flush();
    ReplServerDBCursor cursor = null;
    try
    {
      try
      {
        cursor = db.openReadCursor(from);
      }
      catch(Exception e)
      {
        return 0;
      }
      ChangeNumber curr = null;
      while ((curr = cursor.nextChangeNumber())!=null)
      {
        if (curr.newer(to))
          break;
        count++;
      }
    }
    finally
    {
      if (cursor != null)
        cursor.abort();
    }
    return count;
  }
  /**
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -320,8 +320,8 @@
  /**
   * Clear the changes from this DB (from both memory cache and DB storage)
   * for the provided serviceID.
   * @param serviceIDToClear The serviceID for which we want to remove the
   *         all records from the DraftCNDb.
   * @param serviceIDToClear The serviceID for which we want to remove
   *         all records from the DraftCNDb - null means all.
   * @throws DatabaseException When an exception occurs while removing the
   * changes from the DB.
   * @throws Exception When an exception occurs while accessing a resource
@@ -339,6 +339,7 @@
    boolean finished = false;
    boolean done = false;
    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
@@ -385,8 +386,8 @@
            {
              // let's get the eligible part of the domain
              ServerState startSS = domain.getStartState();
              ServerState endSS   = domain.getEligibleState(
                  replicationServer.getEligibleCN());
              ServerState endSS= domain.getEligibleState(crossDomainEligibleCN);
              ChangeNumber fcn = startSS.getMaxChangeNumber(cn.getServerId());
              ChangeNumber lcn = endSS.getMaxChangeNumber(cn.getServerId());
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -568,7 +568,6 @@
  throws DirectoryException
  {
    String crossDomainStartState;
    try
    {
      draftCompat = true;
@@ -611,7 +610,7 @@
          // startDraftCN (from the request filter) is present in the draftCnDb
          // Get an iterator to traverse the draftCNDb
          draftCNDbIter =
            draftCNDb.generateIterator(draftCNDb.getFirstKey());
            draftCNDb.generateIterator(startDraftCN);
        }
        else
        {
@@ -739,7 +738,10 @@
            newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
            if ((providedCookie==null)||(providedCookie.length()==0)
                ||allowUnknownDomains)
            {
              if (newDomainCtxt.startState == null)
              newDomainCtxt.startState = new ServerState();
            }
            else
              if (newDomainCtxt.startState == null)
                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
@@ -1121,12 +1123,6 @@
    try
    {
      // Search / no DraftCN / not persistent
      // -----------------------------------
      //  init: all domain are candidate
      //    get one msg from each
      //       no (null) msg returned: should not happen since we go to a state
      //       that is computed/expected
      //  getMessage:
      //    get the oldest msg:
      //    after:
@@ -1137,7 +1133,6 @@
      //       get one msg from that domain
      //       no (null) msg returned: should not happen since we go to a state
      //       that is computed/expected
      //  step 2: send DoneMsg
      // Persistent:
      // ----------
@@ -1245,12 +1240,14 @@
                    {
                      // let's traverse the DraftCNdb searching for the change
                      // found in the changelogDb.
                      if (debugEnabled())
                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
                          + " will skip " + cnFromDraftCNDb
                          + " and read next change from the DraftCNDb.");
                      isEndOfDraftCNReached = (draftCNDbIter.next()==false);
                      if (debugEnabled())
                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
                          + " has skiped to "
                          + " sn=" + draftCNDbIter.getDraftCN()
@@ -1290,6 +1287,7 @@
                    // the change from the changelogDb is older
                    // it should have been stored lately
                    // let's continue to traverse the changelogdb
                    if (debugEnabled())
                    TRACER.debugInfo("getNextECLUpdate: will skip "
                        + cnFromChangelogDb
                        + " and read next from the regular changelog.");
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1424,6 +1424,25 @@
  }
  /**
  * Count the number of changes in the replication changelog for the provided
  * serverID, between 2 provided changenumbers.
  * @param serverId Identifier of the server for which the iterator is created.
  * @param from lower limit changenumber.
  * @param to   upper limit changenumber.
  * @return the number of changes.
  *
  */
  public int getCount(int serverId,
      ChangeNumber from, ChangeNumber to)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
      return 0;
    return handler.getCount(from, to);
  }
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the resources
@@ -3358,81 +3377,18 @@
  public long getEligibleCount(ServerState startState, ChangeNumber endCN)
  {
    long res = 0;
    ReplicationIterator ri=null;
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    Iterator<Integer> it = dbState.iterator();
    while (it.hasNext())
    Iterator<Integer> serverIDIterator = dbState.iterator();
    while (serverIDIterator.hasNext())
    {
      // for each server
      int sid = it.next();
      DbHandler h = sourceDbHandlers.get(sid);
      try
      {
        // Set on the change related to the startState
      // process one sid
      int sid = serverIDIterator.next();
        ChangeNumber startCN = null;
        try
        {
          ri = h.generateIterator(startState.getMaxChangeNumber(sid));
          if (ri.next()==true)
          {
            startCN = ri.getChange().getChangeNumber();
          }
        }
        catch(Exception e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          startCN = null;
        }
        finally
        {
          if (ri!=null)
          {
            ri.releaseCursor();
            ri = null;
          }
        }
        if (startCN != null)
        {
          // Set on the change related to the endCN
          ChangeNumber upperCN = null;
          try
          {
            // Build a changenumber for this very server, with the timestamp
            // of the endCN
            ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
            ri = h.generateIterator(f);
            if (ri.next()==true)
            {
              upperCN = ri.getChange().getChangeNumber();
            }
          }
          catch(Exception e)
          {
            upperCN = h.getLastChange();
          }
          finally
          {
            if (ri!=null)
            {
              ri.releaseCursor();
              ri = null;
            }
          }
          long diff = upperCN.getSeqnum() - startCN.getSeqnum() + 1;
          res += diff;
        }
        // TODO:ECL We should compute if changenumber.seqnum has turned !
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      if (startState.getMaxChangeNumber(sid) != null)
        startCN = startState.getMaxChangeNumber(sid);
      res += getCount(sid, startCN, endCN);
    }
    return res;
  }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2564,6 +2564,7 @@
    }
    else
    {
      if (debugEnabled())
      TRACER.debugInfo(this +
          " is not configured to send CN heartbeat interval");
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -132,6 +132,7 @@
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.util.LDIFWriter;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -171,6 +172,8 @@
  List<Control> NO_CONTROL = null;
  private int brokerSessionTimeout = 5000;
  private int maxWindow = 100;
  /**
   * Set up the environment for performing the tests in this Class.
   * Replication
@@ -200,7 +203,7 @@
    ReplServerFakeConfiguration conf1 =
      new ReplServerFakeConfiguration(
          replicationServerPort, "ExternalChangeLogTestDb",
          0, 71, 0, 100, null);
          0, 71, 0, maxWindow, null);
    replicationServer = new ReplicationServer(conf1);;
    debugInfo("configure", "ReplicationServer created"+replicationServer);
@@ -3313,8 +3316,11 @@
      // search on 'cn=changelog'
      LinkedHashSet<String> attributes = new LinkedHashSet<String>();
      attributes.add("*");
      attributes.add("+");
      if (expectedFirst>0)
        attributes.add("firstchangenumber");
      attributes.add("lastchangenumber");
      attributes.add("changelog");
      attributes.add("lastExternalChangelogCookie");
      debugInfo(tn, " Search: rootDSE");
      InternalSearchOperation searchOp =
@@ -3344,6 +3350,7 @@
          ldifWriter.writeEntry(resultEntry);
          if (eclEnabled)
          {
            if (expectedFirst>0)
            checkValue(resultEntry,"firstchangenumber",
              String.valueOf(expectedFirst));
            checkValue(resultEntry,"lastchangenumber",
@@ -3353,6 +3360,7 @@
          }
          else
          {
            if (expectedFirst>0)
            assertEquals(getAttributeValue(resultEntry, "firstchangenumber"),
                null);
            assertEquals(getAttributeValue(resultEntry, "lastchangenumber"),
@@ -3419,9 +3427,10 @@
    String user1entryUUID = "11111111-1112-1113-1114-111111111115";
    try
    {
      // The replication changelog is empty
      ReplicationServerDomain rsdtest =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      // The replication changelog is empty
      long count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, 1201));
@@ -3430,10 +3439,10 @@
      // Creates broker on o=test
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING),  1201,
          100, replicationServerPort,
          1000, replicationServerPort,
          brokerSessionTimeout, true);
      // Publish 1 message
      // Publish one first message
      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, 1201);
      DeleteMsg delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
@@ -3442,12 +3451,13 @@
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      // From begin to now : 1 change
      count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, 1201));
      assertEquals(count, 1);
      // Publish 1 message
      // Publish one second message
      ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2,
@@ -3456,29 +3466,36 @@
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      // From begin to now : 2 changes
      count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, 1201));
      assertEquals(count, 2);
      // From begin to first change (inclusive) : 1 change = cn1
      count = rsdtest.getEligibleCount(
          new ServerState(),  cn1);
      assertEquals(count, 1);
      ServerState ss = new ServerState();
      ss.update(cn1);
      // From state/cn1(exclusive) to cn1 (inclusive) : 0 change
      count = rsdtest.getEligibleCount(ss, cn1);
      assertEquals(count, 0);
      // From state/cn1(exclusive) to cn2 (inclusive) : 1 change = cn2
      count = rsdtest.getEligibleCount(ss, cn2);
      assertEquals(count, 1);
      ss.update(cn2);
      // From state/cn2(exclusive) to now (inclusive) : 0 change
      count = rsdtest.getEligibleCount(ss,
          new ChangeNumber(TimeThread.getTime(), 4, 1201));
      assertEquals(count, 0);
      // Publish 1 message
      // Publish one third message
      ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3,
@@ -3488,11 +3505,129 @@
      sleep(300);
      ss.update(cn2);
      // From state/cn2(exclusive) to now : 1 change = cn3
      count = rsdtest.getEligibleCount(ss,
          new ChangeNumber(TimeThread.getTime(), 4, 1201));
      assertEquals(count, 1);
      boolean perfs=false;
      if (perfs)
      {
      // number of msgs used by the test
      int maxMsg = 999999;
      // We need an RS configured with a window size bigger than the number
      // of msg used by the test.
      assertTrue(maxMsg<maxWindow);
      debugInfo(tn, "Perf test in compat mode - will generate " + maxMsg + " msgs.");
      for (int i=4; i<=maxMsg; i++)
      {
        ChangeNumber cnx = new ChangeNumber(TimeThread.getTime(), i, 1201);
        delMsg =
          new DeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, cnx,
              user1entryUUID);
        server01.publish(delMsg);
      }
      sleep(1000);
      debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
      ArrayList<String> excludedDomains =
        MultimasterReplication.getECLDisabledDomains();
      if (!excludedDomains.contains(
          ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
        excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
      ECLWorkflowElement eclwe = (ECLWorkflowElement)
      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
      ReplicationServer rs = eclwe.getReplicationServer();
      rs.disableEligibility(excludedDomains);
      long t1 = TimeThread.getTime();
      int[] limitss = replicationServer.getECLDraftCNLimits(
          replicationServer.getEligibleCN(), excludedDomains);
      assertEquals(limitss[1], maxMsg);
      long t2 = TimeThread.getTime();
      debugInfo(tn, "Perfs - " + maxMsg + " counted in (ms):" + (t2 - t1));
      try
      {
        // search on 'cn=changelog'
        LinkedHashSet<String> attributes = new LinkedHashSet<String>();
        attributes.add("+");
        attributes.add("*");
        String filter = "(changenumber>="+maxMsg+")";
        debugInfo(tn, " Search: " + filter);
        InternalSearchOperation searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        waitOpResult(searchOp, ResultCode.SUCCESS);
        long t3 = TimeThread.getTime();
        assertEquals(searchOp.getSearchEntries().size(), 1);
        debugInfo(tn, "Perfs - last change searched in (ms):" + (t3 - t2));
        filter = "(changenumber>="+maxMsg+")";
        debugInfo(tn, " Search: " + filter);
        searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        waitOpResult(searchOp, ResultCode.SUCCESS);
        long t4 = TimeThread.getTime();
        assertEquals(searchOp.getSearchEntries().size(), 1);
        debugInfo(tn, "Perfs - last change searched in (ms):" + (t4 - t3));
        filter = "(changenumber>="+(maxMsg-2)+")";
        debugInfo(tn, " Search: " + filter);
        searchOp =
          connection.processSearch(
              ByteString.valueOf("cn=changelog"),
              SearchScope.WHOLE_SUBTREE,
              DereferencePolicy.NEVER_DEREF_ALIASES,
              0, // Size limit
              0, // Time limit
              false, // Types only
              LDAPFilter.decode(filter),
              attributes,
              NO_CONTROL,
              null);
        waitOpResult(searchOp, ResultCode.SUCCESS);
        long t5 = TimeThread.getTime();
        assertEquals(searchOp.getSearchEntries().size(), 3);
        debugInfo(tn, "Perfs - last 3 changes searched in (ms):" + (t5 - t4));
        if (searchOp.getSearchEntries() != null)
        {
          int i=0;
          for (SearchResultEntry resultEntry : searchOp.getSearchEntries())
          {
            i++;
            debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString());
          }
        }
      }
      catch(Exception e)
      {
        fail("Ending test "+tn+" with exception:\n"
            +  stackTraceToSingleLineString(e));
      }
      }
      server01.stop();
    }
@@ -3539,6 +3674,10 @@
      ExternalChangelogDomainFakeCfg eclCfg = 
        new ExternalChangelogDomainFakeCfg(true, eclInclude);
      domainConf.setExternalChangelogDomain(eclCfg);
      // Set a Changetime heartbeat interval low enough (less than default
      // value that is 1000 ms) for the test to be sure to consider all changes
      // as eligible.
      domainConf.setChangetimeHeartbeatInterval(10);
      domain2 = MultimasterReplication.createNewDomain(domainConf);
      domain2.start();
@@ -3552,6 +3691,10 @@
      eclCfg = 
        new ExternalChangelogDomainFakeCfg(true, eclInclude);
      domainConf.setExternalChangelogDomain(eclCfg);
      // Set a Changetime heartbeat interval low enough (less than default
      // value that is 1000 ms) for the test to be sure to consider all changes
      // as eligible.
      domainConf.setChangetimeHeartbeatInterval(10);
      domain3 = MultimasterReplication.createNewDomain(domainConf);
      domain3.start();
@@ -3562,6 +3705,10 @@
      eclCfg = 
        new ExternalChangelogDomainFakeCfg(true, eclInclude);
      domainConf.setExternalChangelogDomain(eclCfg);
      // Set a Changetime heartbeat interval low enough (less than default
      // value that is 1000 ms) for the test to be sure to consider all changes
      // as eligible.
      domainConf.setChangetimeHeartbeatInterval(10);
      domain21 = MultimasterReplication.createNewDomain(domainConf);
      domain21.start();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -54,6 +54,12 @@
  private int serverId;
  private SortedSet<String> replicationServers;
  private long heartbeatInterval = 1000;
  // By default changeTimeHeartbeatInterval is set to 0 in order to disable
  // this feature and not kill the tests that expect to receive special
  // messages.
  private long changeTimeHeartbeatInterval = 0;
  private IsolationPolicy policy = IsolationPolicy.REJECT_ALL_UPDATES;
  // Is assured mode enabled or not ?
@@ -197,7 +203,15 @@
   */
  public long getChangetimeHeartbeatInterval()
  {
    return 0;
    return changeTimeHeartbeatInterval;
  }
  /**
   * {@inheritDoc}
   */
  public void setChangetimeHeartbeatInterval(long changeTimeHeartbeatInterval)
  {
    this.changeTimeHeartbeatInterval = changeTimeHeartbeatInterval;
  }
  /**