/* * The contents of this file are subject to the terms of the Common Development and * Distribution License (the License). You may not use this file except in compliance with the * License. * * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the * specific language governing permission and limitations under the License. * * When distributing Covered Software, include this CDDL Header Notice in each file and include * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2013-2016 ForgeRock AS. */ package org.opends.server.replication.server.changelog.file; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.opends.server.api.DirectoryThread; import org.opends.server.backends.ChangelogBackend; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.forgerock.opendj.ldap.DN; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.opends.server.util.StaticUtils.*; /** * Thread responsible for inserting replicated changes into the ChangeNumber * Index DB (CNIndexDB for short). *
* Only changes older than the medium consistency point are inserted in the
* CNIndexDB. As a consequence this class is also responsible for maintaining
* the medium consistency point (indirectly through an
* {@link ECLMultiDomainDBCursor}).
*/
public class ChangeNumberIndexer extends DirectoryThread
{
/** The tracer object for the debug logger. */
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/**
* If it contains nothing, then the run method executes normally.
* Otherwise, the {@link #run()} method must clear its state
* for the supplied domain baseDNs. If a supplied domain is
* {@link DN#rootDN()}, then all domains will be cleared.
*/
private final ConcurrentSkipListSet
* Updates are persistent and stored in the replicaDBs, heartbeats are
* transient and are easily constructed on normal operations.
*
* Note: This object is updated by both heartbeats and changes/updates.
*/
private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
/** Note: This object is updated by replica offline messages. */
private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
/**
* Cursor across all the replicaDBs for all the replication domains. It is
* positioned on the next change that needs to be inserted in the CNIndexDB.
*
* Note: it is only accessed from the {@link #run()} method.
*
* @NonNull
*/
private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
private MultiDomainServerState cookie = new MultiDomainServerState();
/**
* Builds a ChangeNumberIndexer object.
* @param changelogDB
* the changelogDB
* @param changelogStateProvider
* the replication environment information for access to changelog state
*/
public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider)
{
this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate());
}
/**
* Builds a ChangeNumberIndexer object.
* @param changelogDB
* the changelogDB
* @param changelogStateProvider
* the changelog state used for initialization
* @param predicate
*/
ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider,
ECLEnabledDomainPredicate predicate)
{
super("Change number indexer");
this.changelogDB = changelogDB;
this.changelogStateProvider = changelogStateProvider;
this.predicate = predicate;
}
/**
* Ensures the medium consistency point is updated by heartbeats.
*
* @param baseDN
* the baseDN of the domain for which the heartbeat is published
* @param heartbeatCSN
* the CSN coming from the heartbeat
*/
public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
{
if (!predicate.isECLEnabledDomain(baseDN))
{
return;
}
final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, heartbeatCSN);
tryNotify(oldestCSNBefore);
}
/**
* Indicates if the replica corresponding to provided domain DN and server id
* is offline.
*
* @param domainDN
* base DN of the replica
* @param serverId
* server id of the replica
* @return {@code true} if replica is offline, {@code false} otherwise
*/
public boolean isReplicaOffline(DN domainDN, int serverId)
{
return replicasOffline.getCSN(domainDN, serverId) != null;
}
/**
* Ensures the medium consistency point is updated by UpdateMsg.
*
* @param baseDN
* the baseDN of the domain for which the heartbeat is published
* @param updateMsg
* the updateMsg that will update the medium consistency point
* @throws ChangelogException
* If a database problem happened
*/
public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
throws ChangelogException
{
if (!predicate.isECLEnabledDomain(baseDN))
{
return;
}
final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, updateMsg.getCSN());
tryNotify(oldestCSNBefore);
}
/**
* Signals a replica went offline.
*
* @param baseDN
* the replica's replication domain
* @param offlineCSN
* the serverId and time of the replica that went offline
*/
public void replicaOffline(DN baseDN, CSN offlineCSN)
{
if (!predicate.isECLEnabledDomain(baseDN))
{
return;
}
replicasOffline.update(baseDN, offlineCSN);
final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, offlineCSN);
tryNotify(oldestCSNBefore);
}
private CSN getOldestLastAliveCSN()
{
return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
}
/**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
private void tryNotify(final CSN oldestCSNBefore)
{
if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
{
synchronized (this)
{
notify();
}
}
}
/**
* Used for waking up the {@link ChangeNumberIndexer} thread because it might
* have some work to do.
*/
private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
{
final CSN oldestCSNAfter = getOldestLastAliveCSN();
// ensure that all initial replicas alive information have been updated
// with CSNs that are acceptable for moving the medium consistency forward
return allInitialReplicasAreOfflineOrAlive()
&& oldestCSNBefore != null // then oldestCSNAfter cannot be null
// has the oldest CSN changed?
&& oldestCSNBefore.isOlderThan(oldestCSNAfter);
}
/**
* Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
* must be persisted to the change number index DB.
*/
private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
{
// ensure that all initial replicas alive information have been updated
// with CSNs that are acceptable for moving the medium consistency forward
return allInitialReplicasAreOfflineOrAlive()
// can we persist the next CSN?
&& nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
}
/**
* Returns true only if the initial replicas known from the changelog state DB
* are either:
*
* Rely on the DirectoryThread uncaught exceptions handler for logging error +
* alert.
*
* Message logged here gives corrective information to the administrator.
*/
private void logUnexpectedException(Exception e)
{
logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
getClass().getSimpleName(), stackTraceToSingleLineString(e));
}
private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
{
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
if (offlineCSN != null)
{
if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
{
// replica is back online, we can forget the last time it was offline
replicasOffline.removeCSN(mcBaseDN, offlineCSN);
}
else if (offlineCSN.isOlderThan(mcCSN))
{
/*
* replica is not back online, Medium consistency point has gone past
* its last offline time, and there are no more changes after the
* offline CSN in the cursor: remove everything known about it
* (offlineCSN from lastAliveCSN and remove all knowledge of this replica
* from the medium consistency RUV).
*/
lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
}
}
// advance the cursor we just read from,
// success/failure will be checked later
nextChangeForInsertDBCursor.next();
}
/**
* Asks the current thread to clear its state for the specified domain.
*
* Note: This method blocks the current thread until state is cleared.
*
* @param baseDN
* the baseDN to be cleared from this thread's state. {@code null} and
* {@link DN#rootDN()} mean "clear all domains".
*/
public void clear(DN baseDN)
{
final DN baseDNToClear = baseDN != null ? baseDN : DN.rootDN();
domainsToClear.add(baseDNToClear);
while (domainsToClear.contains(baseDNToClear)
&& !State.TERMINATED.equals(getState()))
{
// wait until clear() has been done by thread, always waking it up
synchronized (this)
{
notify();
}
// ensures thread wait that this thread's state is cleaned up
Thread.yield();
}
}
}
*
* In both cases, we have enough information to compute medium consistency
* without waiting any further.
*/
private boolean allInitialReplicasAreOfflineOrAlive()
{
for (DN baseDN : lastAliveCSNs)
{
for (CSN csn : lastAliveCSNs.getServerState(baseDN))
{
if (csn.getTime() == 0
&& replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
{
// this is the oldest possible CSN, but the replica is not offline
// we must wait for more up to date information from this replica
return false;
}
}
}
return true;
}
/**
* Restores in memory data needed to build the CNIndexDB. In particular,
* initializes the changes cursor to the medium consistency point.
*/
private void initialize() throws ChangelogException
{
final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
initializeLastAliveCSNs(domainDB);
initializeNextChangeCursor(domainDB);
initializeOfflineReplicas();
}
private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
{
// Initialize the multi domain cursor only from the change number index record.
// The cookie is always empty at this stage.
final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
final MultiDomainServerState unused = new MultiDomainServerState();
MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);
nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
}
private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
{
for (Entry