| | |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.CSN; |
| | |
| | | import org.opends.server.types.InitializationException; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | |
| | | * <p> |
| | | * This class publishes some monitoring information below cn=monitor. |
| | | */ |
| | | public class JEReplicaDB implements Runnable |
| | | public class JEReplicaDB |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** |
| | | * Class that allows atomically setting oldest and newest CSNs without |
| | |
| | | |
| | | } |
| | | |
| | | /** |
| | | * The msgQueue holds all the updates not yet saved to stable storage. |
| | | * <p> |
| | | * This blocking queue is only used as a temporary placeholder so that the |
| | | * writes to stable storage can be grouped for efficiency reasons. Adding an |
| | | * update synchronously adds the update to this list. A dedicated thread |
| | | * flushes this blocking queue. |
| | | * <p> |
| | | * Changes are not read back by replicationServer threads that are responsible |
| | | * for pushing the changes to other replication servers or to LDAP servers. |
| | | */ |
| | | private final LinkedBlockingQueue<UpdateMsg> msgQueue = |
| | | new LinkedBlockingQueue<UpdateMsg>(); |
| | | |
| | | /** |
| | | * Semaphore used to limit the number of bytes used in memory by the queue. |
| | | * The threads calling {@link #add(UpdateMsg)} method will be blocked if the |
| | | * size of msgQueue becomes larger than the available permits and will resume |
| | | * only when the number of available permits allows it. |
| | | */ |
| | | private final Semaphore queueSizeBytes; |
| | | private final int queueMaxBytes; |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | private ReplicationDB db; |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | |
| | | private int serverId; |
| | | private DN baseDN; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private DirectoryThread thread; |
| | | /** |
| | | * Used to prevent race conditions between threads calling {@link #flush()}. |
| | | * This can happen with the thread flushing the queue, or else on shutdown. |
| | | */ |
| | | private final Object flushLock = new Object(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | /** |
| | |
| | | this.replicationServer = replicationServer; |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | queueMaxBytes = replicationServer.getQueueSize() * 200; |
| | | queueSizeBytes = new Semaphore(queueMaxBytes); |
| | | db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); |
| | | csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") flusher thread for Replica DS(" + serverId |
| | | + ") for domain \"" + baseDN + "\""); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | |
| | | */ |
| | | public void add(UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | if (thread.isShutdownInitiated()) |
| | | if (shutdown.get()) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg, baseDN, serverId)); |
| | | } |
| | | |
| | | final int msgSize = updateMsg.size(); |
| | | if (msgSize < queueMaxBytes) |
| | | { |
| | | try |
| | | { |
| | | queueSizeBytes.acquire(msgSize); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg, baseDN, serverId, |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // edge case with a very large message |
| | | collectAllPermits(); |
| | | } |
| | | msgQueue.add(updateMsg); |
| | | db.addEntry(updateMsg); |
| | | |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null |
| | |
| | | } |
| | | } |
| | | |
| | | /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */ |
| | | private void collectAllPermits() |
| | | { |
| | | int collectedPermits = queueSizeBytes.drainPermits(); |
| | | while (collectedPermits != queueMaxBytes) |
| | | { |
| | | // Do not use Thread.sleep() because: |
| | | // 1) it is expected the permits will be released very soon |
| | | // 2) we want to collect all the permits, so do not leave a chance to |
| | | // other threads to steal them from us. |
| | | // 3) we want to keep low latency |
| | | Thread.yield(); |
| | | collectedPermits += queueSizeBytes.drainPermits(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the oldest CSN that has not been purged yet. |
| | | * |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | if (thread.isShutdownInitiated()) |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | thread.initiateShutdown(); |
| | | |
| | | while (msgQueue.size() != 0) |
| | | { |
| | | try |
| | | { |
| | | flush(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // We are already shutting down |
| | | logger.error(e.getMessageObject()); |
| | | } |
| | | } |
| | | |
| | | db.shutdown(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | | * Flushes the replicaDB queue from memory to stable storage. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | thread.startWork(); |
| | | |
| | | try |
| | | { |
| | | while (!thread.isShutdownInitiated()) |
| | | { |
| | | try |
| | | { |
| | | flush(); |
| | | } |
| | | catch (ChangelogException end) |
| | | { |
| | | stop(end); |
| | | break; |
| | | } |
| | | } |
| | | |
| | | try |
| | | { |
| | | // call flush a last time before exiting to make sure that |
| | | // no change was forgotten in the msgQueue |
| | | flush(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | stop(e); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | thread.stopWork(); |
| | | } |
| | | } |
| | | |
| | | private void stop(Exception e) |
| | | { |
| | | logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e)); |
| | | |
| | | thread.initiateShutdown(); |
| | | |
| | | if (replicationServer != null) |
| | | { |
| | | replicationServer.shutdown(); |
| | | db.shutdown(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | for (int j = 0; j < 50; j++) |
| | | { |
| | | if (thread.isShutdownInitiated()) |
| | | if (shutdown.get()) |
| | | { |
| | | return; |
| | | } |
| | |
| | | // mark shutdown for this db so that we don't try again to |
| | | // stop it from cursor.close() or methods called by cursor.close() |
| | | cursor.abort(); |
| | | thread.initiateShutdown(); |
| | | shutdown.set(true); |
| | | throw e; |
| | | } |
| | | finally |
| | |
| | | } |
| | | |
| | | /** |
| | | * Flush a number of updates from the memory list to the stable storage. |
| | | * <p> |
| | | * Flush is done by chunk sized to 500 messages, starting from the beginning |
| | | * of the list. |
| | | * <p> |
| | | * @GuardedBy("flushLock") |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | private void flush() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | synchronized (flushLock) |
| | | { |
| | | final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS); |
| | | if (change == null) |
| | | { |
| | | // nothing to persist, check if shutdown was invoked |
| | | return; |
| | | } |
| | | |
| | | // Try to see if there are more changes and persist them all. |
| | | final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | changes.add(change); |
| | | msgQueue.drainTo(changes); |
| | | |
| | | int totalSize = db.addEntries(changes); |
| | | // do not release more than queue max size permits |
| | | // (be careful of the edge case with the very large message) |
| | | queueSizeBytes.release(Math.min(totalSize, queueMaxBytes)); |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(e))); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This internal class is used to implement the Monitoring capabilities of the |
| | | * ReplicaDB. |
| | | */ |
| | |
| | | { |
| | | create(attributes, "last-change", encode(limits.newestCSN)); |
| | | } |
| | | create(attributes, "queue-size", String.valueOf(msgQueue.size())); |
| | | create(attributes, "queue-size-bytes", |
| | | String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits())); |
| | | return attributes; |
| | | } |
| | | |
| | |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | { |
| | | collectAllPermits(); |
| | | msgQueue.clear(); // this call should not do anything at all |
| | | db.clear(); |
| | | csnLimits = new CSNLimits(null, null); |
| | | } |