/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import org.opends.messages.Message; import org.opends.server.api.DirectoryThread; import org.opends.server.backends.ChangelogBackend; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.opends.server.util.StaticUtils.*; /** * Thread responsible for inserting replicated changes into the ChangeNumber * Index DB (CNIndexDB for short). Only changes older than the medium * consistency point are inserted in the CNIndexDB. As a consequence this class * is also responsible for maintaining the medium consistency point. */ public class ChangeNumberIndexer extends DirectoryThread { /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * If it contains nothing, then the run method executes normally. * Otherwise, the {@link #run()} method must clear its state * for the supplied domain baseDNs. If a supplied domain is * {@link DN#NULL_DN}, then all domains will be cleared. */ private final ConcurrentSkipListSet domainsToClear = new ConcurrentSkipListSet(); private final ChangelogDB changelogDB; /** Only used for initialization, and then discarded. */ private ChangelogState changelogState; private final ECLEnabledDomainPredicate predicate; /* * The following MultiDomainServerState fields must be thread safe, because * 1) initialization can happen while the replication server starts receiving * updates 2) many updates can happen concurrently. */ /** * Holds the cross domain medium consistency Replication Update Vector for the * current replication server, also known as the previous cookie. *

* Stores the value of the cookie before the change currently processed is * inserted in the DB. After insert, it is updated with the CSN of the change * currently processed (thus becoming the "current" cookie just before the * change is returned. *

* Note: This object is only updated by changes/updates. * * @see OpenDJ Domain Names - medium consistency RUV */ private final MultiDomainServerState mediumConsistencyRUV = new MultiDomainServerState(); /** * Holds the last time each replica was seen alive, whether via updates or * heartbeat notifications, or offline notifications. Data is held for each * serverId cross domain. *

* Updates are persistent and stored in the replicaDBs, heartbeats are * transient and are easily constructed on normal operations. *

* Note: This object is updated by both heartbeats and changes/updates. */ private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); /** Note: This object is updated by replica offline messages. */ private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); /** * Cursor across all the replicaDBs for all the replication domains. It is * positioned on the next change that needs to be inserted in the CNIndexDB. *

* Note: it is only accessed from the {@link #run()} method. * * @NonNull */ private ECLMultiDomainDBCursor nextChangeForInsertDBCursor; /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * the changelogDB * @param changelogState * the changelog state used for initialization */ public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) { this(changelogDB, changelogState, new ECLEnabledDomainPredicate()); } /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * the changelogDB * @param changelogState * the changelog state used for initialization * @param predicate * tells whether a domain is enabled for the external changelog */ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState, ECLEnabledDomainPredicate predicate) { super("Change number indexer"); this.changelogDB = changelogDB; this.changelogState = changelogState; this.predicate = predicate; } /** * Ensures the medium consistency point is updated by heartbeats. * * @param baseDN * the baseDN of the domain for which the heartbeat is published * @param heartbeatCSN * the CSN coming from the heartbeat */ public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) { if (!predicate.isECLEnabledDomain(baseDN)) { return; } final CSN oldestCSNBefore = getOldestLastAliveCSN(); lastAliveCSNs.update(baseDN, heartbeatCSN); tryNotify(oldestCSNBefore); } /** * Indicates if the replica corresponding to provided domain DN and server id * is offline. * * @param domainDN * base DN of the replica * @param serverId * server id of the replica * @return {@code true} if replica is offline, {@code false} otherwise */ public boolean isReplicaOffline(DN domainDN, int serverId) { return replicasOffline.getCSN(domainDN, serverId) != null; } /** * Ensures the medium consistency point is updated by UpdateMsg. * * @param baseDN * the baseDN of the domain for which the heartbeat is published * @param updateMsg * the updateMsg that will update the medium consistency point * @throws ChangelogException * If a database problem happened */ public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) throws ChangelogException { if (!predicate.isECLEnabledDomain(baseDN)) { return; } final CSN oldestCSNBefore = getOldestLastAliveCSN(); lastAliveCSNs.update(baseDN, updateMsg.getCSN()); tryNotify(oldestCSNBefore); } /** * Signals a replica went offline. * * @param baseDN * the replica's replication domain * @param offlineCSN * the serverId and time of the replica that went offline */ public void replicaOffline(DN baseDN, CSN offlineCSN) { if (!predicate.isECLEnabledDomain(baseDN)) { return; } replicasOffline.update(baseDN, offlineCSN); final CSN oldestCSNBefore = getOldestLastAliveCSN(); lastAliveCSNs.update(baseDN, offlineCSN); tryNotify(oldestCSNBefore); } private CSN getOldestLastAliveCSN() { return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond(); } /** * Notifies the Change number indexer thread if it will be able to do some * work. */ private void tryNotify(final CSN oldestCSNBefore) { if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore)) { synchronized (this) { notify(); } } } /** * Used for waking up the {@link ChangeNumberIndexer} thread because it might * have some work to do. */ private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore) { final CSN oldestCSNAfter = getOldestLastAliveCSN(); // ensure that all initial replicas alive information have been updated // with CSNs that are acceptable for moving the medium consistency forward return allInitialReplicasAreOfflineOrAlive() && oldestCSNBefore != null // then oldestCSNAfter cannot be null // has the oldest CSN changed? && oldestCSNBefore.isOlderThan(oldestCSNAfter); } /** * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN * must be persisted to the change number index DB. */ private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist) { // ensure that all initial replicas alive information have been updated // with CSNs that are acceptable for moving the medium consistency forward return allInitialReplicasAreOfflineOrAlive() // can we persist the next CSN? && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN()); } /** * Returns true only if the initial replicas known from the changelog state DB * are either: *

* In both cases, we have enough information to compute medium consistency * without waiting any further. */ private boolean allInitialReplicasAreOfflineOrAlive() { for (DN baseDN : lastAliveCSNs) { for (CSN csn : lastAliveCSNs.getServerState(baseDN)) { if (csn.getTime() == 0 && replicasOffline.getCSN(baseDN, csn.getServerId()) == null) { // this is the oldest possible CSN, but the replica is not offline // we must wait for more up to date information from this replica return false; } } } return true; } /** * Restores in memory data needed to build the CNIndexDB, including the medium * consistency point. */ private void initialize() throws ChangelogException, DirectoryException { final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); if (newestRecord != null) { // restore the mediumConsistencyRUV from DB mediumConsistencyRUV.update( new MultiDomainServerState(newestRecord.getPreviousCookie())); // Do not update with the newestRecord CSN // as it will be used for a sanity check later in the same method } // initialize the DB cursor and the last seen updates // to ensure the medium consistency CSN can move forward final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); for (Entry> entry : changelogState.getDomainToServerIds().entrySet()) { final DN baseDN = entry.getKey(); if (predicate.isECLEnabledDomain(baseDN)) { for (Integer serverId : entry.getValue()) { /* * initialize with the oldest possible CSN in order for medium * consistency to wait for all replicas to be alive before moving forward */ lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId)); } final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); lastAliveCSNs.update(baseDN, latestKnownState); } } nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY)); nextChangeForInsertDBCursor.next(); if (newestRecord != null) { // restore the "previousCookie" state before shutdown UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); if (record instanceof ReplicaOfflineMsg) { // ignore: replica offline messages are never stored in the CNIndexDB nextChangeForInsertDBCursor.next(); record = nextChangeForInsertDBCursor.getRecord(); } // sanity check: ensure that when initializing the cursors at the previous // cookie, the next change we find is the newest record in the CNIndexDB if (!record.getCSN().equals(newestRecord.getCSN())) { throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get( newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI())); } // Now we can update the mediumConsistencyRUV mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); nextChangeForInsertDBCursor.next(); } final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas(); for (DN baseDN : offlineReplicas) { for (CSN offlineCSN : offlineReplicas.getServerState(baseDN)) { if (predicate.isECLEnabledDomain(baseDN)) { replicasOffline.update(baseDN, offlineCSN); // a replica offline message could also be the very last time // we heard from this replica :) lastAliveCSNs.update(baseDN, offlineCSN); } } } // this will not be used any more. Discard for garbage collection. this.changelogState = null; } private CSN oldestPossibleCSN(int serverId) { return new CSN(0, 0, serverId); } /** {@inheritDoc} */ @Override public void initiateShutdown() { super.initiateShutdown(); synchronized (this) { notify(); } } /** {@inheritDoc} */ @Override public void run() { try { /* * initialize here to allow fast application start up and avoid errors due * cursors being created in a different thread to the one where they are used. */ initialize(); while (!isShutdownInitiated()) { try { while (!domainsToClear.isEmpty()) { final DN baseDNToClear = domainsToClear.first(); nextChangeForInsertDBCursor.removeDomain(baseDNToClear); // Only release the waiting thread // once this domain's state has been cleared. domainsToClear.remove(baseDNToClear); } // Do not call DBCursor.next() here // because we might not have consumed the last record, // for example if we could not move the MCP forward final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); if (msg == null) { synchronized (this) { if (isShutdownInitiated()) { continue; } wait(); } // check whether new changes have been added to the ReplicaDBs nextChangeForInsertDBCursor.next(); continue; } else if (msg instanceof ReplicaOfflineMsg) { nextChangeForInsertDBCursor.next(); continue; } final CSN csn = msg.getCSN(); final DN baseDN = nextChangeForInsertDBCursor.getData(); // FIXME problem: what if the serverId is not part of the ServerState? // right now, change number will be blocked if (!canMoveForwardMediumConsistencyPoint(csn)) { // the oldest record to insert is newer than the medium consistency // point. Let's wait for a change that can be published. synchronized (this) { // double check to protect against a missed call to notify() if (!canMoveForwardMediumConsistencyPoint(csn)) { if (isShutdownInitiated()) { return; } wait(); // loop to check if changes older than the medium consistency // point have been added to the ReplicaDBs continue; } } } // OK, the oldest change is older than the medium consistency point // let's publish it to the CNIndexDB. final String previousCookie = mediumConsistencyRUV.toString(); final long changeNumber = changelogDB.getChangeNumberIndexDB().addRecord( new ChangeNumberIndexRecord(previousCookie, baseDN, csn)); notifyEntryAddedToChangelog(baseDN, changeNumber, previousCookie, msg); moveForwardMediumConsistencyPoint(csn, baseDN); } catch (InterruptedException ignored) { // was shutdown called? loop to figure it out. Thread.currentThread().interrupt(); } } } catch (RuntimeException e) { logUnexpectedException(e); // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. throw e; } catch (Exception e) { logUnexpectedException(e); // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert. throw new RuntimeException(e); } finally { nextChangeForInsertDBCursor.close(); nextChangeForInsertDBCursor = null; } } /** * Notifies the {@link ChangelogBackend} that a new entry has been added. * * @param baseDN * the baseDN of the newly added entry. * @param changeNumber * the change number of the newly added entry. It will be greater * than zero for entries added to the change number index and less * than or equal to zero for entries added to any replica DB * @param cookieString * a string representing the cookie of the newly added entry. This is * only meaningful for entries added to the change number index * @param msg * the update message of the newly added entry * @throws ChangelogException * If a problem occurs while notifying of the newly added entry. */ protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber, String cookieString, UpdateMsg msg) throws ChangelogException { ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg); } /** * Nothing can be done about it. *

* Rely on the DirectoryThread uncaught exceptions handler for logging error + * alert. *

* Message logged here gives corrective information to the administrator. */ private void logUnexpectedException(Exception e) { Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( getClass().getSimpleName(), stackTraceToSingleLineString(e)); TRACER.debugError(msg.toString()); } private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException { // update, so it becomes the previous cookie for the next change mediumConsistencyRUV.update(mcBaseDN, mcCSN); final int mcServerId = mcCSN.getServerId(); final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); if (offlineCSN != null) { if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN)) { // replica is back online, we can forget the last time it was offline replicasOffline.removeCSN(mcBaseDN, offlineCSN); } else if (offlineCSN.isOlderThan(mcCSN)) { /* * replica is not back online, Medium consistency point has gone past * its last offline time, and there are no more changes after the * offline CSN in the cursor: remove everything known about it * (offlineCSN from lastAliveCSN and remove all knowledge of this replica * from the medium consistency RUV). */ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); } } // advance the cursor we just read from, // success/failure will be checked later nextChangeForInsertDBCursor.next(); } /** * Asks the current thread to clear its state for the specified domain. *

* Note: This method blocks the current thread until state is cleared. * * @param baseDN the baseDN to be cleared from this thread's state. * {@code null} and {@link DN#NULL_DN} mean "clear all domains". */ public void clear(DN baseDN) { // Use DN.NULL_DN to say "clear all domains" final DN baseDNToClear = baseDN != null ? baseDN : DN.NULL_DN; domainsToClear.add(baseDNToClear); while (domainsToClear.contains(baseDNToClear) && !State.TERMINATED.equals(getState())) { // wait until clear() has been done by thread, always waking it up synchronized (this) { notify(); } // ensures thread wait that this thread's state is cleaned up Thread.yield(); } } }