/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import org.opends.messages.Message; import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import static org.opends.server.loggers.debug.DebugLogger.*; /** * Thread responsible for inserting replicated changes into the ChangeNumber * Index DB (CNIndexDB for short). Only changes older than the medium * consistency point are inserted in the CNIndexDB. As a consequence this class * is also responsible for maintaining the medium consistency point. */ public class ChangeNumberIndexer extends DirectoryThread { /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private final ChangelogDB changelogDB; /** Only used for initialization, and then discarded. */ private ChangelogState changelogState; /* * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because * 1) initialization can happen while the replication server starts receiving * updates 2) many updates can happen concurrently. */ /** * Holds the cross domain medium consistency Replication Update Vector for the * current replication server, also known as the previous cookie. *

* Stores the value of the cookie before the change currently processed is * inserted in the DB. After insert, it is updated with the CSN of the change * currently processed (thus becoming the "current" cookie just before the * change is returned. * * @see OpenDJ Domain Names - medium consistency RUV */ private final MultiDomainServerState mediumConsistencyRUV = new MultiDomainServerState(); /** * Holds the cross domain medium consistency CSN for the current replication * server. * * @see OpenDJ Domain Names - medium consistency CSN */ private volatile CSN mediumConsistencyCSN; /** * Holds the most recent changes or heartbeats received for each serverIds * cross domain. */ private final MultiDomainServerState lastSeenUpdates = new MultiDomainServerState(); private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); /** * Composite cursor across all the replicaDBs for all the replication domains. * It is volatile to ensure it supports concurrent update. Each time it is * used more than once in a method, the method must take a local copy to * ensure the cursor does not get updated in the middle of the method. */ private volatile CompositeDBCursor crossDomainDBCursor; /** * New cursors for this Map must be created from the {@link #run()} method, * i.e. from the same thread that will make use of them. If this rule is not * obeyed, then a JE exception will be thrown about * "Non-transactional Cursors may not be used in multiple threads;". */ private Map>> allCursors = new HashMap>>(); /** This map can be updated by multiple threads. */ private ConcurrentMap newCursors = new ConcurrentSkipListMap(); /** * Builds a ChangeNumberIndexer object. * * @param changelogDB * the changelogDB * @param changelogState * the changelog state used for initialization */ ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState) { super("Change number indexer"); this.changelogDB = changelogDB; this.changelogState = changelogState; } /** * Ensures the medium consistency point is updated by heartbeats. * * @param baseDN * the baseDN of the domain for which the heartbeat is published * @param heartbeatCSN * the CSN coming from the heartbeat */ public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) { lastSeenUpdates.update(baseDN, heartbeatCSN); tryNotify(baseDN); } /** * Ensures the medium consistency point is updated by UpdateMsg. * * @param baseDN * the baseDN of the domain for which the heartbeat is published * @param updateMsg * the updateMsg that will update the medium consistency point * @throws ChangelogException * If a database problem happened */ public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg) throws ChangelogException { final CSN csn = updateMsg.getCSN(); lastSeenUpdates.update(baseDN, csn); newCursors.put(csn, baseDN); tryNotify(baseDN); } /** * Signals a replica went offline. * * @param baseDN * the replica's replication domain * @param offlineCSN * the serverId and time of the replica that went offline */ public void replicaOffline(DN baseDN, CSN offlineCSN) { lastSeenUpdates.update(baseDN, offlineCSN); replicasOffline.update(baseDN, offlineCSN); tryNotify(baseDN); } /** * Notifies the Change number indexer thread if it will be able to do some * work. */ private void tryNotify(DN baseDN) { if (canMoveForwardMediumConsistencyPoint(baseDN)) { synchronized (this) { notify(); } } } private boolean canMoveForwardMediumConsistencyPoint(DN baseDN) { final CSN mcCSN = mediumConsistencyCSN; if (mcCSN != null) { final int serverId = mcCSN.getServerId(); final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId); return mcCSN.isOlderThan(lastSeenSameServerId); } return true; } private void initialize() throws ChangelogException, DirectoryException { final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); if (newestRecord != null) { mediumConsistencyRUV.update( new MultiDomainServerState(newestRecord.getPreviousCookie())); } // initialize the cross domain DB cursor final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); for (Entry> entry : changelogState.getDomainToServerIds().entrySet()) { final DN baseDN = entry.getKey(); for (Integer serverId : entry.getValue()) { final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); ensureCursorExists(baseDN, serverId, csn); } ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); lastSeenUpdates.update(baseDN, latestKnownState); } crossDomainDBCursor = newCompositeDBCursor(); if (newestRecord != null) { // restore the "previousCookie" state before shutdown final UpdateMsg record = crossDomainDBCursor.getRecord(); if (!record.getCSN().equals(newestRecord.getCSN())) { // TODO JNR i18n safety check, should never happen throw new ChangelogException(Message.raw("They do not equal! recordCSN=" + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN())); } mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); crossDomainDBCursor.next(); } // this will not be used any more. Discard for garbage collection. this.changelogState = null; } private CompositeDBCursor newCompositeDBCursor() throws ChangelogException { final Map, DN> cursors = new HashMap, DN>(); for (Entry>> entry : this.allCursors.entrySet()) { for (Entry> entry2 : entry.getValue().entrySet()) { cursors.put(entry2.getValue(), entry.getKey()); } } final CompositeDBCursor result = new CompositeDBCursor(cursors); result.next(); return result; } private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn) throws ChangelogException { Map> map = allCursors.get(baseDN); if (map == null) { map = new ConcurrentSkipListMap>(); allCursors.put(baseDN, map); } DBCursor cursor = map.get(serverId); if (cursor == null) { final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); cursor = domainDB.getCursorFrom(baseDN, serverId, csn); map.put(serverId, cursor); return false; } return true; } /** {@inheritDoc} */ @Override public void run() { try { /* * initialize here to allow fast application start up and avoid errors due * cursors being created in a different thread to the one where they are * used. */ initialize(); } catch (DirectoryException e) { // TODO JNR error message i18n if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); return; } catch (ChangelogException e) { // TODO JNR error message i18n if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); return; } while (!isShutdownInitiated()) { try { createNewCursors(); final UpdateMsg msg = crossDomainDBCursor.getRecord(); if (msg == null) { synchronized (this) { wait(); } // advance cursor, success/failure will be checked later crossDomainDBCursor.next(); // loop to check whether new changes have been added to the ReplicaDBs continue; } final CSN csn = msg.getCSN(); final DN baseDN = crossDomainDBCursor.getData(); // FIXME problem: what if the serverId is not part of the ServerState? // right now, thread will be blocked if (!canMoveForwardMediumConsistencyPoint(baseDN)) { // the oldest record to insert is newer than the medium consistency // point. Let's wait for a change that can be published. synchronized (this) { // double check to protect against a missed call to notify() if (!canMoveForwardMediumConsistencyPoint(baseDN)) { wait(); // loop to check if changes older than the medium consistency // point have been added to the ReplicaDBs continue; } } } // OK, the oldest change is older than the medium consistency point // let's publish it to the CNIndexDB final String previousCookie = mediumConsistencyRUV.toString(); final ChangeNumberIndexRecord record = new ChangeNumberIndexRecord(previousCookie, baseDN, csn); changelogDB.getChangeNumberIndexDB().addRecord(record); moveForwardMediumConsistencyPoint(csn, baseDN); // advance cursor, success/failure will be checked later crossDomainDBCursor.next(); } catch (ChangelogException e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); // TODO JNR error message i18n } catch (InterruptedException ignored) { // was shutdown called? } } } private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN) { // update, so it becomes the previous cookie for the next change mediumConsistencyRUV.update(baseDN, csn); mediumConsistencyCSN = csn; final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId()); if (offlineCSN != null && offlineCSN.isOlderThan(mediumConsistencyCSN) // If no new updates has been seen for this replica && lastSeenUpdates.removeCSN(baseDN, offlineCSN)) { removeCursor(baseDN, csn); replicasOffline.removeCSN(baseDN, offlineCSN); mediumConsistencyRUV.removeCSN(baseDN, offlineCSN); } } private void removeCursor(final DN baseDN, final CSN csn) { for (Entry>> entry : allCursors .entrySet()) { if (baseDN.equals(entry.getKey())) { final Set serverIds = entry.getValue().keySet(); for (Iterator iter = serverIds.iterator(); iter.hasNext();) { final int serverId = iter.next(); if (csn.getServerId() == serverId) { iter.remove(); return; } } } } } private void createNewCursors() throws ChangelogException { if (!newCursors.isEmpty()) { boolean newCursorAdded = false; for (Iterator> iter = newCursors.entrySet().iterator(); iter.hasNext();) { final Entry entry = iter.next(); final CSN csn = entry.getKey(); if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null)) { newCursorAdded = true; } iter.remove(); } if (newCursorAdded) { crossDomainDBCursor = newCompositeDBCursor(); } } } }