| | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | |
| | | /** The last generated value for the change number. */ |
| | | private final AtomicLong lastGeneratedChangeNumber; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private boolean shutdown = false; |
| | | private boolean trimDone = false; |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | private volatile boolean trimDone = false; |
| | | /** |
| | | * A dedicated thread repeatedly calls trim(). |
| | | * <p> |
| | | * trim() deletes from the DB a number of changes that are older than a |
| | | * certain date. |
| | | */ |
| | | private DirectoryThread thread; |
| | | private DirectoryThread trimmingThread; |
| | | /** |
| | | * The trim age in milliseconds. Change records in the change DB that are |
| | | * older than this age are removed. |
| | |
| | | long newestCN = (newestRecord != null) ? newestRecord.getChangeNumber() : 0; |
| | | lastGeneratedChangeNumber = new AtomicLong(newestCN); |
| | | |
| | | // Trimming thread |
| | | thread = |
| | | new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer"); |
| | | thread.start(); |
| | | |
| | | // Monitoring registration |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | | * Creates and starts the thread trimming the CNIndexDB. |
| | | * <p> |
| | | * A new DirectoryThread named "Replication ChangeNumberIndexDB Trimmer" is |
| | | * built with this object as its first constructor argument, stored in |
| | | * {@code trimmingThread}, and started immediately. |
| | | * <p> |
| | | * NOTE(review): nothing in this method guards against repeated calls; each |
| | | * call would create and start another thread and overwrite the stored |
| | | * reference - confirm callers invoke it only once. |
| | | */ |
| | | public void startTrimmingThread() |
| | | { |
| | | trimmingThread = |
| | | new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer"); |
| | | trimmingThread.start(); |
| | | } |
| | | |
| | | private long getChangeNumber(CNIndexRecord record) throws ChangelogException |
| | | { |
| | | if (record != null) |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | if (shutdown) |
| | | if (shutdown.get()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | shutdown = true; |
| | | shutdown.set(true); |
| | | synchronized (this) |
| | | { |
| | | notifyAll(); |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | while (!shutdown) |
| | | while (!shutdown.get()) |
| | | { |
| | | try { |
| | | trim(); |
| | | trim(shutdown); |
| | | |
| | | synchronized (this) |
| | | { |
| | |
| | | * Trim old changes from this database. |
| | | * @throws ChangelogException In case of database problem. |
| | | */ |
| | | public void trim() throws ChangelogException |
| | | private void trim(AtomicBoolean shutdown) throws ChangelogException |
| | | { |
| | | if (trimAge == 0) /* a trim age of 0 means nothing is trimmed */ |
| | | return; |
| | | |
| | | clear(null); |
| | | clear(null, shutdown); /* forwards the caller's shutdown flag to clear() */ |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clear(DN baseDNToClear) throws ChangelogException |
| | | { /* Delegates to the two-argument clear() with a null shutdown flag; |
| | | * mustShutdown() returns false for null, so its shutdown checks never |
| | | * cut this call short. |
| | | * NOTE(review): the fragments of the two-argument clear() visible in |
| | | * this file call shutdown.set(true) in their catch blocks; if that |
| | | * name resolves to the (null) parameter rather than the field, it |
| | | * would throw NullPointerException on this path - confirm. */ |
| | | clear(baseDNToClear, null); |
| | | } |
| | | |
| | | private void clear(DN baseDNToClear, AtomicBoolean shutdown) |
| | | throws ChangelogException |
| | | { |
| | | if (isEmpty()) |
| | | { |
| | | return; |
| | |
| | | |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | if (mustShutdown(shutdown)) |
| | | { |
| | | return; |
| | | } |
| | | final DraftCNDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | for (int j = 0; j < 50; j++) |
| | | { |
| | | // let's traverse the CNIndexDB |
| | | if (!cursor.next()) |
| | | if (mustShutdown(shutdown) || !cursor.next()) |
| | | { |
| | | cursor.close(); |
| | | return; |
| | |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | cursor.abort(); |
| | | shutdown = true; |
| | | shutdown.set(true); |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | cursor.abort(); |
| | | shutdown = true; |
| | | shutdown.set(true); |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private boolean mustShutdown(AtomicBoolean shutdown) |
| | | { /* True only when a flag was supplied and it is currently set; a null |
| | | * flag (as passed by clear(DN)) never requests a stop. */ |
| | | return shutdown != null && shutdown.get(); |
| | | } |
| | | |
| | | /** |
| | | * This internal class is used to implement the Monitoring capabilities of the |
| | | * JEChangeNumberIndexDB. |