mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.00.2014 ed2025b72da8144087d7f51eb318c465260704fe
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -106,7 +106,6 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
@@ -322,13 +321,11 @@
    if (indexer != null)
    {
      indexer.initiateShutdown();
      indexer.interrupt();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
      purger.interrupt();
    }
    try
@@ -641,6 +638,8 @@
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
  }
@@ -762,17 +761,14 @@
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              if (!isShutdownInitiated())
              {
                // ... or the change number index DB is empty,
                // wait for new changes to come in.
              // ... or the change number index DB is empty,
              // wait for new changes to come in.
                // Note we cannot sleep for as long as the purge delay
                // (3 days default), because we might receive late updates
                // that will have to be purged before the purge delay elapses.
                // This can particularly happen in case of network partitions.
                sleep(DEFAULT_SLEEP);
              }
              // Note we cannot sleep for as long as the purge delay
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              continue;
            }
          }
@@ -787,7 +783,7 @@
          latestPurgeDate = purgeTimestamp;
          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
@@ -804,6 +800,33 @@
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
@@ -816,5 +839,16 @@
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
    /** {@inheritDoc} */
    @Override
    public void initiateShutdown()
    {
      super.initiateShutdown();
      synchronized (this)
      {
        notify(); // wake up the purger thread for faster shutdown
      }
    }
  }
}