mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
29.35.2010 7c30dbb5403772b323df3ad907d9ed15d23b5aee
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -37,6 +37,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.zip.DataFormatException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -67,6 +68,44 @@
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
  // Change counter management
  // The Db itself does not allow to count records between a start and an end
  // change. And we cannot rely on the replication seqnum that is part of the
  // changenumber, since there can be holes (when an operation is canceled).
  // And traversing all the records from the start one to the end one works
  // fine but can be very long (ECL:lastChangeNumber).
  //
  // So we are storing special records in the DB (called counter records),
  // that contain the number of changes since the previous counter record.
  // One special record is :
  // - a special key : changetime , serverid=0  seqnum=0
  // - a counter value : count of changes since previous counter record.
  //
  // A counter record has to follow the order of the db, so it needs to have
  // a changenumber key that follow the order.
  // A counter record must have its own chagenumber key since the Db does not
  // support duplicate key (it is a compatibility breaker character of the DB).
  //
  // We define 2 conditions to store a counter record :
  // 1/- at least 'counterWindowSize' changes have been stored in the Db
  //     since the previous counter record
  // 2/- the change to be stored has a new timestamp - so that the counter
  //     record is the first record for this timestamp.
  //
  private int  counterCurrValue = 1;
  // Current value of the counter.
  private long counterTsLimit = 0;
  // When not null,
  // the next change with a ts different from tsForNewCounterRecord will lead
  // to store a new counterRecord.
  private int  counterWindowSize = 1000;
  // The counter record will never be written to the db more often than each
  // counterWindowSize changes.
 /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
@@ -92,6 +131,64 @@
        true).getGenerationId());
    dbCloseLock = new ReentrantReadWriteLock(true);
    //
    Cursor cursor = null;
    Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status;
    int distBackToCounterRecord = 0;
    // Initialize counter
    this.counterCurrValue = 1;
    cursor = db.openCursor(txn, null);
    status = cursor.getLast(key, data, LockMode.DEFAULT);
    while (status == OperationStatus.SUCCESS)
    {
      try
      {
        ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
        if (!ReplicationDB.isaCounter(cn))
        {
          status = cursor.getPrev(key, data, LockMode.DEFAULT);
          distBackToCounterRecord++;
        }
        else
        {
          // counter record
          counterCurrValue = decodeCounterValue(data.getData())+1;
          counterTsLimit = cn.getTime();
          break;
        }
      }
      catch (UnsupportedEncodingException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
        if (txn != null)
        {
          try
          {
            txn.abort();
          } catch (DatabaseException e1)
          {
            // can't do much more. The ReplicationServer is shuting down.
          }
        }
        replicationServer.shutdown();
      }
      catch (DataFormatException e)
      {
        // Should never happen
      }
    }
    counterCurrValue += distBackToCounterRecord;
    cursor.close();
  }
  /**
@@ -123,9 +220,31 @@
          {
            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
            DatabaseEntry data = new ReplicationData(change);
            db.put(txn, key, data);
          }
            if ((counterCurrValue!=0) &&
                (counterCurrValue%counterWindowSize == 0))
            {
              // enough changes to generate a counter record - wait for the next
              // change fo time
              counterTsLimit = change.getChangeNumber().getTime();
            }
            if ((counterTsLimit!=0)
                && (change.getChangeNumber().getTime() != counterTsLimit))
            {
              // Write the counter record
              DatabaseEntry counterKey = new ReplicationKey(
                  new ChangeNumber(
                  change.getChangeNumber().getTime(),
                  0, 0));
              DatabaseEntry counterValue =
                encodeCounterValue(counterCurrValue-1);
              db.put(txn, counterKey, counterValue);
              counterTsLimit=0;
            }
            db.put(txn, key, data);
            counterCurrValue++;
          }
          txn.commitWriteNoSync();
          txn = null;
          done = true;
@@ -275,6 +394,7 @@
  {
    Cursor cursor = null;
    String str = null;
    ChangeNumber cn = null;
    try
    {
@@ -301,11 +421,25 @@
        try
        {
          str = new String(key.getData(), "UTF-8");
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            // First record is a counter record .. go next
            status = cursor.getNext(key, data, LockMode.DEFAULT);
            if (status != OperationStatus.SUCCESS)
            {
              // DB contains only a counter record
              return null;
            }
            else
            {
              cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
            }
          }
        } catch (UnsupportedEncodingException e)
        {
          // never happens
        }
        return new ChangeNumber(str);
      }
      finally
      {
@@ -320,8 +454,9 @@
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return null;
      cn = null;
    }
    return cn;
  }
  /**
@@ -331,7 +466,7 @@
  public ChangeNumber readLastChange()
  {
    Cursor cursor = null;
    String str = null;
    ChangeNumber cn = null;
    try
    {
@@ -349,13 +484,23 @@
        }
        try
        {
          str = new String(key.getData(), "UTF-8");
          String str = new String(key.getData(), "UTF-8");
          cn = new ChangeNumber(str);
          if (ReplicationDB.isaCounter(cn))
          {
            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              /* database only contain a counter record - don't know
               * how much it can be possible but ... */
              cn = null;
            }
          }
        }
        catch (UnsupportedEncodingException e)
        {
          // never happens
        }
        return new ChangeNumber(str);
      }
      finally
      {
@@ -369,8 +514,9 @@
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
      return null;
      cn = null;
    }
    return cn;
  }
  /**
@@ -611,7 +757,14 @@
        {
          return null;
        }
        try {
        try
        {
          ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
          if(ReplicationDB.isaCounter(cn))
          {
            // counter record
            continue;
          }
          currentChange = ReplicationData.generateChange(data.getData());
        } catch (Exception e) {
          /*
@@ -681,4 +834,224 @@
      dbCloseLock.writeLock().unlock();
    }
  }
  /**
   * Count the number of changes between 2 changes numbers (inclusive).
   * @param start The lower limit of the count.
   * @param stop The higher limit of the count.
   * @return The number of changes between provided start and stop changeNumber.
   * Returns -1 when an error occurs.
   */
  public int count(ChangeNumber start, ChangeNumber stop)
  {
    int counterRecord1 = 0;
    int counterRecord2 = 0;
    int distToCounterRecord1 = 0;
    int distBackToCounterRecord2 = 0;
    int count=0;
    Cursor cursor = null;
    Transaction txn = null;
    OperationStatus status;
    try
    {
      ChangeNumber cn ;
      if ((start==null)&&(stop==null))
        return (int)db.count();
      // Step 1 : from the start point, traverse db to the next counter record
      // or to the stop point.
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(txn, null);
      if (start != null)
      {
        key = new ReplicationKey(start);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
      }
      else
      {
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        String csnString = new String(key.getData(), "UTF-8");
        cn = new ChangeNumber(csnString);
        if (cn.getServerId() != 0)
        {
          // reached a regular change record
          // test whether we reached the 'stop' target
          if (!cn.newer(stop))
          {
            // let's loop
            distToCounterRecord1++;
            status = cursor.getNext(key, data, LockMode.DEFAULT);
          }
          else
          {
            // reached the end
            break;
          }
        }
        else
        {
          // counter record
          counterRecord1 = decodeCounterValue(data.getData());
          break;
        }
      }
      cursor.close();
      // cases
      //
      if (counterRecord1==0)
        return distToCounterRecord1;
      // Step 2 : from the stop point, traverse db to the next counter record
      // or to the start point.
      txn = null;
      data = new DatabaseEntry();
      key = new ReplicationKey(stop);
      cursor = db.openCursor(txn, null);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
      }
      else
      {
        key = new DatabaseEntry();
        data = new DatabaseEntry();
        status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          /* database is empty */
          return 0;
        }
      }
      while (status == OperationStatus.SUCCESS)
      {
        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
        if (!ReplicationDB.isaCounter(cn))
        {
          // regular change record
          if (!cn.older(start))
          {
            distBackToCounterRecord2++;
            status = cursor.getPrev(key, data, LockMode.DEFAULT);
          }
          else
            break;
        }
        else
        {
          // counter record
          counterRecord2 = decodeCounterValue(data.getData());
          break;
        }
      }
      cursor.close();
      // Step 3 : Now consolidates the result
      if (counterRecord1!=0)
      {
        if (counterRecord1 == counterRecord2)
        {
          // only one cp between from and to - no need to use it
          count = distToCounterRecord1 + distBackToCounterRecord2;
        }
        else
        {
          // 2 cp between from and to
          count = distToCounterRecord1 + (counterRecord2-counterRecord1)
            + distBackToCounterRecord2;
        }
      }
    }
    catch (UnsupportedEncodingException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      replicationServer.shutdown();
    }
    catch (DataFormatException e)
    {
      // Should never happen
    }
    finally
    {
      if (cursor != null)
        cursor.close();
      if (txn != null)
      {
        try
        {
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The ReplicationServer is shuting down.
        }
      }
    }
    return count;
  }
  /**
   * Test if a provided changeNumber represents a counter record.
   * @param cn The provided changeNumber.
   * @return True if the provided changenumber is a counter.
   */
  static private boolean isaCounter(ChangeNumber cn)
  {
    return ((cn.getServerId()== 0) && (cn.getSeqnum()==0));
  }
  /**
   * Decode the provided database entry as a the value of a counter.
   * @param entry The provided entry.
   * @return The counter value.
   * @throws DataFormatException
   */
  private static int decodeCounterValue(byte[] entry)
  throws DataFormatException
  {
    try
    {
      String numAckStr = new String(entry, 0, entry.length, "UTF-8");
      return Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Encode the provided counter value in a database entry.
   * @param entry The provided entry.
   * @return The databse entry with the counter value encoded inside..
   * @throws UnsupportedEncodingException
   */
  static private DatabaseEntry encodeCounterValue(int value)
  throws UnsupportedEncodingException
  {
    DatabaseEntry entry = new DatabaseEntry();
    entry.setData(String.valueOf(value).getBytes("UTF-8"));
    return entry;
  }
  /**
   * Set the counter writing window size (public method for unit tests only).
   * @param size Size in number of record.
   */
  public void setCounterWindowSize(int size)
  {
    this.counterWindowSize = size;
  }
}