| | |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | * <p> |
| | | * This blocking queue is only used as a temporary placeholder so that the |
| | | * write in the stable storage can be grouped for efficiency reason. Adding an |
| | | * update synchronously add the update to this list. A dedicated thread loops |
| | | * on {@link #flush()} and {@link #trim()}. |
| | | * <dl> |
| | | * <dt>flush()</dt> |
| | | * <dd>get a number of changes from the in memory list by block and write them |
| | | * to the db.</dd> |
| | | * <dt>trim()</dt> |
| | | * <dd>deletes from the DB a number of changes that are older than a certain |
| | | * date.</dd> |
| | | * </dl> |
| | | * update synchronously add the update to this list. A dedicated thread |
| | | * flushes this blocking queue. |
| | | * <p> |
| | | * Changes are not read back by replicationServer threads that are responsible |
| | | * for pushing the changes to other replication server or to LDAP server |
| | |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private DirectoryThread thread; |
| | | /** |
| | | * Used to prevent race conditions between threads calling {@link #clear()} |
| | | * {@link #flush()} or {@link #trim()}. This can happen with the thread |
| | | * flushing the queue, on shutdown or on cursor opening, a thread calling |
| | | * clear(), etc. |
| | | * Used to prevent race conditions between threads calling {@link #flush()}. |
| | | * This can happen with the thread flushing the queue, or else on shutdown. |
| | | */ |
| | | private final Object flushLock = new Object(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | private long latestTrimDate = 0; |
| | | |
| | | /** |
| | | * The trim age in milliseconds. Changes record in the change DB that |
| | | * are older than this age are removed. |
| | | */ |
| | | private long trimAge; |
| | | |
| | | /** |
| | | * Creates a new ReplicaDB associated to a given LDAP server. |
| | | * |
| | |
| | | this.replicationServer = replicationServer; |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | trimAge = replicationServer.getTrimAge(); |
| | | queueMaxBytes = replicationServer.getQueueSize() * 200; |
| | | queueSizeBytes = new Semaphore(queueMaxBytes); |
| | | db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); |
| | | csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") changelog checkpointer for Replica DS(" + serverId |
| | | + ") for domain \"" + baseDN + "\""); |
| | | + replicationServer.getServerId() |
| | | + ") flusher thread for Replica DS(" + serverId |
| | | + ") for domain \"" + baseDN + "\""); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | * Periodically Flushes the ReplicationServerDomain cache from memory to the |
| | | * stable storage and trims the old updates. |
| | | * Flushes the replicaDB queue from memory to stable storage. |
| | | */ |
| | | @Override |
| | | public void run() |
| | |
| | | try |
| | | { |
| | | flush(); |
| | | trim(); |
| | | } |
| | | catch (ChangelogException end) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the latest trim date. |
| | | * @return the latest trim date. |
| | | * Synchronously purge changes older than purgeCSN from this replicaDB. |
| | | * |
| | | * @param purgeCSN |
| | | * The CSN up to which changes can be purged. No purging happens when |
| | | * it is null. |
| | | * @throws ChangelogException |
| | | * In case of database problem. |
| | | */ |
| | | public long getLatestTrimDate() |
| | | void purgeUpTo(final CSN purgeCSN) throws ChangelogException |
| | | { |
| | | return latestTrimDate; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Trim old changes from this replicationServer database. |
| | | * @throws ChangelogException In case of database problem. |
| | | */ |
| | | private void trim() throws ChangelogException |
| | | { |
| | | if (trimAge == 0) |
| | | if (purgeCSN == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | latestTrimDate = TimeThread.getTime() - trimAge; |
| | | |
| | | CSN trimDate = new CSN(latestTrimDate, 0, 0); |
| | | |
| | | // Find the last CSN before the trimDate, in the Database. |
| | | CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate); |
| | | if (lastBeforeTrimDate != null) |
| | | { |
| | | // If we found it, we want to stop trimming when reaching it. |
| | | trimDate = lastBeforeTrimDate; |
| | | } |
| | | |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | /* |
| | | * Perform at least some trimming regardless of the flush backlog. Then |
| | | * continue trim iterations while the flush backlog is low (below the |
| | | * lowmark). Once the flush backlog increases, stop trimming and start |
| | | * flushing more eagerly. |
| | | */ |
| | | if (i > 20 && isQueueAboveLowMark()) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | /* |
| | | * the trim is done by group in order to save some CPU, IO bandwidth and |
| | | * DB caches: start the transaction then do a bunch of remove then |
| | | * commit. |
| | | * the purge is done by group in order to save some CPU, IO bandwidth and |
| | | * DB caches: start the transaction then do a bunch of remove then commit. |
| | | */ |
| | | /* |
| | | * Matt wrote: The record removal is done as a DB transaction and the |
| | |
| | | return; |
| | | } |
| | | |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate)) |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isQueueAboveLowMark() |
| | | { |
| | | final int lowMarkBytes = queueMaxBytes / 5; |
| | | final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits(); |
| | | return bytesUsed > lowMarkBytes; |
| | | } |
| | | |
| | | /** |
| | | * Flush a number of updates from the memory list to the stable storage. |
| | | * <p> |
| | | * Flush is done by chunk sized to 500 messages, starting from the beginning |
| | | * of the list. |
| | | * |
| | | * <p> |
| | | * @GuardedBy("flushLock") |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public void flush() throws ChangelogException |
| | | private void flush() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | synchronized (flushLock) |
| | | { |
| | | final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS); |
| | | final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS); |
| | | if (change == null) |
| | | { |
| | | // nothing to persist, move on to the trim phase |
| | | // nothing to persist, check if shutdown was invoked |
| | | return; |
| | | } |
| | | |
| | | // Try to see if there are more changes and persist them all. |
| | | final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | changes.add(change); |
| | | msgQueue.drainTo(changes); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the Purge delay for this db Handler. |
| | | * @param delay The purge delay in Milliseconds. |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | trimAge = delay; |
| | | } |
| | | |
| | | /** |
| | | * Clear the changes from this DB (from both memory cache and DB storage). |
| | | * @throws ChangelogException When an exception occurs while removing the |
| | | * changes from the DB. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return the size of the msgQueue (the memory cache of the ReplicaDB). |
| | | * Return the number of records of this replicaDB. |
| | | * <p> |
| | | * For test purpose. |
| | | * @return The memory queue size. |
| | | * |
| | | * @return The number of records of this replicaDB. |
| | | */ |
| | | int getQueueSize() |
| | | long getNumberRecords() |
| | | { |
| | | return this.msgQueue.size(); |
| | | return db.getNumberRecords(); |
| | | } |
| | | |
| | | /** |