| | |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | | */ |
| | | public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable |
| | | public class JEChangeNumberIndexDB implements ChangeNumberIndexDB |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | |
| | | /** FIXME What is this field used for? */ |
| | | private volatile long oldestChangeNumber = NO_KEY; |
| | | /** |
| | | * The newest changenumber stored in the DB. It is used to avoid trimming the |
| | | * The newest changenumber stored in the DB. It is used to avoid purging the |
| | | * record with the newest changenumber. The newest record in the changenumber |
| | | * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is |
| | | * then retrieved on server startup. |
| | |
| | | * condition between: |
| | | * <ol> |
| | | * <li>this atomic long being incremented for a new record ('recordB')</li> |
| | | * <li>the current newest record ('recordA') being trimmed from the DB</li> |
| | | * <li>the current newest record ('recordA') being purged from the DB</li> |
| | | * <li>'recordB' failing to be inserted in the DB</li> |
| | | * </ol> |
| | | */ |
| | | private final AtomicLong lastGeneratedChangeNumber; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | /** |
| | | * A dedicated thread loops trim(). |
| | | * <p> |
| | | * trim() : deletes from the DB a number of changes that are older than a |
| | | * certain date. |
| | | */ |
| | | private DirectoryThread trimmingThread; |
| | | /** |
| | | * The trim age in milliseconds. Changes record in the change DB that are |
| | | * older than this age are removed. |
| | | * <p> |
| | | * FIXME it never gets updated even when the replication server purge delay is |
| | | * updated |
| | | */ |
| | | private volatile long trimAge; |
| | | |
| | | private ReplicationServer replicationServer; |
| | | |
| | | |
| | | /** |
| | | * Creates a new JEChangeNumberIndexDB associated to a given LDAP server. |
| | | * |
| | | * @param replicationServer The ReplicationServer that creates this instance. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * @param dbEnv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @throws ChangelogException If a database problem happened |
| | | */ |
| | | public JEChangeNumberIndexDB(ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) throws ChangelogException |
| | | public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | this.trimAge = replicationServer.getTrimAge(); |
| | | |
| | | // DB initialization |
| | | db = new DraftCNDB(dbenv); |
| | | db = new DraftCNDB(dbEnv); |
| | | final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord(); |
| | | final ChangeNumberIndexRecord newestRecord = db.readLastRecord(); |
| | | oldestChangeNumber = getChangeNumber(oldestRecord); |
| | |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | | * Creates and starts the thread trimming the CNIndexDB. |
| | | */ |
| | | public void startTrimmingThread() |
| | | { |
| | | trimmingThread = |
| | | new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer"); |
| | | trimmingThread.start(); |
| | | } |
| | | |
| | | private long getChangeNumber(ChangeNumberIndexRecord record) |
| | | throws ChangelogException |
| | | { |
| | |
| | | notifyAll(); |
| | | } |
| | | |
| | | if (trimmingThread != null) |
| | | { |
| | | try |
| | | { |
| | | trimmingThread.join(); |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // Nothing can be done about it, just proceed |
| | | } |
| | | } |
| | | |
| | | db.shutdown(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | * Periodically Flushes the ReplicationServerDomain cache from memory to the |
| | | * stable storage and trims the old updates. |
| | | * Synchronously purges the change number index DB up to and excluding the |
| | | * provided timestamp. |
| | | * |
| | | * @param purgeTimestamp |
| | | * the timestamp up to which purging must happen |
| | | * @return the {@link MultiDomainServerState} object that drives purging the |
| | | * replicaDBs. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | public MultiDomainServerState purgeUpTo(long purgeTimestamp) |
| | | throws ChangelogException |
| | | { |
| | | while (!shutdown.get()) |
| | | if (isEmpty()) |
| | | { |
| | | try { |
| | | trim(shutdown); |
| | | return null; |
| | | } |
| | | |
| | | synchronized (this) |
| | | { |
| | | if (!shutdown.get()) |
| | | { |
| | | try |
| | | { |
| | | wait(1000); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (Exception end) |
| | | final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); |
| | | |
| | | final DraftCNDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | while (!mustShutdown(shutdown) && cursor.next()) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(end))); |
| | | if (replicationServer != null) |
| | | final ChangeNumberIndexRecord record = cursor.currentRecord(); |
| | | if (record.getChangeNumber() != oldestChangeNumber) |
| | | { |
| | | replicationServer.shutdown(); |
| | | oldestChangeNumber = record.getChangeNumber(); |
| | | } |
| | | break; |
| | | if (record.getChangeNumber() == newestChangeNumber) |
| | | { |
| | | // do not purge the newest record to avoid having the last generated |
| | | // changenumber dropping back to 0 if the server restarts |
| | | return getPurgeCookie(record); |
| | | } |
| | | |
| | | if (record.getCSN().isOlderThan(purgeCSN)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | // Current record is not old enough to purge. |
| | | return getPurgeCookie(record); |
| | | } |
| | | } |
| | | |
| | | return null; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | cursor.abort(); |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | cursor.abort(); |
| | | throw new ChangelogException(e); |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Trim old changes from this database. |
| | | * |
| | | * @param shutdown |
| | | * AtomicBoolean telling whether the current run must be stopped |
| | | * @throws ChangelogException |
| | | * In case of database problem. |
| | | */ |
| | | public void trim(AtomicBoolean shutdown) throws ChangelogException |
| | | private MultiDomainServerState getPurgeCookie( |
| | | final ChangeNumberIndexRecord record) throws DirectoryException |
| | | { |
| | | if (trimAge == 0) |
| | | return; |
| | | |
| | | clear(null, shutdown); |
| | | // Do not include the record's CSN to avoid having it purged |
| | | return new MultiDomainServerState(record.getPreviousCookie()); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | public void clear(DN baseDNToClear) throws ChangelogException |
| | | { |
| | | clear(baseDNToClear, null); |
| | | } |
| | | |
| | | private void clear(DN baseDNToClear, AtomicBoolean shutdown) |
| | | throws ChangelogException |
| | | public void removeDomain(DN baseDNToClear) throws ChangelogException |
| | | { |
| | | if (isEmpty()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | for (int i = 0; i < 100; i++) |
| | | final DraftCNDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | if (mustShutdown(shutdown)) |
| | | boolean isOldestRecord = true; |
| | | while (!mustShutdown(shutdown) && cursor.next()) |
| | | { |
| | | return; |
| | | } |
| | | final DraftCNDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | for (int j = 0; j < 50; j++) |
| | | final ChangeNumberIndexRecord record = cursor.currentRecord(); |
| | | if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber) |
| | | { |
| | | // let's traverse the CNIndexDB |
| | | if (mustShutdown(shutdown) || !cursor.next()) |
| | | { |
| | | cursor.close(); |
| | | return; |
| | | } |
| | | |
| | | final ChangeNumberIndexRecord record = cursor.currentRecord(); |
| | | if (record.getChangeNumber() != oldestChangeNumber) |
| | | { |
| | | oldestChangeNumber = record.getChangeNumber(); |
| | | } |
| | | if (record.getChangeNumber() == newestChangeNumber) |
| | | { |
| | | // do not trim the newest record to avoid having the last generated |
| | | // changenumber dropping back to 0 if the server restarts |
| | | cursor.close(); |
| | | return; |
| | | } |
| | | |
| | | if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN())) |
| | | { |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | final ReplicationServerDomain domain = |
| | | replicationServer.getReplicationServerDomain(record.getBaseDN()); |
| | | if (domain == null) |
| | | { |
| | | // the domain has been removed since the record was written in the |
| | | // CNIndexDB, thus it makes no sense to keep this record in the DB. |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | // FIXME there is an opportunity for a phantom record in the CNIndexDB |
| | | // if the replicaDB gets purged after call to domain.getOldestState(). |
| | | final CSN csn = record.getCSN(); |
| | | final ServerState oldestState = domain.getOldestState(); |
| | | final CSN fcsn = oldestState.getCSN(csn.getServerId()); |
| | | if (csn.isOlderThan(fcsn)) |
| | | { |
| | | // This change which has already been purged from the corresponding |
| | | // replicaDB => purge it from CNIndexDB |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | ServerState csnVector; |
| | | try |
| | | { |
| | | Map<DN, ServerState> csnStartStates = |
| | | MultiDomainServerState.splitGenStateToServerStates( |
| | | record.getPreviousCookie()); |
| | | csnVector = csnStartStates.get(record.getBaseDN()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:" |
| | | + csnVector + " -- StartState:" + oldestState); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // We could not parse the MultiDomainServerState from the record |
| | | // FIXME this is quite an aggressive delete() |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | if (csnVector == null |
| | | || (csnVector.getCSN(csn.getServerId()) != null |
| | | && !csnVector.cover(oldestState))) |
| | | { |
| | | cursor.delete(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn |
| | | + "Not covering startState"); |
| | | continue; |
| | | } |
| | | |
| | | oldestChangeNumber = record.getChangeNumber(); |
| | | cursor.close(); |
| | | } |
| | | if (record.getChangeNumber() == newestChangeNumber) |
| | | { |
| | | // do not purge the newest record to avoid having the last generated |
| | | // changenumber dropping back to 0 if the server restarts |
| | | return; |
| | | } |
| | | |
| | | cursor.close(); |
| | | if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | isOldestRecord = false; |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | cursor.abort(); |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | cursor.abort(); |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | cursor.abort(); |
| | | throw e; |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "ChangeNumber Index Database"; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException,InitializationException |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the Purge delay for this db Handler. |
| | | * @param delay The purge delay in Milliseconds. |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | trimAge = delay; |
| | | } |
| | | |
| | | /** |
| | | * Clear the changes from this DB (from both memory cache and DB storage). |
| | | * |
| | | * @throws ChangelogException |