mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.00.2014 ed2025b72da8144087d7f51eb318c465260704fe
FileChangelogDB.java, JEChangelogDB.java:
Synchronized the two files for easier code identification.
Code is really too much duplicated between the two implementations.
2 files modified
146 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 62 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 84 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -106,7 +106,6 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
@@ -322,13 +321,11 @@
    if (indexer != null)
    {
      indexer.initiateShutdown();
      indexer.interrupt();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
      purger.interrupt();
    }
    try
@@ -641,6 +638,8 @@
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
  }
@@ -762,17 +761,14 @@
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              if (!isShutdownInitiated())
              {
                // ... or the change number index DB is empty,
                // wait for new changes to come in.
              // ... or the change number index DB is empty,
              // wait for new changes to come in.
                // Note we cannot sleep for as long as the purge delay
                // (3 days default), because we might receive late updates
                // that will have to be purged before the purge delay elapses.
                // This can particularly happen in case of network partitions.
                sleep(DEFAULT_SLEEP);
              }
              // Note we cannot sleep for as long as the purge delay
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              continue;
            }
          }
@@ -787,7 +783,7 @@
          latestPurgeDate = purgeTimestamp;
          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
@@ -804,6 +800,33 @@
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
@@ -816,5 +839,16 @@
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
    /** {@inheritDoc} */
    @Override
    public void initiateShutdown()
    {
      super.initiateShutdown();
      synchronized (this)
      {
        notify(); // wake up the purger thread for faster shutdown
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -79,8 +79,7 @@
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
      domainToReplicaDBs =
          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private ReplicationServerCfg config;
  private final File dbDirectory;
@@ -92,8 +91,7 @@
   * @GuardedBy("cnIndexDBLock")
   */
  private JEChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer =
      new AtomicReference<ChangeNumberIndexer>();
  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>();
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
@@ -103,8 +101,7 @@
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger =
      new AtomicReference<ChangelogDBPurger>();
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
  private volatile long latestPurgeDate;
  /** The local replication server. */
@@ -161,7 +158,7 @@
  private File makeDir(String dbDirName) throws ConfigException
  {
    // Check that this path exists or create it.
    File dbDirectory = getFileForPath(dbDirName);
    final File dbDirectory = getFileForPath(dbDirName);
    try
    {
      if (!dbDirectory.exists())
@@ -223,7 +220,7 @@
   *          the baseDN for which to create a ReplicaDB
   * @param serverId
   *          the serverId for which to create a ReplicaDB
   * @param rs
   * @param server
   *          the ReplicationServer
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
   *         to be created
@@ -231,14 +228,14 @@
   *           if a problem occurred with the database
   */
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer rs) throws ChangelogException
      int serverId, ReplicationServer server) throws ChangelogException
  {
    while (!shutdown.get())
    {
      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
          getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result =
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      if (result != null)
      {
        return result;
@@ -276,7 +273,7 @@
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
      DN baseDN, ReplicationServer rs) throws ChangelogException
      DN baseDN, ReplicationServer server) throws ChangelogException
  {
    // happy path: the JEReplicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
@@ -298,16 +295,16 @@
      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // the domainMap could have been concurrently removed because
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
        return null;
      }
      final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
      domainMap.put(serverId, newValue);
      return Pair.of(newValue, true);
      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
      domainMap.put(serverId, newDB);
      return Pair.of(newDB, true);
    }
  }
@@ -320,7 +317,7 @@
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.readChangelogState();
      initializeChangelogState(changelogState);
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
        startIndexer(changelogState);
@@ -330,24 +327,21 @@
    catch (ChangelogException e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(),
          e.getLocalizedMessage()));
      }
      logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
    }
  }
  private void initializeChangelogState(final ChangelogState changelogState)
  private void initializeToChangelogState(final ChangelogState changelogState)
      throws ChangelogException
  {
    for (Map.Entry<DN, Long> entry :
      changelogState.getDomainToGenerationId().entrySet())
    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true)
          .initGenerationID(entry.getValue());
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, List<Integer>> entry :
      changelogState.getDomainToServerIds().entrySet())
    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
@@ -491,7 +485,9 @@
            firstException = e;
          }
          else if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
          }
        }
        cnIndexDB = null;
@@ -618,7 +614,7 @@
  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(long purgeDelayInMillis)
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
@@ -642,7 +638,7 @@
  /** {@inheritDoc} */
  @Override
  public void setComputeChangeNumber(boolean computeChangeNumber)
  public void setComputeChangeNumber(final boolean computeChangeNumber)
      throws ChangelogException
  {
    if (computeChangeNumber)
@@ -661,8 +657,7 @@
  private void startIndexer(final ChangelogState changelogState)
  {
    final ChangeNumberIndexer indexer =
        new ChangeNumberIndexer(this, changelogState);
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
@@ -671,7 +666,7 @@
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
  public long getDomainLatestTrimDate(final DN baseDN)
  {
    return latestPurgeDate;
  }
@@ -708,17 +703,15 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState) throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final Map<DBCursor<UpdateMsg>, Void> cursors =
        new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ?
          startAfterServerState.getCSN(serverId) : null;
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    // recycle exhausted cursors,
@@ -728,8 +721,8 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN) throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
@@ -763,7 +756,7 @@
  /** {@inheritDoc} */
  @Override
  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException
  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
  {
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
@@ -784,8 +777,7 @@
  /** {@inheritDoc} */
  @Override
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
@@ -797,7 +789,7 @@
  /**
   * The thread purging the changelogDB on a regular interval. Records are
   * purged from the changelogDB is they are older than a delay specified in
   * purged from the changelogDB if they are older than a delay specified in
   * seconds. The purge process works in two steps:
   * <ol>
   * <li>first purge the changeNumberIndexDB and retrieve information to drive
@@ -858,8 +850,7 @@
            }
          }
          for (final Map<Integer, JEReplicaDB> domainMap
              : domainToReplicaDBs.values())
          for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
          {
            for (final JEReplicaDB replicaDB : domainMap.values())
            {
@@ -877,8 +868,7 @@
        }
        catch (Exception e)
        {
          logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
              .get(stackTraceToSingleLineString(e)));
          logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
          if (replicationServer != null)
          {
            replicationServer.shutdown();