mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
09.24.2011 aba87ac47f81475c93ab6f439f247176334b1ef0
Fix issue OPENDJ-67: Investigate ECL change number consistency across replication servers that have been subjected to different purging policies.

The issue of ECL change number inconsistency across servers is not tied to purging policies but to the fact that the CN is based on a lazily built index. As a result, the last change number in the index database depends on when searches are run against the server rather than on which changes are actually available. The lastChangeNumber virtual attribute uses the last CN in the index and adds the changes that have been received and are eligible.

This change fixes the following:
After the DraftCNDB is purged, the next value is incremented from the last known value, so change numbers are always increasing. Purging the DraftCNDB now stops at the first index entry that still points to a valid change in the changelog. The counter of changes has been fixed to take into account the fact that the changelog keeps at least one change older than the trim age, in order to preserve the validity of the cookie for at least the "purge delay".
It also contains some optimizations of the DraftCNDBCursor to avoid reading the entry pointed to by the cursor several times.
6 files modified
146 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDB.java 63 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 39 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -654,8 +654,14 @@
  public int getCount(ChangeNumber from, ChangeNumber to)
  {
    // Result holder; stays 0 unless one of the db.count() calls below runs.
    int c=0;
    // NOTE(review): in this rendering the flush()/db.count() pair appears both
    // here and inside the guarded branch below; presumably these two lines are
    // the pre-change side of the hunk -- confirm against the repository.
    flush();
    c = db.count(from, to);
    // Now that we always keep the last ChangeNumber in the DB to avoid
    // expiring cookies too quickly, we need to check if the "to"
    // is older than the trim date.
    // The count is (re)computed only when "to" is null or is not older than
    // a ChangeNumber built from latestTrimDate.
    if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0)))
    {
      // flush() is called before querying the db -- presumably to push
      // pending changes to it first; verify against flush().
      flush();
      c = db.count(from, to);
    }
    return c;
  }
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -321,7 +321,7 @@
    private final Transaction txn;
    private final DatabaseEntry key;
    private final DatabaseEntry entry;
    private DraftCNData seqnumData = null;
    private boolean isClosed = false;
@@ -367,18 +367,22 @@
            }
            else
            {
              // We can move close to the startingChangeNumber.
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (localCursor.getPrev(
                  key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS)
              if (localCursor.getPrev(key, entry, LockMode.DEFAULT)
                      != OperationStatus.SUCCESS)
              {
                localCursor.close();
                localCursor = db.openCursor(localTxn, null);
              }
              else
              {
                 seqnumData =  new DraftCNData(entry.getData());
              }
            }
          }
          else
          {
            seqnumData = new DraftCNData(entry.getData());
          }
        }
        this.txn = localTxn;
@@ -514,15 +518,10 @@
    {
      try
      {
        OperationStatus status =
          cursor.getCurrent(key, entry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        if (seqnumData != null)
        {
          return null;
          return seqnumData.getValue();
        }
        DraftCNData seqnumData = new DraftCNData(entry.getData());
        return seqnumData.getValue();
      }
      catch(Exception e)
      {
@@ -539,15 +538,10 @@
    {
      try
      {
        OperationStatus status =
          cursor.getCurrent(key, entry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        if (seqnumData != null)
        {
          return null;
          return seqnumData.getServiceID();
        }
        DraftCNData seqnumData = new DraftCNData(entry.getData());
        return seqnumData.getServiceID();
      }
      catch(Exception e)
      {
@@ -557,7 +551,7 @@
    }
    /**
     * Getter for the integer value of the current curson, representing
     * Getter for the integer value of the current cursor, representing
     * the current DraftChangeNumber being processed.
     *
     * @return the current DraftCN as an integer.
@@ -566,13 +560,6 @@
    {
       try
      {
        OperationStatus status =
          cursor.getCurrent(key, entry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          return -1;
        }
        String str = decodeUTF8(key.getData());
        int draftCN = new Integer(str);
        return draftCN;
@@ -592,16 +579,10 @@
    {
      try
      {
        OperationStatus status =
          cursor.getCurrent(key, entry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        if (seqnumData != null)
        {
          return null;
          return seqnumData.getChangeNumber();
        }
        DraftCNData seqnumData =
          new DraftCNData(entry.getData());
        return seqnumData.getChangeNumber();
      }
      catch(Exception e)
      {
@@ -620,8 +601,16 @@
      OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        seqnumData = null;
        return false;
      }
      try {
        seqnumData = new DraftCNData(entry.getData());
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return true;
    }
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,6 +50,8 @@
import org.opends.server.types.InitializationException;
import com.sleepycat.je.DatabaseException;
import java.util.HashMap;
import org.opends.server.replication.common.MultiDomainServerState;
/**
 * This class is used for managing the replicationServer database for each
@@ -385,14 +387,45 @@
          int currentKey = cursor.currentKey();
          // Do not delete the lastKey. This should allow us to
          // preserve last change number over time.
          if ((currentKey != lastkey) && (cn.older(fcn)))
          if (cn.older(fcn))
          {
            cursor.delete();
            continue;
          }
          ServerState cnVector = null;
          try
          {
            HashMap<String,ServerState> cnStartStates =
                MultiDomainServerState.splitGenStateToServerStates(
                        cursor.currentValue());
            cnVector = cnStartStates.get(serviceID);
            if (debugEnabled())
              TRACER.debugInfo("DraftCNDBHandler:clear() - ChangeVector:"+
                      cnVector.toString()+
                      " -- StartState:"+startState.toString());
            // cnVector.update(cn);
          }
          catch(Exception e)
          {
            // We couldn't parse the mdss from the DraftCNData Value
            assert(false);
            cursor.delete();
            continue;
          }
          if ((cnVector == null)
                  || ((cnVector.getMaxChangeNumber(cn.getServerId()) != null)
                      && !cnVector.cover(startState)))
          {
            cursor.delete();
            if (debugEnabled())
              TRACER.debugInfo("DraftCNDBHandler:clear() - deleted "+
                      cn.toString()+"Not covering startState");
            continue;
          }
          firstkey = currentKey;
          cursor.close();
          return;
opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
@@ -68,23 +68,6 @@
  }
  /**
   * Getter for the value field (external changelog cookie).
   * @return The value field (external changelog cookie).
   */
  public String getValue()
  {
    try
    {
      return this.draftCNDbCursor.currentValue();
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      return null;
    }
  }
  /**
   * Getter for the serviceID field.
   * @return The service ID.
   */
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1860,6 +1860,7 @@
    int firstDraftCN;
    int lastDraftCN;
    Boolean dbEmpty = false;
    Long newestDate = 0L;
    DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler();
@@ -1870,6 +1871,7 @@
    String domainForLastSeqnum = null;
    if (firstDraftCN < 1)
    {
      dbEmpty = true;
      firstDraftCN = 0;
      lastDraftCN = 0;
    }
@@ -1912,8 +1914,11 @@
        if (domainsServerStateForLastSeqnum == null)
        {
          // Count changes of this domain from the beginning of the changelog
          ChangeNumber trimCN =
              new ChangeNumber(rsd.getLatestDomainTrimDate(), 0,0);
          ec = rsd.getEligibleCount(
              new ServerState(), crossDomainEligibleCN);
                    rsd.getStartState().duplicateOnlyOlderThan(trimCN),
                    crossDomainEligibleCN);
        }
        else
        {
@@ -1947,6 +1952,13 @@
          firstDraftCN = 1;
      }
    }
    if (dbEmpty)
    {
      // The database was empty, just keep increasing numbers since last time
      // we generated one DraftCN.
      firstDraftCN += lastGeneratedDraftCN;
      lastDraftCN += lastGeneratedDraftCN;
    }
    return new int[]{firstDraftCN, lastDraftCN};
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -587,7 +588,7 @@
      }
      else
      {
        expectedCnt = 1;
        expectedCnt = 0;
      }
      debugInfo(tn,testcase + " actualCnt=" + actualCnt);
      assertEquals(actualCnt, expectedCnt, testcase);