/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.util.StaticUtils; import com.forgerock.opendj.util.Pair; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; /** * Thread responsible for inserting replicated changes into the ChangeNumber * Index DB (CNIndexDB for short). Only changes older than the medium * consistency point are inserted in the CNIndexDB. As a consequence this class * is also responsible for maintaining the medium consistency point. */ public class ChangeNumberIndexer extends DirectoryThread { /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * If this is true, then the {@link #run()} method must clear its state. * Otherwise the run method executes normally. */ private final AtomicBoolean doClear = new AtomicBoolean(); private final ChangelogDB changelogDB; /** Only used for initialization, and then discarded. */ private ChangelogState changelogState; /* * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because * 1) initialization can happen while the replication server starts receiving * updates 2) many updates can happen concurrently. */ /** * Holds the cross domain medium consistency Replication Update Vector for the * current replication server, also known as the previous cookie. *

* Stores the value of the cookie before the change currently processed is * inserted in the DB. After insert, it is updated with the CSN of the change * currently processed (thus becoming the "current" cookie just before the * change is returned. * * @see OpenDJ Domain Names - medium consistency RUV */ private final MultiDomainServerState mediumConsistencyRUV = new MultiDomainServerState(); /** * Holds the cross domain medium consistency CSN for the current replication * server. * * @see OpenDJ Domain Names - medium consistency CSN */ private volatile CSN mediumConsistencyCSN; /** * Holds the last time each replica was seen alive, whether via updates or * heartbeats received. Data is held for each serverId cross domain. *

* Updates are persistent and stored in the replicaDBs, heartbeats are * transient and are easily constructed on normal operations. */ private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); /** * Cursor across all the replicaDBs for all the replication domains. It is * positioned on the next change that needs to be inserted in the CNIndexDB. *

* Note: it is only accessed from the {@link #run()} method. */ private CompositeDBCursor nextChangeForInsertDBCursor; /** * New cursors for this Map must be created from the {@link #run()} method, * i.e. from the same thread that will make use of them. If this rule is not * obeyed, then a JE exception will be thrown about * "Non-transactional Cursors may not be used in multiple threads;". */ private Map>> allCursors = new HashMap>>(); /** * Holds the newCursors that will have to be created in the next iteration * inside the {@link #run()} method. *

* This map can be updated by multiple threads. */ private ConcurrentMap, CSN> newCursors = new ConcurrentSkipListMap, CSN>( new Comparator>() { @Override public int compare(Pair o1, Pair o2) { final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst()); if (compareBaseDN == 0) { return o1.getSecond().compareTo(o2.getSecond()); } return compareBaseDN; } }); /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * the changelogDB * @param changelogState * the changelog state used for initialization */ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) { super("Change number indexer"); this.changelogDB = changelogDB; this.changelogState = changelogState; } /** * Ensures the medium consistency point is updated by heartbeats. * * @param baseDN * the baseDN of the domain for which the heartbeat is published * @param heartbeatCSN * the CSN coming from the heartbeat */ public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) { if (!isECLEnabledDomain(baseDN)) { return; } lastAliveCSNs.update(baseDN, heartbeatCSN); tryNotify(baseDN); } /** * Ensures the medium consistency point is updated by UpdateMsg. * * @param baseDN * the baseDN of the domain for which the heartbeat is published * @param updateMsg * the updateMsg that will update the medium consistency point * @throws ChangelogException * If a database problem happened */ public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) throws ChangelogException { if (!isECLEnabledDomain(baseDN)) { return; } final CSN csn = updateMsg.getCSN(); lastAliveCSNs.update(baseDN, csn); // only keep the oldest CSN that will be the new cursor's starting point newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); tryNotify(baseDN); } /** * Returns whether the provided baseDN represents a replication domain enabled * for the external changelog. *

* This method is a test seam that break the dependency on a static method. * * @param baseDN * the replication domain to check * @return true if the provided baseDN is enabled for the external changelog, * false if the provided baseDN is disabled for the external changelog * or unknown to multimaster replication. */ protected boolean isECLEnabledDomain(DN baseDN) { return MultimasterReplication.isECLEnabledDomain(baseDN); } /** * Returns the last time each serverId was seen alive for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => CSN} Map. Can be * null if domain is not replicated. */ public ServerState getDomainLastAliveCSNs(DN baseDN) { return lastAliveCSNs.getServerState(baseDN); } /** * Signals a replica went offline. * * @param baseDN * the replica's replication domain * @param offlineCSN * the serverId and time of the replica that went offline */ public void replicaOffline(DN baseDN, CSN offlineCSN) { lastAliveCSNs.update(baseDN, offlineCSN); replicasOffline.update(baseDN, offlineCSN); tryNotify(baseDN); } /** * Notifies the Change number indexer thread if it will be able to do some * work. */ private void tryNotify(DN baseDN) { if (canMoveForwardMediumConsistencyPoint(baseDN)) { synchronized (this) { notify(); } } } private boolean canMoveForwardMediumConsistencyPoint(DN baseDN) { final CSN mcCSN = mediumConsistencyCSN; if (mcCSN != null) { final int serverId = mcCSN.getServerId(); CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId); return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive); } return true; } /** * Restores in memory data needed to build the CNIndexDB, including the medium * consistency point. */ private void initialize() throws ChangelogException, DirectoryException { final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); if (newestRecord != null) { // restore the mediumConsistencyRUV from DB mediumConsistencyRUV.update( new MultiDomainServerState(newestRecord.getPreviousCookie())); // Do not update with the newestRecord CSN // as it will be used for a sanity check later in the same method } // initialize the DB cursor and the last seen updates // to ensure the medium consistency CSN can move forward final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); for (Entry> entry : changelogState.getDomainToServerIds().entrySet()) { final DN baseDN = entry.getKey(); if (!isECLEnabledDomain(baseDN)) { continue; } for (Integer serverId : entry.getValue()) { final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); ensureCursorExists(baseDN, serverId, csn, false); } ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); lastAliveCSNs.update(baseDN, latestKnownState); } resetNextChangeForInsertDBCursor(); if (newestRecord != null) { // restore the "previousCookie" state before shutdown final UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); // sanity check: ensure that when initializing the cursors at the previous // cookie, the next change we find is the newest record in the CNIndexDB if (!record.getCSN().equals(newestRecord.getCSN())) { throw new ChangelogException( ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord .getCSN().toStringUI(), record.getCSN().toStringUI())); } // Now we can update the mediumConsistencyRUV mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); nextChangeForInsertDBCursor.next(); } // this will not be used any more. Discard for garbage collection. this.changelogState = null; } private void resetNextChangeForInsertDBCursor() throws ChangelogException { final Map, DN> cursors = new HashMap, DN>(); for (Entry>> entry : this.allCursors.entrySet()) { for (Entry> entry2 : entry.getValue().entrySet()) { cursors.put(entry2.getValue(), entry.getKey()); } } final CompositeDBCursor result = new CompositeDBCursor(cursors); result.next(); nextChangeForInsertDBCursor = result; } private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn, boolean startFromPrecedingCSN) throws ChangelogException { Map> map = allCursors.get(baseDN); if (map == null) { map = new ConcurrentSkipListMap>(); allCursors.put(baseDN, map); } DBCursor cursor = map.get(serverId); if (cursor == null) { final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); // start from preceding CSN for publishUpdateMsg(), // or from the actual CSN when initializing from the previous cookie final CSN startAfterCSN = startFromPrecedingCSN ? getPrecedingCSN(csn) : csn; cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); map.put(serverId, cursor); return false; } return true; } /** * Returns the immediately preceding CSN. */ private CSN getPrecedingCSN(CSN csn) { if (csn == null) { return null; } if (csn.getSeqnum() > 0) { return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); } return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); } /** {@inheritDoc} */ @Override public void initiateShutdown() { super.initiateShutdown(); synchronized (this) { notify(); } } /** {@inheritDoc} */ @Override public void run() { try { /* * initialize here to allow fast application start up and avoid errors due * cursors being created in a different thread to the one where they are * used. */ initialize(); while (!isShutdownInitiated()) { try { if (doClear.get()) { removeAllCursors(); resetNextChangeForInsertDBCursor(); // No need to use CAS here because it is only for unit tests and at // this point all will have been cleaned up anyway. doClear.set(false); } else { createNewCursors(); } final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); if (msg == null) { synchronized (this) { if (isShutdownInitiated()) { continue; } wait(); } // advance cursor, success/failure will be checked later nextChangeForInsertDBCursor.next(); // loop to check whether new changes have been added to the // ReplicaDBs continue; } final CSN csn = msg.getCSN(); final DN baseDN = nextChangeForInsertDBCursor.getData(); // FIXME problem: what if the serverId is not part of the ServerState? // right now, change number will be blocked if (!canMoveForwardMediumConsistencyPoint(baseDN)) { // the oldest record to insert is newer than the medium consistency // point. Let's wait for a change that can be published. synchronized (this) { // double check to protect against a missed call to notify() if (!isShutdownInitiated() && !canMoveForwardMediumConsistencyPoint(baseDN)) { wait(); // loop to check if changes older than the medium consistency // point have been added to the ReplicaDBs continue; } } } // OK, the oldest change is older than the medium consistency point // let's publish it to the CNIndexDB. // Next if statement is ugly but ensures the first change will not be // immediately trimmed from the CNIndexDB. Yuck! if (mediumConsistencyRUV.isEmpty()) { mediumConsistencyRUV.replace(baseDN, new ServerState()); } final String previousCookie = mediumConsistencyRUV.toString(); final ChangeNumberIndexRecord record = new ChangeNumberIndexRecord(previousCookie, baseDN, csn); changelogDB.getChangeNumberIndexDB().addRecord(record); moveForwardMediumConsistencyPoint(csn, baseDN); // advance cursor, success/failure will be checked later nextChangeForInsertDBCursor.next(); } catch (InterruptedException ignored) { // was shutdown called? loop to figure it out. Thread.currentThread().interrupt(); } } } catch (RuntimeException e) { // Nothing can be done about it. // Rely on the DirectoryThread uncaught exceptions handler // for logging error + alert. // Message logged here gives corrective information to the administrator. Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( getClass().getSimpleName(), stackTraceToSingleLineString(e)); TRACER.debugError(msg.toString()); throw e; } catch (Exception e) { // Nothing can be done about it. // Rely on the DirectoryThread uncaught exceptions handler // for logging error + alert. // Message logged here gives corrective information to the administrator. Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get( getClass().getSimpleName(), stackTraceToSingleLineString(e)); TRACER.debugError(msg.toString()); throw new RuntimeException(e); } finally { removeAllCursors(); } } private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN) { // update, so it becomes the previous cookie for the next change mediumConsistencyRUV.update(baseDN, csn); mediumConsistencyCSN = csn; final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId()); if (offlineCSN != null && offlineCSN.isOlderThan(mediumConsistencyCSN) // If no new updates has been seen for this replica && lastAliveCSNs.removeCSN(baseDN, offlineCSN)) { removeCursor(baseDN, csn); replicasOffline.removeCSN(baseDN, offlineCSN); mediumConsistencyRUV.removeCSN(baseDN, offlineCSN); } } private void removeAllCursors() { if (nextChangeForInsertDBCursor != null) { nextChangeForInsertDBCursor.close(); nextChangeForInsertDBCursor = null; } for (Map> map : allCursors.values()) { StaticUtils.close(map.values()); } allCursors.clear(); newCursors.clear(); } private void removeCursor(final DN baseDN, final CSN csn) { for (Entry>> entry1 : allCursors.entrySet()) { if (baseDN.equals(entry1.getKey())) { for (Iterator>> iter = entry1.getValue().entrySet().iterator(); iter.hasNext();) { final Entry> entry2 = iter.next(); if (csn.getServerId() == entry2.getKey()) { iter.remove(); StaticUtils.close(entry2.getValue()); return; } } } } } private void createNewCursors() throws ChangelogException { if (!newCursors.isEmpty()) { boolean newCursorAdded = false; for (Iterator, CSN>> iter = newCursors.entrySet().iterator(); iter.hasNext();) { final Entry, CSN> entry = iter.next(); final DN baseDN = entry.getKey().getFirst(); final CSN csn = entry.getValue(); if (!ensureCursorExists(baseDN, csn.getServerId(), csn, true)) { newCursorAdded = true; } iter.remove(); } if (newCursorAdded) { resetNextChangeForInsertDBCursor(); } } } /** * Asks the current thread to clear its state. *

* This method is only useful for unit tests. */ public void clear() { doClear.set(true); synchronized (this) { notify(); } while (doClear.get()) { // wait until clear() has been done by thread // ensures unit tests wait that this thread's state is cleaned up Thread.yield(); } } }