mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.57.2014 12db845ee284503024cd2ebd62e6549d5cc42b77
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.io.Closeable;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReadWriteLock;
@@ -39,6 +38,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -244,15 +245,18 @@
  private DatabaseEntry createReplicationKey(CSN csn)
  {
    DatabaseEntry key = new DatabaseEntry();
    try
    final DatabaseEntry key = new DatabaseEntry();
    if (csn != null)
    {
      key.setData(csn.toString().getBytes("UTF-8"));
    }
    catch (UnsupportedEncodingException e)
    {
      // Should never happens, UTF-8 is always supported
      // TODO : add better logging
      try
      {
        key.setData(csn.toString().getBytes("UTF-8"));
      }
      catch (UnsupportedEncodingException e)
      {
        // Should never happens, UTF-8 is always supported
        // TODO : add better logging
      }
    }
    return key;
  }
@@ -285,13 +289,15 @@
   * @param startCSN
   *          The CSN from which the cursor must start.If null, start from the
   *          oldest CSN
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   * @return The ReplServerDBCursor.
   * @throws ChangelogException
   *           If a database problem happened
   */
  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN);
    return new ReplServerDBCursor(startCSN, positionStrategy);
  }
  /**
@@ -445,7 +451,7 @@
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  class ReplServerDBCursor implements Closeable
  class ReplServerDBCursor implements DBCursor<UpdateMsg>
  {
    /**
     * The transaction that will protect the actions done with the cursor.
@@ -454,12 +460,14 @@
     * <p>
     * Will be set non null for a write cursor
     */
    private final Transaction txn;
    private final Cursor cursor;
    private final DatabaseEntry key;
    private final DatabaseEntry data;
    /** \@Null for read cursors, \@NotNull for deleting cursors. */
    private final Transaction txn;
    private UpdateMsg currentRecord;
    private boolean isClosed = false;
    private boolean isClosed;
    /**
     * Creates a ReplServerDBCursor that can be used for browsing a
@@ -467,21 +475,15 @@
     *
     * @param startCSN
     *          The CSN from which the cursor must start.
     * @param positionStrategy
     *          indicates at which exact position the cursor must start
     * @throws ChangelogException
     *           When the startCSN does not exist.
     */
    private ReplServerDBCursor(CSN startCSN) throws ChangelogException
    private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
    {
      if (startCSN != null)
      {
        key = createReplicationKey(startCSN);
      }
      else
      {
        key = new DatabaseEntry();
      }
      key = createReplicationKey(startCSN);
      data = new DatabaseEntry();
      txn = null;
      // Take the lock. From now on, whatever error that happen in the life
@@ -515,18 +517,25 @@
            return;
          }
          // We can move close to the startCSN.
          // Let's create a cursor from that point.
          DatabaseEntry aKey = new DatabaseEntry();
          DatabaseEntry aData = new DatabaseEntry();
          if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
          if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
          {
            localCursor.close();
            localCursor = db.openCursor(txn, null);
            // We can move close to the startCSN.
            // Let's create a cursor from that point.
            key.setData(null);
            if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              localCursor.close();
              localCursor = db.openCursor(txn, null);
            }
          }
        }
        cursor = localCursor;
        cursorHeld = cursor != null;
        if (key.getData() != null)
        {
          computeCurrentRecord();
        }
      }
      catch (DatabaseException e)
      {
@@ -604,6 +613,7 @@
          return;
        }
        isClosed = true;
        currentRecord = null;
      }
      closeAndReleaseReadLock(cursor);
@@ -658,6 +668,7 @@
        return null;
      }
      currentRecord = null;
      try
      {
        if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -672,60 +683,73 @@
      }
    }
    /**
     * Get the next UpdateMsg from this cursor.
     *
     * @return the next UpdateMsg.
     */
    UpdateMsg next()
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (isClosed)
      {
        return null;
        return false;
      }
      UpdateMsg currentChange = null;
      while (currentChange == null)
      currentRecord = null;
      while (currentRecord == null)
      {
        try
        {
          if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
          {
            return null;
            return false;
          }
        }
        catch (DatabaseException e)
        {
          return null;
          throw new ChangelogException(e);
        }
        CSN csn = null;
        try
        {
          csn = toCSN(key.getData());
          if (isACounterRecord(csn))
          {
            continue;
          }
          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
              data.getData(), ProtocolVersion.getCurrentVersion());
        }
        catch (Exception e)
        {
          /*
           * An error happening trying to convert the data from the
           * replicationServer database to an Update LocalizableMessage. This can only
           * happen if the database is corrupted. There is not much more that we
           * can do at this point except trying to continue with the next
           * record. In such case, it is therefore possible that we miss some
           * changes.
           * TODO : This should be handled by the repair functionality.
           */
          logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
                  csn, e.getMessage());
        }
        computeCurrentRecord();
      }
      return currentChange;
      return currentRecord != null;
    }
    private void computeCurrentRecord()
    {
      CSN csn = null;
      try
      {
        csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          return;
        }
        currentRecord = toRecord(data.getData());
      }
      catch (Exception e)
      {
        /*
         * An error happening trying to convert the data from the
         * replicationServer database to an Update Message. This can only
         * happen if the database is corrupted. There is not much more that we
         * can do at this point except trying to continue with the next
         * record. In such case, it is therefore possible that we miss some
         * changes.
         * TODO : This should be handled by the repair functionality.
         */
        logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
            csn, e.getMessage());
      }
    }
    private UpdateMsg toRecord(final byte[] data) throws Exception
    {
      final short currentVersion = ProtocolVersion.getCurrentVersion();
      return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
    }
    /** {@inheritDoc} */
    @Override
    public UpdateMsg getRecord()
    {
      return currentRecord;
    }
    /**