/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;

/**
 * This class is used for managing the replicationServer database for each
 * server in the topology. It is responsible for efficiently saving the updates
 * that are received from each master server into stable storage. This class is
 * also able to generate a {@link DBCursor} that can be used to read all changes
 * from a given change number.
 * <p>
 * This class publishes some monitoring information below
 * <code>cn=monitor</code>.
 */
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB {

  private static final LocalizedLogger logger =
      LocalizedLogger.getLoggerForThisClass();

  /** Change number value used when no record is available. */
  private static final int NO_KEY = 0;

  /** The underlying change number index database. */
  private final DraftCNDB db;

  /**
   * FIXME What is this field used for?
   * <p>
   * Within this class it is refreshed while purging / removing records and is
   * otherwise only printed by {@link #toString()}.
   */
  private volatile long oldestChangeNumber = NO_KEY;

  /**
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
   * then retrieved on server startup.
   */
  private volatile long newestChangeNumber = NO_KEY;

  /**
   * The last generated value for the change number. It is kept separate from
   * the {@link #newestChangeNumber} because there is an opportunity for a race
   * condition between:
   * <ol>
   * <li>this atomic long being incremented for a new record ('recordB')</li>
   * <li>the current newest record ('recordA') being purged from the DB</li>
   * <li>'recordB' failing to be inserted in the DB</li>
   * </ol>
   */
  private final AtomicLong lastGeneratedChangeNumber;

  /** Monitor provider registered with the directory server for this DB. */
  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();

  /** Set to true once {@link #shutdown()} has been requested. */
  private final AtomicBoolean shutdown = new AtomicBoolean(false);

  /**
   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
   *
   * @param dbEnv
   *          the Database Env to use to create the ReplicationServer DB.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv)
      throws ChangelogException {
    db = new DraftCNDB(dbEnv);
    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
    oldestChangeNumber = getChangeNumber(oldestRecord);
    final long newestCN = getChangeNumber(newestRecord);
    newestChangeNumber = newestCN;
    // initialization of the lastGeneratedChangeNumber from the DB content
    // if DB is empty => last record does not exist => default to 0
    lastGeneratedChangeNumber = new AtomicLong(newestCN);

    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }

  /**
   * Returns the change number of the provided record.
   *
   * @param record
   *          the record to read, may be null
   * @return the record's change number, or {@link #NO_KEY} when the record is
   *         null
   * @throws ChangelogException
   *           if a database problem occurs
   */
  private long getChangeNumber(ChangeNumberIndexRecord record)
      throws ChangelogException {
    if (record != null) {
      return record.getChangeNumber();
    }
    return NO_KEY;
  }

  /** {@inheritDoc} */
  @Override
  public long addRecord(ChangeNumberIndexRecord record)
      throws ChangelogException {
    // The change number carried by the provided record is ignored:
    // a freshly generated one is always used instead.
    long changeNumber = nextChangeNumber();
    final ChangeNumberIndexRecord newRecord =
        new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(),
            record.getBaseDN(), record.getCSN());
    db.addRecord(newRecord);
    newestChangeNumber = changeNumber;

    logger.trace("In JEChangeNumberIndexDB.add, added: %s", newRecord);
    return changeNumber;
  }

  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException {
    return db.readFirstRecord();
  }

  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException {
    return db.readLastRecord();
  }

  /**
   * Atomically generates the next change number.
   *
   * @return the newly generated change number
   */
  private long nextChangeNumber() {
    return lastGeneratedChangeNumber.incrementAndGet();
  }

  /** {@inheritDoc} */
  @Override
  public long getLastGeneratedChangeNumber() {
    return lastGeneratedChangeNumber.get();
  }

  /**
   * Get the number of changes.
   *
   * @return Returns the number of changes.
   */
  public long count() {
    return db.count();
  }

  /**
   * Returns whether this database is empty.
   *
   * @return <code>true</code> if this database is empty, <code>false</code>
   *         otherwise
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public boolean isEmpty() throws ChangelogException {
    return getNewestRecord() == null;
  }

  /** {@inheritDoc} */
  @Override
  public DBCursor<ChangeNumberIndexRecord> getCursorFrom(long startChangeNumber)
      throws ChangelogException {
    return new JEChangeNumberIndexDBCursor(db, startChangeNumber);
  }

  /**
   * Shutdown this DB.
   * <p>
   * Only the first call has an effect; later (or concurrent) calls return
   * immediately.
   */
  public void shutdown() {
    // compareAndSet performs the "already shut down?" check and the flag flip
    // as one atomic step, so the DB is never shut down twice by racing callers.
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }

    synchronized (this) {
      notifyAll();
    }

    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }

  /**
   * Synchronously purges the change number index DB up to and excluding the
   * provided timestamp.
   *
   * @param purgeTimestamp
   *          the timestamp up to which purging must happen
   * @return the {@link MultiDomainServerState} object that drives purging the
   *         replicaDBs, or null when the DB is empty or when the loop ends
   *         (cursor exhausted or shutdown requested) without reaching a
   *         record that must be kept.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
      throws ChangelogException {
    if (isEmpty()) {
      return null;
    }

    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);

    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try {
      while (!mustShutdown() && cursor.next()) {
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (record.getChangeNumber() != oldestChangeNumber) {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() == newestChangeNumber) {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return getPurgeCookie(record);
        }

        if (record.getCSN().isOlderThan(purgeCSN)) {
          cursor.delete();
        } else {
          // Current record is not old enough to purge.
          return getPurgeCookie(record);
        }
      }

      return null;
    } catch (ChangelogException e) {
      cursor.abort();
      throw e;
    } catch (Exception e) {
      cursor.abort();
      throw new ChangelogException(e);
    } finally {
      cursor.close();
    }
  }

  /**
   * Builds the purge state from the previous cookie of the provided record.
   *
   * @param record
   *          the first record that is kept in the DB
   * @return the state built from the record's previous cookie
   * @throws DirectoryException
   *           if the state cannot be built from the previous cookie
   */
  private MultiDomainServerState getPurgeCookie(
      final ChangeNumberIndexRecord record) throws DirectoryException {
    // Do not include the record's CSN to avoid having it purged
    return new MultiDomainServerState(record.getPreviousCookie());
  }

  /**
   * Clear the changes from this DB (from both memory cache and DB storage) for
   * the provided baseDN.
   *
   * @param baseDNToClear
   *          The baseDN for which we want to remove all records from this DB,
   *          null means all.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public void removeDomain(DN baseDNToClear) throws ChangelogException {
    if (isEmpty()) {
      return;
    }

    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try {
      // Stays true until the first record that is kept has been seen
      boolean isOldestRecord = true;
      while (!mustShutdown() && cursor.next()) {
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber) {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() == newestChangeNumber) {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return;
        }

        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear)) {
          cursor.delete();
        } else {
          isOldestRecord = false;
        }
      }
    } catch (ChangelogException e) {
      cursor.abort();
      throw e;
    } finally {
      cursor.close();
    }
  }

  /**
   * Returns whether a shutdown of this DB has been requested.
   *
   * @return true if {@link #shutdown()} has been called
   */
  private boolean mustShutdown() {
    return shutdown.get();
  }

  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * JEChangeNumberIndexDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> {
    /** {@inheritDoc} */
    @Override
    public List<Attribute> getMonitorData() {
      List<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(createChangeNumberAttribute(true));
      attributes.add(createChangeNumberAttribute(false));
      attributes.add(Attributes.create("count", Long.toString(count())));
      return attributes;
    }

    /**
     * Creates the monitoring attribute for the first or last change number.
     *
     * @param isFirst
     *          true for the first change number, false for the last one
     * @return the created attribute
     */
    private Attribute createChangeNumberAttribute(boolean isFirst) {
      final String attributeName =
          isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
      final String changeNumber = String.valueOf(getChangeNumber(isFirst));
      return Attributes.create(attributeName, changeNumber);
    }

    /**
     * Reads the first or last change number from the DB.
     *
     * @param isFirst
     *          true to read the first record, false to read the last one
     * @return the change number of the record read, or 0 if there is no such
     *         record or if reading it failed
     */
    private long getChangeNumber(boolean isFirst) {
      try {
        final ChangeNumberIndexRecord record =
            isFirst ? db.readFirstRecord() : db.readLastRecord();
        if (record != null) {
          return record.getChangeNumber();
        }
      } catch (ChangelogException e) {
        // Monitoring is best effort: trace the failure and report 0
        logger.traceException(e);
      }
      return 0;
    }

    /** {@inheritDoc} */
    @Override
    public String getMonitorInstanceName() {
      return "ChangeNumber Index Database";
    }

    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
        throws ConfigException, InitializationException {
      // Nothing to do for now
    }
  }

  /** {@inheritDoc} */
  @Override
  public String toString() {
    return getClass().getSimpleName() + ": " + oldestChangeNumber + " "
        + newestChangeNumber;
  }

  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public void clear() throws ChangelogException {
    db.clear();
    // Re-read both bounds from the DB after clearing it
    oldestChangeNumber = getChangeNumber(db.readFirstRecord());
    newestChangeNumber = getChangeNumber(db.readLastRecord());
  }
}