mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.17.2014 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -34,6 +34,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -56,14 +58,14 @@
 * <p>
 * This is the only class that should have code using the BDB interfaces.
 */
public class ReplicationDB
class ReplicationDB
{
  private Database db;
  private ReplicationDbEnv dbEnv;
  private ReplicationServer replicationServer;
  private int serverId;
  private DN baseDN;
  private final ReplicationDbEnv dbEnv;
  private final ReplicationServer replicationServer;
  private final int serverId;
  private final DN baseDN;
  /**
   * The lock used to provide exclusive access to the thread that close the db
@@ -120,7 +122,7 @@
   * @param dbEnv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened
   */
  public ReplicationDB(int serverId, DN baseDN,
  ReplicationDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
      throws ChangelogException
  {
@@ -188,7 +190,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void addEntry(UpdateMsg change) throws ChangelogException
  void addEntry(UpdateMsg change) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -200,7 +202,9 @@
      }
      final DatabaseEntry key = createReplicationKey(change.getCSN());
      final DatabaseEntry data = new ReplicationData(change);
      // Always keep messages in the replication DB with the current protocol
      // version
      final DatabaseEntry data = new DatabaseEntry(change.getBytes());
      insertCounterRecordIfNeeded(change.getCSN());
      db.put(null, key, data);
@@ -256,7 +260,7 @@
  /**
   * Shutdown the database.
   */
  public void shutdown()
  void shutdown()
  {
    dbCloseLock.writeLock().lock();
    try
@@ -286,8 +290,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public ReplServerDBCursor openReadCursor(CSN startCSN)
      throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN);
  }
@@ -301,7 +304,7 @@
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor() throws ChangelogException
  ReplServerDBCursor openDeleteCursor() throws ChangelogException
  {
    return new ReplServerDBCursor();
  }
@@ -325,7 +328,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readOldestCSN() throws ChangelogException
  CSN readOldestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -381,7 +384,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readNewestCSN() throws ChangelogException
  CSN readNewestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -432,93 +435,7 @@
    }
  }
  /**
   * Try to find in the DB, the CSN right before the one passed as a parameter.
   *
   * @param csn
   *          The CSN from which we start searching.
   * @return the CSN right before the one passed as a parameter. Can return null
   *         if there is none.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN getPreviousCSN(CSN csn) throws ChangelogException
  {
    if (csn == null)
    {
      return null;
    }
    dbCloseLock.readLock().lock();
    Cursor cursor = null;
    try
    {
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return null;
      }
      DatabaseEntry key = createReplicationKey(csn);
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
      {
        // We can move close to the CSN.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
        // else, there was no change previous to our CSN.
      }
      else
      {
        // We could not move the cursor past to the CSN
        // Check if the last change is older than CSN
        if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
      }
    }
    catch (DatabaseException e)
    {
      throw new ChangelogException(e);
    }
    finally
    {
      closeAndReleaseReadLock(cursor);
    }
    return null;
  }
  private CSN getRegularRecord(Cursor cursor, DatabaseEntry key,
      DatabaseEntry data) throws DatabaseException
  {
    final CSN csn = toCSN(key.getData());
    if (!isACounterRecord(csn))
    {
      return csn;
    }
    // There cannot be 2 counter record next to each other,
    // it is safe to return previous record which must exist
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
    {
      return toCSN(key.getData());
    }
    // database only contain a counter record, which should not be possible
    // let's just say no CSN
    return null;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -529,7 +446,7 @@
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  public class ReplServerDBCursor implements Closeable
  class ReplServerDBCursor implements Closeable
  {
    /**
     * The transaction that will protect the actions done with the cursor.
@@ -713,7 +630,7 @@
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
    public void abort()
    void abort()
    {
      synchronized (this)
      {
@@ -735,7 +652,7 @@
     * @throws ChangelogException
     *           In case of underlying database problem.
     */
    public CSN nextCSN() throws ChangelogException
    CSN nextCSN() throws ChangelogException
    {
      if (isClosed)
      {
@@ -761,7 +678,7 @@
     *
     * @return the next UpdateMsg.
     */
    public UpdateMsg next()
    UpdateMsg next()
    {
      if (isClosed)
      {
@@ -791,7 +708,8 @@
          {
            continue;
          }
          currentChange = ReplicationData.generateChange(data.getData());
          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
              data.getData(), ProtocolVersion.getCurrentVersion());
        }
        catch (Exception e)
        {
@@ -806,7 +724,7 @@
           */
          Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
              .get(replicationServer.getServerId(),
                  (csn == null ? "" : csn.toString()),
                  (csn != null ? csn.toString() : ""),
                  e.getMessage());
          logError(message);
        }
@@ -819,7 +737,7 @@
     *
     * @throws ChangelogException In case of database problem.
     */
    public void delete() throws ChangelogException
    void delete() throws ChangelogException
    {
      if (isClosed)
      {
@@ -842,7 +760,7 @@
   *
   * @throws ChangelogException In case of database problem.
   */
  public void clear() throws ChangelogException
  void clear() throws ChangelogException
  {
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
@@ -912,7 +830,7 @@
   * Encode the provided counter value in a database entry.
   * @return The database entry with the counter value encoded inside.
   */
  static private DatabaseEntry encodeCounterValue(int value)
  private static DatabaseEntry encodeCounterValue(int value)
  {
    DatabaseEntry entry = new DatabaseEntry();
    entry.setData(getBytes(String.valueOf(value)));