mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.34.2013 ff3d7d233cd306e4fe50e2219fcdfc7f4ec7c920
OPENDJ-1116 Introduce abstraction for the changelog DB


ChangeNumberIndexDB.java:
All methods can now throw ChangelogException.


ReplicationServer.java:
Extracted method shutdownCNIndexDB().
Removed getFirstChangeNumber() and getLastChangeNumber().

DraftCNDbHandler.java:
Removed catch blocks and let the exception propagate.

ECLServerHandler.java:
Let the ChangelogException propagate.


ReplicaDBCursor.java:
Augmented the javadoc to explain the sort order. Follow up of r9504.
5 files modified
314 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 231 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java 40 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java 34 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1347,9 +1347,11 @@
   *         provided oldestChange, <code>false</code> otherwise
   * @throws DirectoryException
   *           if any problem occur
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws DirectoryException
      throws DirectoryException, ChangelogException
  {
    // We also need to check if the draftCNdb is consistent with
    // the changelogdb.
@@ -1451,7 +1453,7 @@
  }
  private void assignNewDraftCNAndStore(ECLUpdateMsg change)
      throws DirectoryException
      throws DirectoryException, ChangelogException
  {
    // generate a new change number and assign to this change
    change.setChangeNumber(replicationServer.getNewChangeNumber());
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -586,8 +586,7 @@
   * Enable the ECL access by creating a dedicated workflow element.
   * @throws DirectoryException when an error occurs.
   */
  public void enableECL()
  throws DirectoryException
  public void enableECL() throws DirectoryException
  {
    if (externalChangeLogWorkflowImpl!=null)
    {
@@ -661,8 +660,7 @@
    {
      Message message =
        NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e.toString());
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
          message, e);
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e);
    }
  }
@@ -679,19 +677,14 @@
      // internalNetworkGroup?
      NetworkGroup internalNetworkGroup = NetworkGroup
          .getInternalNetworkGroup();
      internalNetworkGroup
          .deregisterWorkflow(externalChangeLogWorkflowID);
      internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
      // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
      NetworkGroup adminNetworkGroup = NetworkGroup
          .getAdminNetworkGroup();
      adminNetworkGroup
          .deregisterWorkflow(externalChangeLogWorkflowID);
      NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
      adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
      NetworkGroup defaultNetworkGroup = NetworkGroup
          .getDefaultNetworkGroup();
      defaultNetworkGroup
          .deregisterWorkflow(externalChangeLogWorkflowID);
      NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
      defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
      eclwf.deregister();
      eclwf.finalizeWorkflow();
@@ -705,11 +698,26 @@
      eclwe.finalizeWorkflowElement();
    }
    shutdownCNIndexDB();
  }
  private void shutdownCNIndexDB()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        cnIndexDB.shutdown();
        try
        {
          cnIndexDB.shutdown();
        }
        catch (ChangelogException ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
      }
    }
  }
@@ -1385,17 +1393,7 @@
          }
        }
        try
        {
          cnIndexDB.shutdown();
        }
        catch (Exception ignored)
        {
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
          }
        }
        shutdownCNIndexDB();
        lastGeneratedChangeNumber = 0;
        cnIndexDB = null;
@@ -1639,7 +1637,7 @@
        if (cnIndexDB == null)
        {
          cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
          lastGeneratedChangeNumber = getLastChangeNumber();
          lastGeneratedChangeNumber = cnIndexDB.getLastChangeNumber();
        }
        return cnIndexDB;
      }
@@ -1654,40 +1652,6 @@
  }
  /**
   * Get the value of the first change number, 0 when db is empty.
   *
   * @return the first value.
   */
  public long getFirstChangeNumber()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        return cnIndexDB.getFirstChangeNumber();
      }
      return 0;
    }
  }
  /**
   * Get the value of the last change number, 0 when db is empty.
   *
   * @return the last value.
   */
  public long getLastChangeNumber()
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        return cnIndexDB.getLastChangeNumber();
      }
      return 0;
    }
  }
  /**
   * Generate a new change number.
   *
   * @return The generated change number
@@ -1739,89 +1703,96 @@
    boolean dbEmpty = false;
    final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
    long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
    Map<String, ServerState> domainsServerStateForLastCN = null;
    CSN csnForLastCN = null;
    String domainForLastCN = null;
    if (firstChangeNumber < 1)
    try
    {
      dbEmpty = true;
      firstChangeNumber = 0;
      lastChangeNumber = 0;
    }
    else
    {
      lastChangeNumber = cnIndexDB.getLastChangeNumber();
      // Get the generalized state associated with the current last change
      // number and initializes from it the startStates table
      String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
      if (lastCNGenState != null && lastCNGenState.length() > 0)
      long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
      Map<String, ServerState> domainsServerStateForLastCN = null;
      CSN csnForLastCN = null;
      String domainForLastCN = null;
      if (firstChangeNumber < 1)
      {
        domainsServerStateForLastCN =
            MultiDomainServerState.splitGenStateToServerStates(lastCNGenState);
      }
      csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
      domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
    }
    long newestDate = 0;
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      if (contains(excludedBaseDNs, rsd.getBaseDn()))
        continue;
      // for this domain, have the state in the replchangelog
      // where the last change number update is
      long ec;
      if (domainsServerStateForLastCN == null)
      {
        // Count changes of this domain from the beginning of the changelog
        CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
        ec = rsd.getEligibleCount(
                  rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
                  crossDomainEligibleCSN);
        dbEmpty = true;
        firstChangeNumber = 0;
        lastChangeNumber = 0;
      }
      else
      {
        // There are records in the draftDB (so already returned to clients)
        // BUT
        //  There is nothing related to this domain in the last draft record
        //  (may be this domain was disabled when this record was returned).
        // In that case, are counted the changes from
        //  the date of the most recent change from this last draft record
        if (newestDate == 0)
        lastChangeNumber = cnIndexDB.getLastChangeNumber();
        // Get the generalized state associated with the current last change
        // number and initializes from it the startStates table
        String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
        if (lastCNGenState != null && lastCNGenState.length() > 0)
        {
          newestDate = csnForLastCN.getTime();
          domainsServerStateForLastCN = MultiDomainServerState
              .splitGenStateToServerStates(lastCNGenState);
        }
        // And count changes of this domain from the date of the
        // lastseqnum record (that does not refer to this domain)
        CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
        ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
        if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn()))
          ec--;
        csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
        domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
      }
      // cumulates on domains
      lastChangeNumber += ec;
      long newestDate = 0;
      for (ReplicationServerDomain rsd : getReplicationServerDomains())
      {
        if (contains(excludedBaseDNs, rsd.getBaseDn()))
          continue;
      // CNIndexDB is empty and there are eligible updates in the replication
      // changelog then init first change number
      if (ec > 0 && firstChangeNumber == 0)
        firstChangeNumber = 1;
        // for this domain, have the state in the replchangelog
        // where the last change number update is
        long ec;
        if (domainsServerStateForLastCN == null)
        {
          // Count changes of this domain from the beginning of the changelog
          CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
          ec = rsd.getEligibleCount(
              rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
              crossDomainEligibleCSN);
        }
        else
        {
          // There are records in the draftDB (so already returned to clients)
          // BUT
          // There is nothing related to this domain in the last draft record
          // (may be this domain was disabled when this record was returned).
          // In that case, are counted the changes from
          // the date of the most recent change from this last draft record
          if (newestDate == 0)
          {
            newestDate = csnForLastCN.getTime();
          }
          // And count changes of this domain from the date of the
          // lastseqnum record (that does not refer to this domain)
          CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
          ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
          if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn()))
            ec--;
        }
        // cumulates on domains
        lastChangeNumber += ec;
        // CNIndexDB is empty and there are eligible updates in the replication
        // changelog then init first change number
        if (ec > 0 && firstChangeNumber == 0)
          firstChangeNumber = 1;
      }
      if (dbEmpty)
      {
        // The database was empty, just keep increasing numbers since last time
        // we generated one change number.
        firstChangeNumber += lastGeneratedChangeNumber;
        lastChangeNumber += lastGeneratedChangeNumber;
      }
      return new long[] { firstChangeNumber, lastChangeNumber };
    }
    if (dbEmpty)
    catch (ChangelogException e)
    {
      // The database was empty, just keep increasing numbers since last time
      // we generated one change number.
      firstChangeNumber += lastGeneratedChangeNumber;
      lastChangeNumber += lastGeneratedChangeNumber;
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
    }
    return new long[] { firstChangeNumber, lastChangeNumber };
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -49,8 +49,10 @@
   * @param changeNumber
   *          the provided change number.
   * @return the associated CSN, null when none.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public CSN getCSN(long changeNumber);
  public CSN getCSN(long changeNumber) throws ChangelogException;
  /**
   * Get the baseDN associated to a provided change number.
@@ -58,8 +60,10 @@
   * @param changeNumber
   *          the provided change number.
   * @return the baseDN, null when none.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public String getBaseDN(long changeNumber);
  public String getBaseDN(long changeNumber) throws ChangelogException;
  /**
   * Get the previous cookie associated to a provided change number.
@@ -67,22 +71,28 @@
   * @param changeNumber
   *          the provided change number.
   * @return the previous cookie, null when none.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  String getPreviousCookie(long changeNumber);
  String getPreviousCookie(long changeNumber) throws ChangelogException;
  /**
   * Get the first change number stored in this DB.
   *
   * @return Returns the first change number in this DB.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  long getFirstChangeNumber();
  long getFirstChangeNumber() throws ChangelogException;
  /**
   * Get the last change number stored in this DB.
   *
   * @return Returns the last change number in this DB
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  long getLastChangeNumber();
  long getLastChangeNumber() throws ChangelogException;
  /**
   * Add an update to the list of messages that must be saved to this DB managed
@@ -99,8 +109,11 @@
   *          The associated baseDN.
   * @param csn
   *          The associated replication CSN.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  void add(long changeNumber, String previousCookie, String baseDN, CSN csn);
  void add(long changeNumber, String previousCookie, String baseDN, CSN csn)
      throws ChangelogException;
  /**
   * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
@@ -113,7 +126,7 @@
   *         this DBHandler and starting at the position defined by a given
   *         changeNumber.
   * @throws ChangelogException
   *           if a database problem happened.
   *           if a database problem occurs.
   */
  ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
      throws ChangelogException;
@@ -123,14 +136,16 @@
   *
   * @return <code>true</code> if this database is empty, <code>false</code>
   *         otherwise
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  boolean isEmpty();
  boolean isEmpty() throws ChangelogException;
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException
   *           When an exception occurs while removing the changes from this DB.
   *           if a database problem occurs.
   */
  void clear() throws ChangelogException;
@@ -142,13 +157,16 @@
   *          The baseDN for which we want to remove all records from this DB,
   *          null means all.
   * @throws ChangelogException
   *           When an exception occurs while removing the changes from this DB.
   *           if a database problem occurs.
   */
  void clear(String baseDNToClear) throws ChangelogException;
  /**
   * Shutdown this DB.
   *
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  void shutdown();
  void shutdown() throws ChangelogException;
}
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
@@ -33,6 +33,9 @@
/**
 * This cursor allows to iterate through the changes received from a given
 * replica (Directory Server) in the topology.
 * <p>
 * Instances of this class are sorted in the order defined by the CSN of the
 * current {@link UpdateMsg}, i.e. the cursor with the oldest CSN comes first.
 */
public interface ReplicaDBCursor extends Closeable, Comparable<ReplicaDBCursor>
{
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -516,7 +516,7 @@
  /** {@inheritDoc} */
  @Override
  public String getPreviousCookie(long changeNumber)
  public String getPreviousCookie(long changeNumber) throws ChangelogException
  {
    DraftCNDBCursor cursor = null;
    try
@@ -524,11 +524,6 @@
      cursor = db.openReadCursor(changeNumber);
      return cursor.currentValue();
    }
    catch(Exception e)
    {
      debugException("getValue", changeNumber, e);
      return null;
    }
    finally
    {
      close(cursor);
@@ -537,7 +532,7 @@
  /** {@inheritDoc} */
  @Override
  public CSN getCSN(long changeNumber)
  public CSN getCSN(long changeNumber) throws ChangelogException
  {
    DraftCNDBCursor cursor = null;
    try
@@ -545,20 +540,15 @@
      cursor = db.openReadCursor(changeNumber);
      return cursor.currentCSN();
    }
    catch(Exception e)
    {
      debugException("getCSN", changeNumber, e);
      return null;
    }
    finally
    {
      close(cursor);
    }
  }
  /**{@inheritDoc}*/
  /** {@inheritDoc} */
  @Override
  public String getBaseDN(long changeNumber)
  public String getBaseDN(long changeNumber) throws ChangelogException
  {
    DraftCNDBCursor cursor = null;
    try
@@ -566,25 +556,9 @@
      cursor = db.openReadCursor(changeNumber);
      return cursor.currentBaseDN();
    }
    catch(Exception e)
    {
      debugException("getBaseDN", changeNumber, e);
      return null;
    }
    finally
    {
      close(cursor);
    }
  }
  private void debugException(String methodName, long changeNumber, Exception e)
  {
    if (debugEnabled())
      TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: "
          + " key=" + changeNumber + " value returned is null"
          + " first="+ db.readFirstChangeNumber()
          + " last=" + db.readLastChangeNumber()
          + " count=" + db.count()
          + " exception " + e + " " + e.getMessage());
  }
}