| | |
| | | public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) |
| | | throws ChangelogException |
| | | { |
| | | if (startAfterCSN == null) |
| | | { |
| | | // flush any potential changes before opening the cursor |
| | | flush(); |
| | | } |
| | | return new JEReplicaDBCursor(db, startAfterCSN, this); |
| | | } |
| | | |
| | |
| | | trimDate = lastBeforeTrimDate; |
| | | } |
| | | |
| | | final int queueLowMarkBytes = queueMaxBytes / 5; |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | /* |
| | |
| | | * lowmark). Once the flush backlog increases, stop trimming and start |
| | | * flushing more eagerly. |
| | | */ |
| | | if (i > 20 && msgQueue.size() < queueLowMarkBytes) |
| | | if (i > 20 && isQueueAboveLowMark()) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | synchronized (flushLock) |
| | | /* |
| | | * the trim is done by group in order to save some CPU, IO bandwidth and |
| | | * DB caches: start the transaction then do a bunch of remove then |
| | | * commit. |
| | | */ |
| | | /* |
| | | * Matt wrote: The record removal is done as a DB transaction and the |
| | | * deleted records are only "deleted" on commit. While the txn/cursor is |
| | | * open the records to be deleted will, I think, be pinned in the DB |
| | | * cache. In other words, the larger the transaction (the more records |
| | | * deleted during a single batch) the more DB cache will be used to |
| | | * process the transaction. |
| | | */ |
| | | final ReplServerDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | /* |
| | | * the trim is done by group in order to save some CPU, IO bandwidth and |
| | | * DB caches: start the transaction then do a bunch of remove then |
| | | * commit. |
| | | */ |
| | | /* |
| | | * Matt wrote: The record removal is done as a DB transaction and the |
| | | * deleted records are only "deleted" on commit. While the txn/cursor is |
| | | * open the records to be deleted will, I think, be pinned in the DB |
| | | * cache. In other words, the larger the transaction (the more records |
| | | * deleted during a single batch) the more DB cache will be used to |
| | | * process the transaction. |
| | | */ |
| | | final ReplServerDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | for (int j = 0; j < 50; j++) |
| | | { |
| | | for (int j = 0; j < 50; j++) |
| | | if (thread.isShutdownInitiated()) |
| | | { |
| | | if (thread.isShutdownInitiated()) |
| | | { |
| | | return; |
| | | } |
| | | return; |
| | | } |
| | | |
| | | CSN csn = cursor.nextCSN(); |
| | | if (csn == null) |
| | | { |
| | | return; |
| | | } |
| | | CSN csn = cursor.nextCSN(); |
| | | if (csn == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | csnLimits = new CSNLimits(csn, csnLimits.newestCSN); |
| | | return; |
| | | } |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | csnLimits = new CSNLimits(csn, csnLimits.newestCSN); |
| | | return; |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | cursor.abort(); |
| | | thread.initiateShutdown(); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | cursor.abort(); |
| | | thread.initiateShutdown(); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private boolean isQueueAboveLowMark() |
| | | { |
| | | final int lowMarkBytes = queueMaxBytes / 5; |
| | | final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits(); |
| | | return bytesUsed > lowMarkBytes; |
| | | } |
| | | |
| | | /** |
| | | * Flush a number of updates from the memory list to the stable storage. |
| | | * <p> |
| | |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | synchronized(flushLock) |
| | | { |
| | | collectAllPermits(); |
| | | msgQueue.clear(); |
| | | db.clear(); |
| | | csnLimits = new CSNLimits(null, null); |
| | | } |
| | | collectAllPermits(); |
| | | msgQueue.clear(); // this call should not do anything at all |
| | | db.clear(); |
| | | csnLimits = new CSNLimits(null, null); |
| | | } |
| | | |
| | | /** |