mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
30.20.2011 ae41fb531bbbd1bc8f9f6a82eb41c4eeb2da63c4
Resolve several issues with the External Changelog with regards to Cookies and changes with updates and purging.
More specifically these changes are resolving the following issues :
OPENDJ-57 - ECL: lastChangeNumber and firstChangeNumber reset to zero when the changelog is purged to empty
OPENDJ-172 - External ChangeLog Cookie varies when searching with an empty cookie. Cookie should be reproducible.
OPENDJ-173 - External ChangeLog cookies content is altered by Change purging and prevents from continuing search with a previous returned cookie.

The changes are multiples and interdependant, hence a single commit.
But :
- The DraftCNDB is now purged but always keep the last record. The firstKey value is updated accordingly.
- The ReplicationDB is purged but always keep the last record before the trimdate.
- If no cookie is specified, the initial cookie is computed from the ReplicationDomain StartState, but keeping only change numbers older than the trimdate.
- Fix a possible issue when searching a ChangeNumber, and a Count record is found, the changeNumber value was not correct.

These changes mostly revert revision 6406 which meant to fix the same issue (cookies would become obsolete and a search using them returns an Unwilling To Perform), but only managed to hide the root cause of the issue.
8 files modified
226 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/ServerState.java 27 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDB.java 28 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 110 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 23 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -518,4 +518,31 @@
  {
    return saved;
  }
  /**
   * Build a copy of the ServerState with only ChangeNumbers older than
   * a specific ChangeNumber. This is used when building the initial
   * Cookie in the External Changelog, to cope with purged changes.
   * @param cn The ChangeNumber to compare the ServerState with
   * @return a copy of the ServerState which only contains the ChangeNumbers
   *         older than cn.
   */
  public ServerState duplicateOnlyOlderThan(ChangeNumber cn)
  {
    ServerState newState = new ServerState();
    synchronized (list)
    {
      for (Integer key  : list.keySet())
      {
        ChangeNumber change = list.get(key);
        Integer id =  change.getServerId();
        if (change.older(cn))
        {
          newState.list.put(id,change);
        }
      }
    }
    return newState;
  }
}
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -466,6 +466,13 @@
    ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
    // Find the last changeNumber before the trimDate, in the Database.
    ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -668,6 +668,34 @@
    }
    /**
     * Getter for the integer value of the current curson, representing
     * the current DraftChangeNumber being processed.
     *
     * @return the current DraftCN as an integer.
     */
    public int currentKey()
    {
       try
      {
        OperationStatus status =
          cursor.getCurrent(key, entry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          return -1;
        }
        String str = decodeUTF8(key.getData());
        int draftCN = new Integer(str);
        return draftCN;
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return -1;
    }
    /**
     * Returns the replication changeNumber associated with the current key.
     * @return the replication changeNumber
     */
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -389,18 +389,22 @@
              // We don't use the returned endState but it's updating CN as
              // reading
              domain.getEligibleState(crossDomainEligibleCN, false);
              domain.getEligibleState(crossDomainEligibleCN);
              ChangeNumber fcn = startState.getMaxChangeNumber(
                  cn.getServerId());
              if (cn.older(fcn))
              int currentKey = cursor.currentKey();
              // Do not delete the lastKey. This should allow us to
              // preserve last change number over time.
              if ((currentKey != lastkey) && (cn.older(fcn)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstkey = currentKey;
                finished = true;
              }
            }
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -182,8 +182,8 @@
              + ")" +
              "] [nextNonEligibleMsg="      + nextNonEligibleMsg +
              "] [startState=" + startState +
              "] [stopState= " + stopState +
              "] [currentState= " + currentState + "]]");
              "] [stopState=" + stopState +
              "] [currentState=" + currentState + "]]");
    }
    /**
@@ -749,7 +749,7 @@
          if (isPersistent ==
            StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
          {
            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN, true);
            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
            startStatesFromProvidedCookie.remove(rsd.getBaseDn());
          }
          else
@@ -766,7 +766,12 @@
              // let's start traversing this domain from the beginning of
              // what we have in the replication changelog
              if (newDomainCtxt.startState == null)
                newDomainCtxt.startState = new ServerState();
              {
                ChangeNumber latestTrimCN =
                    new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0,0);
                newDomainCtxt.startState = rsd.getStartState()
                        .duplicateOnlyOlderThan(latestTrimCN);
              }
            }
            else
            {
@@ -807,7 +812,7 @@
            }
            // Set the stop state for the domain from the eligibleCN
            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN, true);
            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
          }
          newDomainCtxt.currentState = new ServerState();
@@ -822,9 +827,8 @@
          rsd.registerHandler(mh);
          newDomainCtxt.mh = mh;
          previousCookie.update(
              newDomainCtxt.rsd.getBaseDn(),
              newDomainCtxt.startState);
          previousCookie.update(newDomainCtxt.rsd.getBaseDn(),
                                newDomainCtxt.startState);
          // store the new context
          tmpSet.add(newDomainCtxt);
@@ -1052,6 +1056,7 @@
          ResultCode.UNWILLING_TO_PERFORM,
          ERR_INVALID_COOKIE_SYNTAX.get());
    }
    excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
    replicationServer.disableEligibility(excludedServiceIDs);
    eligibleCN = replicationServer.getEligibleCN();
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -450,6 +450,12 @@
           */
          cn = null;
        }
        else
        {
          str = decodeUTF8(key.getData());
          cn= new ChangeNumber(str);
          // There can't be 2 counter record next to each other
        }
      }
    }
    catch (DatabaseException e)
@@ -470,6 +476,110 @@
  }
  /**
   * Try to find in the DB, the change number right before the one
   * passed as a parameter.
   *
   * @param changeNumber
   *          The changeNumber from which we start searching.
   * @return the changeNumber right before the one passed as a parameter.
   *         Can return null if there is none.
   */
  public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber)
  {
    if (changeNumber == null)
      return null;
    Cursor cursor = null;
    ChangeNumber cn = null;
    DatabaseEntry key = new ReplicationKey(changeNumber);
    DatabaseEntry data = new DatabaseEntry();
    Transaction txn = null;
    dbCloseLock.readLock().lock();
    try
    {
      cursor = db.openCursor(txn, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
              OperationStatus.SUCCESS)
      {
        // We can move close to the changeNumber.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data,
                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
            {
              // database starts with a counter record.
              cn = null;
            }
            else
            {
              str = decodeUTF8(key.getData());
              cn= new ChangeNumber(str);
              // There can't be 2 counter record next to each other
            }
          }
        }
        // else, there was no change previous to our changeNumber.
      }
      else
      {
        // We could not move the cursor past to the changeNumber
        // Check if the last change is older than changeNumber
        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
                OperationStatus.SUCCESS)
        {
          String str = decodeUTF8(key.getData());
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data,
              LockMode.DEFAULT) != OperationStatus.SUCCESS)
            {
              /*
               * database only contain a counter record, should not be
               * possible, but Ok, let's just say no change Number
               */
              cn = null;
            }
            else
            {
              str = decodeUTF8(key.getData());
              cn= new ChangeNumber(str);
              // There can't be 2 counter record next to each other
            }
          }
        }
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      // TODO: Verify if shutting down replication is the right thing to do
      replicationServer.shutdown();
      cn = null;
    }
    finally
    {
      closeLockedCursor(cursor);
    }
    return cn;
  }
  /**
   * {@inheritDoc}
   */
  @Override
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1977,7 +1977,7 @@
          continue;
        result.update(rsd.getBaseDn(), rsd.getEligibleState(
            getEligibleCN(),false));
            getEligibleCN()));
      }
    }
    return result;
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3153,12 +3153,9 @@
   * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
   *
   * @param eligibleCN              The provided eligibleCN.
   * @param allowOlderThanPurgeDate When true, the returned state can be older
   *                                than the purge date of the domain.
   * @return The computed eligible server state.
   */
  public ServerState getEligibleState(ChangeNumber eligibleCN,
      boolean allowOlderThanPurgeDate)
  public ServerState getEligibleState(ChangeNumber eligibleCN)
  {
    ServerState result = new ServerState();
@@ -3225,24 +3222,6 @@
      }
    }
    if (allowOlderThanPurgeDate == false)
    {
      boolean domainPurged = true;
      long latestDomainTrimDate = getLatestDomainTrimDate();
      Iterator<Integer> it = result.iterator();
      while (it.hasNext())
      {
        int sid = it.next();
        ChangeNumber cn = result.getMaxChangeNumber(sid);
        if ((cn.getTime()>0) && (cn.getTime()<latestDomainTrimDate))
          result.update(new ChangeNumber(0,0,sid));
        else
          domainPurged = false;
      }
      if (domainPurged == true)
        result.clear();
    }
    if (debugEnabled())
      TRACER.debugInfo("In " + this
        + " getEligibleState() result is " + result);