mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.57.2013 b1ae3d652cc31c837721fd750623cec531b94d93
OPENDJ-1116 Introduce abstraction for the changelog DB


After talking Matthew's over the phone review, did a few changes described down below.

*.java:
Changed changeNumber from int to long throughout the replication code base.

ECLUpdateMsg.java:
In getBytes(), did not complete the change to long because it would require a protocol version change.


ReplicaDBCursor.java:
Now extends Comparable.

ReplicaDBCursorComparator.java: REMOVED
Code has been moved to JEReplicaDBCursor.compareTo().


ChangeNumberIndexDB.java:
Does not extend Runnable anymore (mistake from earlier refactorings).

DraftCNDbHandler.java:
Now extends Runnable.
1 files deleted
17 files modified
206 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursorComparator.java 58 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -27,9 +27,6 @@
 */
package org.opends.server.replication.common;
import static org.opends.messages.ExtensionMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -48,7 +45,8 @@
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ExtensionMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * This class implements a virtual attribute provider that allows administrators
@@ -61,7 +59,7 @@
       extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg>
       implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg>
{
  // The tracer object for the debug logger.
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
@@ -128,7 +126,7 @@
  @Override()
  public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule)
  {
    String first="0";
    String first = "0";
    try
    {
      ECLWorkflowElement eclwe = (ECLWorkflowElement)
@@ -142,7 +140,7 @@
        ReplicationServer rs = eclwe.getReplicationServer();
        rs.disableEligibility(excludedDomains);
        int[] limits = rs.getECLChangeNumberLimits(
        long[] limits = rs.getECLChangeNumberLimits(
            rs.getEligibleCSN(), excludedDomains);
        first = String.valueOf(limits[0]);
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -27,9 +27,6 @@
 */
package org.opends.server.replication.common;
import static org.opends.messages.ExtensionMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -48,7 +45,8 @@
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ExtensionMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * This class implements a virtual attribute provider that allows administrators
@@ -61,7 +59,9 @@
       extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg>
       implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg>
{
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new instance of this member virtual attribute provider.
   */
@@ -109,7 +109,6 @@
  }
  /**
   *  {@inheritDoc}
   */
@@ -121,7 +120,6 @@
  }
  /**
   * {@inheritDoc}
   */
@@ -142,7 +140,7 @@
        ReplicationServer rs = eclwe.getReplicationServer();
        rs.disableEligibility(excludedDomains);
        int[] limits = rs.getECLChangeNumberLimits(
        long[] limits = rs.getECLChangeNumberLimits(
            rs.getEligibleCSN(), excludedDomains);
        last = String.valueOf(limits[1]);
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -49,7 +49,7 @@
  private MultiDomainServerState cookie;
  /** The changeNumber as specified by draft-good-ldap-changelog. */
  private int changeNumber;
  private long changeNumber;
  /**
   * Creates a new message.
@@ -184,7 +184,10 @@
  {
    byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8");
    byte[] byteBaseDN = String.valueOf(baseDN).getBytes("UTF-8");
    byte[] byteChangeNumber = Integer.toString(changeNumber).getBytes("UTF-8");
    // FIXME JNR Changing line below to use long would require a protocol
    // version change. Leave it like this for now until the need arises.
    byte[] byteChangeNumber =
        Integer.toString((int) changeNumber).getBytes("UTF-8");
    byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion);
    int length = 1 + byteCookie.length +
@@ -211,7 +214,7 @@
   * Setter for the changeNumber of this change.
   * @param changeNumber the provided changeNumber for this change.
   */
  public void setChangeNumber(int changeNumber)
  public void setChangeNumber(long changeNumber)
  {
    this.changeNumber = changeNumber;
  }
@@ -220,7 +223,7 @@
   * Getter for the changeNumber of this change.
   * @return the changeNumber of this change.
   */
  public int getChangeNumber()
  public long getChangeNumber()
  {
    return this.changeNumber;
  }
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -581,7 +581,7 @@
        return null;
      }
      final int firstChangeNumber = cnIndexDB.getFirstChangeNumber();
      final long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
      final String crossDomainStartState =
          cnIndexDB.getPreviousCookie(firstChangeNumber);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
@@ -606,10 +606,10 @@
     * Get the draftLimits (from the eligibleCSN got at the beginning of the
     * operation) in order to have the first and possible last change number.
     */
    final int[] limits = replicationServer.getECLChangeNumberLimits(
    final long[] limits = replicationServer.getECLChangeNumberLimits(
        eligibleCSN, excludedBaseDNs);
    final int firstChangeNumber = limits[0];
    final int lastChangeNumber = limits[1];
    final long firstChangeNumber = limits[0];
    final long lastChangeNumber = limits[1];
    // If the startChangeNumber provided is lower than the firstChangeNumber in
    // the DB, let's use the lower limit.
@@ -636,7 +636,7 @@
        return null;
      }
      final int lastKey = cnIndexDB.getLastChangeNumber();
      final long lastKey = cnIndexDB.getLastChangeNumber();
      crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
      return crossDomainStartState;
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -39,7 +39,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -497,7 +497,7 @@
  private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges()
  {
    final NavigableSet<ReplicaDBCursor> results =
        new TreeSet<ReplicaDBCursor>(new ReplicaDBCursorComparator());
        new TreeSet<ReplicaDBCursor>();
    for (int serverId : replicationServerDomain.getServerIds())
    {
      // get the last already sent CN from that server to get a cursor
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -157,7 +157,7 @@
   * <p>
   * Guarded by cnIndexDBLock
   **/
  private int lastGeneratedChangeNumber = 0;
  private long lastGeneratedChangeNumber = 0;
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
@@ -1658,7 +1658,7 @@
   *
   * @return the first value.
   */
  public int getFirstChangeNumber()
  public long getFirstChangeNumber()
  {
    synchronized (cnIndexDBLock)
    {
@@ -1675,7 +1675,7 @@
   *
   * @return the last value.
   */
  public int getLastChangeNumber()
  public long getLastChangeNumber()
  {
    synchronized (cnIndexDBLock)
    {
@@ -1692,7 +1692,7 @@
   *
   * @return The generated change number
   */
  public int getNewChangeNumber()
  public long getNewChangeNumber()
  {
    synchronized (cnIndexDBLock)
    {
@@ -1712,7 +1712,7 @@
   * @throws DirectoryException
   *           When it happens.
   */
  public int[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN,
  public long[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN,
      Set<String> excludedBaseDNs) throws DirectoryException
  {
    /* The content of the DraftCNdb depends on the SEARCH operations done before
@@ -1735,11 +1735,11 @@
     *     (this diff is done domain by domain)
     */
    int lastChangeNumber;
    long lastChangeNumber;
    boolean dbEmpty = false;
    final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
    int firstChangeNumber = cnIndexDB.getFirstChangeNumber();
    long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
    Map<String, ServerState> domainsServerStateForLastCN = null;
    CSN csnForLastCN = null;
    String domainForLastCN = null;
@@ -1821,7 +1821,7 @@
      firstChangeNumber += lastGeneratedChangeNumber;
      lastChangeNumber += lastGeneratedChangeNumber;
    }
    return new int[]{firstChangeNumber, lastChangeNumber};
    return new long[] { firstChangeNumber, lastChangeNumber };
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -40,7 +40,7 @@
 * @see <a href= "http://tools.ietf.org/html/draft-good-ldap-changelog-04"
 * >OpenDJ Domain Names</a> for more information about the changeNumber.
 */
public interface ChangeNumberIndexDB extends Runnable
public interface ChangeNumberIndexDB
{
  /**
@@ -50,7 +50,7 @@
   *          the provided change number.
   * @return the associated CSN, null when none.
   */
  public CSN getCSN(int changeNumber);
  public CSN getCSN(long changeNumber);
  /**
   * Get the baseDN associated to a provided change number.
@@ -59,7 +59,7 @@
   *          the provided change number.
   * @return the baseDN, null when none.
   */
  public String getBaseDN(int changeNumber);
  public String getBaseDN(long changeNumber);
  /**
   * Get the previous cookie associated to a provided change number.
@@ -68,21 +68,21 @@
   *          the provided change number.
   * @return the previous cookie, null when none.
   */
  String getPreviousCookie(int changeNumber);
  String getPreviousCookie(long changeNumber);
  /**
   * Get the first change number stored in this DB.
   *
   * @return Returns the first change number in this DB.
   */
  int getFirstChangeNumber();
  long getFirstChangeNumber();
  /**
   * Get the last change number stored in this DB.
   *
   * @return Returns the last change number in this DB
   */
  int getLastChangeNumber();
  long getLastChangeNumber();
  /**
   * Add an update to the list of messages that must be saved to this DB managed
@@ -100,7 +100,7 @@
   * @param csn
   *          The associated replication CSN.
   */
  void add(int changeNumber, String previousCookie, String baseDN, CSN csn);
  void add(long changeNumber, String previousCookie, String baseDN, CSN csn);
  /**
   * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
@@ -115,7 +115,7 @@
   * @throws ChangelogException
   *           if a database problem happened.
   */
  ChangeNumberIndexDBCursor getCursorFrom(int startChangeNumber)
  ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
      throws ChangelogException;
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
@@ -57,7 +57,7 @@
   *
   * @return The change number field.
   */
  int getChangeNumber();
  long getChangeNumber();
  /**
   * Skip to the next record of the database.
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
@@ -34,7 +34,7 @@
 * This cursor allows to iterate through the changes received from a given
 * replica (Directory Server) in the topology.
 */
public interface ReplicaDBCursor extends Closeable
public interface ReplicaDBCursor extends Closeable, Comparable<ReplicaDBCursor>
{
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursorComparator.java
File was deleted
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -92,7 +92,7 @@
   * @param csn the provided replication CSN to be
   *                     stored associated with this change number.
   */
  public void addEntry(int changeNumber, String value, String domainBaseDN,
  public void addEntry(long changeNumber, String value, String domainBaseDN,
      CSN csn)
  {
    try
@@ -187,7 +187,7 @@
   *                           creation.
   * @return The ReplServerDBCursor.
   */
  public DraftCNDBCursor openReadCursor(int changeNumber)
  public DraftCNDBCursor openReadCursor(long changeNumber)
      throws ChangelogException
  {
    return new DraftCNDBCursor(changeNumber);
@@ -362,7 +362,7 @@
     * @throws ChangelogException
     *           when the startChangeNumber does not exist.
     */
    private DraftCNDBCursor(int startChangeNumber) throws ChangelogException
    private DraftCNDBCursor(long startChangeNumber) throws ChangelogException
    {
      this.key = new ReplicationDraftCNKey(startChangeNumber);
      this.entry = new DatabaseEntry();
@@ -392,7 +392,7 @@
            // We could not move the cursor to the expected startChangeNumber
            if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              // We could not even move the cursor close to it => failure
              throw new ChangelogException(
                  Message.raw("ChangeLog Change Number " + startChangeNumber
                      + " is not available"));
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -65,7 +65,7 @@
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
public class DraftCNDbHandler implements ChangeNumberIndexDB
public class DraftCNDbHandler implements ChangeNumberIndexDB, Runnable
{
  /**
   * The tracer object for the debug logger.
@@ -136,7 +136,7 @@
  /** {@inheritDoc} */
  @Override
  public synchronized void add(int changeNumber, String previousCookie,
  public synchronized void add(long changeNumber, String previousCookie,
      String baseDN, CSN csn)
  {
    db.addEntry(changeNumber, previousCookie, baseDN, csn);
@@ -152,14 +152,14 @@
  /** {@inheritDoc} */
  @Override
  public int getFirstChangeNumber()
  public long getFirstChangeNumber()
  {
    return db.readFirstChangeNumber();
  }
  /** {@inheritDoc} */
  @Override
  public int getLastChangeNumber()
  public long getLastChangeNumber()
  {
    return db.readLastChangeNumber();
  }
@@ -207,7 +207,7 @@
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  DraftCNDBCursor getReadCursor(int startChangeNumber)
  DraftCNDBCursor getReadCursor(long startChangeNumber)
      throws ChangelogException
  {
    return db.openReadCursor(startChangeNumber);
@@ -215,7 +215,7 @@
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDBCursor getCursorFrom(int startChangeNumber)
  public ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
      throws ChangelogException
  {
    return new DraftCNDbIterator(db, startChangeNumber);
@@ -516,7 +516,7 @@
  /** {@inheritDoc} */
  @Override
  public String getPreviousCookie(int changeNumber)
  public String getPreviousCookie(long changeNumber)
  {
    DraftCNDBCursor cursor = null;
    try
@@ -537,7 +537,7 @@
  /** {@inheritDoc} */
  @Override
  public CSN getCSN(int changeNumber)
  public CSN getCSN(long changeNumber)
  {
    DraftCNDBCursor cursor = null;
    try
@@ -558,7 +558,7 @@
  /**{@inheritDoc}*/
  @Override
  public String getBaseDN(int changeNumber)
  public String getBaseDN(long changeNumber)
  {
    DraftCNDBCursor cursor = null;
    try
@@ -577,7 +577,7 @@
    }
  }
  private void debugException(String methodName, int changeNumber, Exception e)
  private void debugException(String methodName, long changeNumber, Exception e)
  {
    if (debugEnabled())
      TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: "
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbIterator.java
@@ -56,7 +56,7 @@
   * @throws ChangelogException
   *           If a database problem happened.
   */
  public DraftCNDbIterator(DraftCNDB db, int startChangeNumber)
  public DraftCNDbIterator(DraftCNDB db, long startChangeNumber)
      throws ChangelogException
  {
    draftCNDbCursor = db.openReadCursor(startChangeNumber);
@@ -98,7 +98,7 @@
  /** {@inheritDoc} */
  @Override
  public int getChangeNumber()
  public long getChangeNumber()
  {
    return ((ReplicationDraftCNKey) draftCNDbCursor.getKey()).getChangeNumber();
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -160,4 +160,14 @@
  {
    close();
  }
  /** {@inheritDoc} */
  @Override
  public int compareTo(ReplicaDBCursor o)
  {
    final CSN csn1 = getChange().getCSN();
    final CSN csn2 = o.getChange().getCSN();
    return CSN.compare(csn1, csn2);
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java
@@ -42,16 +42,14 @@
   * Creates a new ReplicationKey from the given change number.
   * @param changeNumber The change number to use.
   */
  public ReplicationDraftCNKey(int changeNumber)
  public ReplicationDraftCNKey(long changeNumber)
  {
    try
    {
      String s = String.valueOf(changeNumber);
      int a = 16-s.length();
      String sscn = "0000000000000000".substring(0, a) + s;
      // Should it use StaticUtils.getBytes() to increase performances?
      setData(sscn.getBytes("UTF-8"));
    } catch (UnsupportedEncodingException e)
      setData(String.format("%016d", changeNumber).getBytes("UTF-8"));
    }
    catch (UnsupportedEncodingException e)
    {
      // Should never happens, UTF-8 is always supported
      // TODO : add better logging
@@ -60,10 +58,11 @@
  /**
   * Getter for the change number associated with this key.
   *
   * @return the change number associated with this key.
   */
  public int getChangeNumber()
  public long getChangeNumber()
  {
    return Integer.valueOf(new String(getData()));
    return Long.valueOf(new String(getData()));
  }
}
opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
@@ -1007,7 +1007,7 @@
      String clearLDIFchanges,
      String targetUUID,
      List<RawAttribute> includedAttributes,
      int changenumber,
      long changenumber,
      String changetype,
      String changeInitiatorsName)
  throws DirectoryException
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -2924,7 +2924,7 @@
      ReplicationServer rs = eclwe.getReplicationServer();
      rs.disableEligibility(excludedDomains);
      long t1 = TimeThread.getTime();
      int[] limits = replicationServer.getECLChangeNumberLimits(
      long[] limits = replicationServer.getECLChangeNumberLimits(
          replicationServer.getEligibleCSN(), excludedDomains);
      assertEquals(limits[1], maxMsg);
      long t2 = TimeThread.getTime();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -110,7 +110,7 @@
      handler.add(cn3, value3, baseDN3, csn3);
      // The ChangeNumber should not get purged
      final int firstChangeNumber = handler.getFirstChangeNumber();
      final long firstChangeNumber = handler.getFirstChangeNumber();
      assertEquals(firstChangeNumber, cn1);
      assertEquals(handler.getLastChangeNumber(), cn3);