mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.51.2014 819f74758a1c464bbf578e70ca8592cc8d101d75
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
{
  /**
   * The tracer object for the debug logger.
@@ -74,7 +66,7 @@
  /** FIXME What is this field used for? */
  private volatile long oldestChangeNumber = NO_KEY;
  /**
   * The newest changenumber stored in the DB. It is used to avoid trimming the
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
   * then retrieved on server startup.
@@ -86,48 +78,25 @@
   * condition between:
   * <ol>
   * <li>this atomic long being incremented for a new record ('recordB')</li>
   * <li>the current newest record ('recordA') being trimmed from the DB</li>
   * <li>the current newest record ('recordA') being purged from the DB</li>
   * <li>'recordB' failing to be inserted in the DB</li>
   * </ol>
   */
  private final AtomicLong lastGeneratedChangeNumber;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * A dedicated thread loops trim().
   * <p>
   * trim() : deletes from the DB a number of changes that are older than a
   * certain date.
   */
  private DirectoryThread trimmingThread;
  /**
   * The trim age in milliseconds. Changes record in the change DB that are
   * older than this age are removed.
   * <p>
   * FIXME it never gets updated even when the replication server purge delay is
   * updated
   */
  private volatile long trimAge;
  private ReplicationServer replicationServer;
  /**
   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
   *
   * @param replicationServer The ReplicationServer that creates this instance.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @param dbEnv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   */
  public JEChangeNumberIndexDB(ReplicationServer replicationServer,
      ReplicationDbEnv dbenv) throws ChangelogException
  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
  {
    this.replicationServer = replicationServer;
    this.trimAge = replicationServer.getTrimAge();
    // DB initialization
    db = new DraftCNDB(dbenv);
    db = new DraftCNDB(dbEnv);
    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
    oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  /**
   * Creates and starts the thread trimming the CNIndexDB.
   */
  public void startTrimmingThread()
  {
    trimmingThread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    trimmingThread.start();
  }
  private long getChangeNumber(ChangeNumberIndexRecord record)
      throws ChangelogException
  {
@@ -251,77 +210,82 @@
      notifyAll();
    }
    if (trimmingThread != null)
    {
      try
      {
        trimmingThread.join();
      }
      catch (InterruptedException ignored)
      {
        // Nothing can be done about it, just proceed
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   * Synchronously purges the change number index DB up to and excluding the
   * provided timestamp.
   *
   * @param purgeTimestamp
   *          the timestamp up to which purging must happen
   * @return the {@link MultiDomainServerState} object that drives purging the
   *         replicaDBs.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  @Override
  public void run()
  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
      throws ChangelogException
  {
    while (!shutdown.get())
    if (isEmpty())
    {
      try {
        trim(shutdown);
      return null;
    }
        synchronized (this)
        {
          if (!shutdown.get())
          {
            try
            {
              wait(1000);
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
      catch (Exception end)
    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      while (!mustShutdown(shutdown) && cursor.next())
      {
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(end)));
        if (replicationServer != null)
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (record.getChangeNumber() != oldestChangeNumber)
        {
          replicationServer.shutdown();
          oldestChangeNumber = record.getChangeNumber();
        }
        break;
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return getPurgeCookie(record);
        }
        if (record.getCSN().isOlderThan(purgeCSN))
        {
          cursor.delete();
        }
        else
        {
          // Current record is not old enough to purge.
          return getPurgeCookie(record);
        }
      }
      return null;
    }
    catch (ChangelogException e)
    {
      cursor.abort();
      throw e;
    }
    catch (Exception e)
    {
      cursor.abort();
      throw new ChangelogException(e);
    }
    finally
    {
      cursor.close();
    }
  }
  /**
   * Trim old changes from this database.
   *
   * @param shutdown
   *          AtomicBoolean telling whether the current run must be stopped
   * @throws ChangelogException
   *           In case of database problem.
   */
  public void trim(AtomicBoolean shutdown) throws ChangelogException
  private MultiDomainServerState getPurgeCookie(
      final ChangeNumberIndexRecord record) throws DirectoryException
  {
    if (trimAge == 0)
      return;
    clear(null, shutdown);
    // Do not include the record's CSN to avoid having it purged
    return new MultiDomainServerState(record.getPreviousCookie());
  }
  /**
@@ -334,127 +298,49 @@
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public void clear(DN baseDNToClear) throws ChangelogException
  {
    clear(baseDNToClear, null);
  }
  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
      throws ChangelogException
  public void removeDomain(DN baseDNToClear) throws ChangelogException
  {
    if (isEmpty())
    {
      return;
    }
    for (int i = 0; i < 100; i++)
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      if (mustShutdown(shutdown))
      boolean isOldestRecord = true;
      while (!mustShutdown(shutdown) && cursor.next())
      {
        return;
      }
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
        {
          // let's traverse the CNIndexDB
          if (mustShutdown(shutdown) || !cursor.next())
          {
            cursor.close();
            return;
          }
          final ChangeNumberIndexRecord record = cursor.currentRecord();
          if (record.getChangeNumber() != oldestChangeNumber)
          {
            oldestChangeNumber = record.getChangeNumber();
          }
          if (record.getChangeNumber() == newestChangeNumber)
          {
            // do not trim the newest record to avoid having the last generated
            // changenumber dropping back to 0 if the server restarts
            cursor.close();
            return;
          }
          if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
          {
            cursor.delete();
            continue;
          }
          final ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(record.getBaseDN());
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
            // CNIndexDB, thus it makes no sense to keep this record in the DB.
            cursor.delete();
            continue;
          }
          // FIXME there is an opportunity for a phantom record in the CNIndexDB
          // if the replicaDB gets purged after call to domain.getOldestState().
          final CSN csn = record.getCSN();
          final ServerState oldestState = domain.getOldestState();
          final CSN fcsn = oldestState.getCSN(csn.getServerId());
          if (csn.isOlderThan(fcsn))
          {
            // This change which has already been purged from the corresponding
            // replicaDB => purge it from CNIndexDB
            cursor.delete();
            continue;
          }
          ServerState csnVector;
          try
          {
            Map<DN, ServerState> csnStartStates =
                MultiDomainServerState.splitGenStateToServerStates(
                        record.getPreviousCookie());
            csnVector = csnStartStates.get(record.getBaseDN());
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
                  + csnVector + " -- StartState:" + oldestState);
          }
          catch(Exception e)
          {
            // We could not parse the MultiDomainServerState from the record
            // FIXME this is quite an aggressive delete()
            cursor.delete();
            continue;
          }
          if (csnVector == null
              || (csnVector.getCSN(csn.getServerId()) != null
                    && !csnVector.cover(oldestState)))
          {
            cursor.delete();
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
                  + "Not covering startState");
            continue;
          }
          oldestChangeNumber = record.getChangeNumber();
          cursor.close();
        }
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return;
        }
        cursor.close();
        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
        {
          cursor.delete();
        }
        else
        {
          isOldestRecord = false;
        }
      }
      catch (ChangelogException e)
      {
        cursor.abort();
        throw e;
      }
      catch (Exception e)
      {
        cursor.abort();
        throw new ChangelogException(e);
      }
    }
    catch (ChangelogException e)
    {
      cursor.abort();
      throw e;
    }
    finally
    {
      cursor.close();
    }
  }
@@ -469,9 +355,7 @@
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public List<Attribute> getMonitorData()
    {
@@ -509,18 +393,14 @@
      return 0;
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public String getMonitorInstanceName()
    {
      return "ChangeNumber Index Database";
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -540,15 +418,6 @@
  }
  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException