| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | import java.util.Date; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | */ |
| | | public class JEReplicaDB implements Runnable |
| | | { |
| | | |
| | | /** |
| | | * Class that allows atomically setting oldest and newest CSNs without |
| | | * synchronization. |
| | | * <p> |
| | | * Both fields are final and assigned exactly once by the constructor, so a |
| | | * pair of limits is replaced as a whole by building a new instance rather |
| | | * than by mutating an existing one. Either CSN may be {@code null}. |
| | | * |
| | | * @Immutable |
| | | */ |
| | | private static final class CSNLimits |
| | | { |
| | | private final CSN oldestCSN; |
| | | private final CSN newestCSN; |
| | | |
| | | public CSNLimits(CSN oldestCSN, CSN newestCSN) |
| | | { |
| | | this.oldestCSN = oldestCSN; |
| | | this.newestCSN = newestCSN; |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * The msgQueue holds all the updates not yet saved to stable storage. |
| | | * <p> |
| | | * This list is only used as a temporary placeholder so that the write in the |
| | | * stable storage can be grouped for efficiency reason. Adding an update |
| | | * synchronously add the update to this list. A dedicated thread loops on |
| | | * flush() and trim(). |
| | | * This blocking queue is only used as a temporary placeholder so that the |
| | | * writes to the stable storage can be grouped for efficiency reasons. Adding |
| | | * an update synchronously adds the update to this queue. A dedicated thread |
| | | * loops on {@link #flush()} and {@link #trim()}. |
| | | * <dl> |
| | | * <dt>flush()</dt> |
| | | * <dd>get a number of changes from the in memory list by block and write them |
| | |
| | | * Changes are not read back by replicationServer threads that are responsible |
| | | * for pushing the changes to other replication servers or to LDAP servers. |
| | | */ |
| | | private final LinkedList<UpdateMsg> msgQueue = |
| | | new LinkedList<UpdateMsg>(); |
| | | private final LinkedBlockingQueue<UpdateMsg> msgQueue = |
| | | new LinkedBlockingQueue<UpdateMsg>(); |
| | | |
| | | /** |
| | | * The High and low water mark for the max size of the msgQueue. The threads |
| | | * calling add() method will be blocked if the size of msgQueue becomes larger |
| | | * than the queueHimark and will resume only when the size of the msgQueue |
| | | * goes below queueLowmark. |
| | | * Semaphore used to limit the number of bytes used in memory by the queue. |
| | | * Threads calling the {@link #add(UpdateMsg)} method will be blocked if the |
| | | * size of msgQueue becomes larger than the available permits and will resume |
| | | * only when the number of available permits allows it. |
| | | */ |
| | | private int queueMaxSize = 5000; |
| | | private int queueLowmark = 1000; |
| | | private int queueHimark = 4000; |
| | | |
| | | /** |
| | | * The queue himark and lowmark in bytes, this is set to 100 times the himark |
| | | * and lowmark in number of updates. |
| | | */ |
| | | private int queueMaxBytes = 100 * queueMaxSize; |
| | | private int queueLowmarkBytes = 100 * queueLowmark; |
| | | private int queueHimarkBytes = 100 * queueHimark; |
| | | |
| | | /** The number of bytes currently in the queue. */ |
| | | private int queueByteSize = 0; |
| | | private final Semaphore queueSizeBytes; |
| | | private final int queueMaxBytes; |
| | | |
| | | private ReplicationDB db; |
| | | private CSN oldestCSN; |
| | | private CSN newestCSN; |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | | * |
| | | * @NonNull |
| | | */ |
| | | private volatile CSNLimits csnLimits; |
| | | private int serverId; |
| | | private DN baseDN; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private DirectoryThread thread; |
| | | /** |
| | | * Used to prevent race conditions between threads calling {@link #clear()}, |
| | | * {@link #flush()} or {@link #trim()}. This can happen with the thread |
| | | * flushing the queue, on shutdown or on cursor opening, a thread calling |
| | | * clear(), etc. |
| | | */ |
| | | private final Object flushLock = new Object(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | trimAge = replicationServer.getTrimAge(); |
| | | final int queueSize = replicationServer.getQueueSize(); |
| | | queueMaxSize = queueSize; |
| | | queueLowmark = queueSize / 5; |
| | | queueHimark = queueSize * 4 / 5; |
| | | queueMaxBytes = 200 * queueMaxSize; |
| | | queueLowmarkBytes = 200 * queueLowmark; |
| | | queueHimarkBytes = 200 * queueLowmark; |
| | | queueMaxBytes = replicationServer.getQueueSize() * 200; |
| | | queueSizeBytes = new Semaphore(queueMaxBytes); |
| | | db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); |
| | | oldestCSN = db.readOldestCSN(); |
| | | newestCSN = db.readNewestCSN(); |
| | | csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") changelog checkpointer for Replica DS(" + serverId |
| | |
| | | } |
| | | |
| | | /** |
| | | * Add an update to the list of messages that must be saved to the db |
| | | * managed by this db handler. |
| | | * This method is blocking if the size of the list of message is larger |
| | | * than its maximum. |
| | | * Adds an update to the list of messages that must be saved to the db managed |
| | | * by this db handler. This method blocks if the size of the list of |
| | | * messages is larger than its maximum. |
| | | * |
| | | * @param update The update that must be saved to the db managed by this db |
| | | * handler. |
| | | * @param updateMsg |
| | | * The update message that must be saved to the db managed by this db |
| | | * handler. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public void add(UpdateMsg update) |
| | | public void add(UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | synchronized (msgQueue) |
| | | if (thread.isShutdownInitiated()) |
| | | { |
| | | int size = msgQueue.size(); |
| | | if (size > queueHimark || queueByteSize > queueHimarkBytes) |
| | | { |
| | | msgQueue.notify(); |
| | | } |
| | | throw new ChangelogException( |
| | | ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg |
| | | .toString(), String.valueOf(baseDN), String.valueOf(serverId))); |
| | | } |
| | | |
| | | while (size > queueMaxSize || queueByteSize > queueMaxBytes) |
| | | final int msgSize = updateMsg.size(); |
| | | if (msgSize < queueMaxBytes) |
| | | { |
| | | try |
| | | { |
| | | try |
| | | { |
| | | msgQueue.wait(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // simply loop to try again. |
| | | } |
| | | size = msgQueue.size(); |
| | | queueSizeBytes.acquire(msgSize); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg |
| | | .toString(), String.valueOf(baseDN), String.valueOf(serverId), |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // edge case with a very large message |
| | | collectAllPermits(); |
| | | } |
| | | msgQueue.add(updateMsg); |
| | | |
| | | queueByteSize += update.size(); |
| | | msgQueue.add(update); |
| | | if (newestCSN == null || newestCSN.isOlderThan(update.getCSN())) |
| | | { |
| | | newestCSN = update.getCSN(); |
| | | } |
| | | if (oldestCSN == null) |
| | | { |
| | | oldestCSN = update.getCSN(); |
| | | } |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null |
| | | || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateOld = limits.oldestCSN == null; |
| | | if (updateOld || updateNew) |
| | | { |
| | | csnLimits = new CSNLimits( |
| | | updateOld ? updateMsg.getCSN() : limits.oldestCSN, |
| | | updateNew ? updateMsg.getCSN() : limits.newestCSN); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get some changes out of the message queue of the LDAP server. |
| | | * (from the beginning of the queue) |
| | | * @param number the maximum number of messages to extract. |
| | | * @return a List containing number changes extracted from the queue. |
| | | */ |
| | | private List<UpdateMsg> getChanges(int number) |
| | | /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */ |
| | | private void collectAllPermits() |
| | | { |
| | | synchronized (msgQueue) |
| | | int collectedPermits = queueSizeBytes.drainPermits(); |
| | | while (collectedPermits != queueMaxBytes) |
| | | { |
| | | final int minAvailableNb = Math.min(number, msgQueue.size()); |
| | | return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb)); |
| | | // Do not use Thread.sleep() because: |
| | | // 1) it is expected the permits will be released very soon |
| | | // 2) we want to collect all the permits, so do not leave a chance to |
| | | // other threads to steal them from us. |
| | | // 3) we want to keep low latency |
| | | Thread.yield(); |
| | | collectedPermits += queueSizeBytes.drainPermits(); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public CSN getOldestCSN() |
| | | { |
| | | return oldestCSN; |
| | | return csnLimits.oldestCSN; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public CSN getNewestCSN() |
| | | { |
| | | return newestCSN; |
| | | return csnLimits.newestCSN; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getChangesCount() |
| | | { |
| | | if (newestCSN != null && oldestCSN != null) |
| | | final CSNLimits limits = csnLimits; |
| | | if (limits.newestCSN != null && limits.oldestCSN != null) |
| | | { |
| | | return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1; |
| | | return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1; |
| | | } |
| | | return 0; |
| | | } |
| | |
| | | { |
| | | if (startAfterCSN == null) |
| | | { |
| | | // flush any potential changes before opening the cursor |
| | | flush(); |
| | | } |
| | | return new JEReplicaDBCursor(db, startAfterCSN, this); |
| | | } |
| | | |
| | | /** |
| | | * Removes the provided number of messages from the beginning of the msgQueue. |
| | | * <p> |
| | | * The whole operation runs while holding the msgQueue monitor. Fewer messages |
| | | * than requested are removed if the queue becomes empty first. The size of |
| | | * each removed message is subtracted from queueByteSize. Once the removal is |
| | | * done, if both the number of queued messages and queueByteSize are below |
| | | * their respective low marks, all threads waiting on msgQueue are notified. |
| | | * |
| | | * @param number the number of changes to be removed. |
| | | */ |
| | | private void clearQueue(int number) |
| | | { |
| | | synchronized (msgQueue) |
| | | { |
| | | int current = 0; |
| | | while (current < number && !msgQueue.isEmpty()) |
| | | { |
| | | UpdateMsg msg = msgQueue.remove(); // remove first |
| | | queueByteSize -= msg.size(); |
| | | current++; |
| | | } |
| | | if (msgQueue.size() < queueLowmark |
| | | && queueByteSize < queueLowmarkBytes) |
| | | { |
| | | msgQueue.notifyAll(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | public void shutdown() |
| | |
| | | |
| | | thread.initiateShutdown(); |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | msgQueue.notifyAll(); |
| | | } |
| | | |
| | | while (msgQueue.size() != 0) |
| | | { |
| | | try |
| | |
| | | catch (ChangelogException e) |
| | | { |
| | | // We are already shutting down |
| | | logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(e))); |
| | | logError(e.getMessageObject()); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | flush(); |
| | | trim(); |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | if (msgQueue.size() < queueLowmark |
| | | && queueByteSize < queueLowmarkBytes) |
| | | { |
| | | try |
| | | { |
| | | msgQueue.wait(1000); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | // Do not reset the interrupt flag here, |
| | | // because otherwise JE will barf next time flush() is called: |
| | | // JE 5.0.97 refuses to persist changes to the DB when invoked |
| | | // from a Thread with the interrupt flag set to true. |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (Exception end) |
| | | catch (ChangelogException end) |
| | | { |
| | | stop(end); |
| | | break; |
| | |
| | | { |
| | | thread.stopWork(); |
| | | } |
| | | |
| | | synchronized (this) |
| | | { |
| | | notifyAll(); |
| | | } |
| | | } |
| | | |
| | | private void stop(Exception e) |
| | |
| | | trimDate = lastBeforeTrimDate; |
| | | } |
| | | |
| | | final int queueLowMarkBytes = queueMaxBytes / 5; |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | /* |
| | | * Perform at least some trimming regardless of the flush backlog. Then |
| | | * continue trim iterations while the flush backlog is low (below the |
| | | * lowmark). Once the flush backlog increases, stop trimming and start |
| | | * flushing more eagerly. |
| | | */ |
| | | if (i > 20 && msgQueue.size() < queueLowMarkBytes) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | synchronized (flushLock) |
| | | { |
| | | /* |
| | | * the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | * the trim is done by group in order to save some CPU, IO bandwidth and |
| | | * DB caches: start the transaction then do a bunch of remove then |
| | | * commit. |
| | | */ |
| | | /* |
| | | * Matt wrote: The record removal is done as a DB transaction and the |
| | | * deleted records are only "deleted" on commit. While the txn/cursor is |
| | | * open the records to be deleted will, I think, be pinned in the DB |
| | | * cache. In other words, the larger the transaction (the more records |
| | | * deleted during a single batch) the more DB cache will be used to |
| | | * process the transaction. |
| | | */ |
| | | final ReplServerDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | |
| | | return; |
| | | } |
| | | |
| | | if (!csn.equals(newestCSN) && csn.isOlderThan(trimDate)) |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | oldestCSN = csn; |
| | | csnLimits = new CSNLimits(csn, csnLimits.newestCSN); |
| | | return; |
| | | } |
| | | } |
| | |
| | | */ |
| | | public void flush() throws ChangelogException |
| | | { |
| | | int size; |
| | | int chunksize = Math.min(queueMaxSize, 500); |
| | | |
| | | do |
| | | try |
| | | { |
| | | synchronized(flushLock) |
| | | synchronized (flushLock) |
| | | { |
| | | // get N (or less) messages from the queue to save to the DB |
| | | // (from the beginning of the queue) |
| | | List<UpdateMsg> changes = getChanges(chunksize); |
| | | |
| | | // if no more changes to save exit immediately. |
| | | if (changes == null || (size = changes.size()) == 0) |
| | | final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS); |
| | | if (change == null) |
| | | { |
| | | // nothing to persist, move on to the trim phase |
| | | return; |
| | | } |
| | | |
| | | // save the change to the stable storage. |
| | | db.addEntries(changes); |
| | | // Try to see if there are more changes and persist them all. |
| | | changes.add(change); |
| | | msgQueue.drainTo(changes); |
| | | |
| | | // remove the changes from the list of changes to be saved |
| | | // (remove from the beginning of the queue) |
| | | clearQueue(changes.size()); |
| | | int totalSize = db.addEntries(changes); |
| | | // do not release more than queue max size permits |
| | | // (be careful of the edge case with the very large message) |
| | | queueSizeBytes.release(Math.min(totalSize, queueMaxBytes)); |
| | | } |
| | | // loop while there are more changes in the queue |
| | | } while (size == chunksize); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(e))); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", |
| | | baseDN.toNormalizedString())); |
| | | if (oldestCSN != null) |
| | | create(attributes, "replicationServer-database", String.valueOf(serverId)); |
| | | create(attributes, "domain-name", baseDN.toNormalizedString()); |
| | | final CSNLimits limits = csnLimits; |
| | | if (limits.oldestCSN != null) |
| | | { |
| | | attributes.add(Attributes.create("first-change", encode(oldestCSN))); |
| | | create(attributes, "first-change", encode(limits.oldestCSN)); |
| | | } |
| | | if (newestCSN != null) |
| | | if (limits.newestCSN != null) |
| | | { |
| | | attributes.add(Attributes.create("last-change", encode(newestCSN))); |
| | | create(attributes, "last-change", encode(limits.newestCSN)); |
| | | } |
| | | attributes.add( |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.size()))); |
| | | attributes.add( |
| | | Attributes.create("queue-size-bytes", String.valueOf(queueByteSize))); |
| | | create(attributes, "queue-size", String.valueOf(msgQueue.size())); |
| | | create(attributes, "queue-size-bytes", |
| | | String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits())); |
| | | return attributes; |
| | | } |
| | | |
| | | private void create(List<Attribute> attributes, String name, String value) |
| | | { |
| | | attributes.add(Attributes.create(name, value)); |
| | | } |
| | | |
| | | private String encode(CSN csn) |
| | | { |
| | | return csn + " " + new Date(csn.getTime()); |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | final CSNLimits limits = csnLimits; |
| | | return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " |
| | | + oldestCSN + " " + newestCSN; |
| | | + limits.oldestCSN + " " + limits.newestCSN; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | synchronized(flushLock) |
| | | { |
| | | collectAllPermits(); |
| | | msgQueue.clear(); |
| | | queueByteSize = 0; |
| | | |
| | | db.clear(); |
| | | oldestCSN = db.readOldestCSN(); |
| | | newestCSN = db.readNewestCSN(); |
| | | csnLimits = new CSNLimits(null, null); |
| | | } |
| | | } |
| | | |
| | |
| | | * For test purpose. |
| | | * @return The memory queue size. |
| | | */ |
| | | public int getQueueSize() |
| | | int getQueueSize() |
| | | { |
| | | return this.msgQueue.size(); |
| | | } |