>();
/** The database environment, assigned by {@link #initializeDB()}. */
private ReplicationDbEnv dbEnv;
/** The replication server configuration received at construction time. */
private ReplicationServerCfg config;
/** The directory on the filesystem holding the changelog database. */
private final File dbDirectory;
/**
 * The handler of the changelog database, the database stores the relation
 * between a change number and the associated cookie.
 *
 * @GuardedBy("cnIndexDBLock")
 */
private JEChangeNumberIndexDB cnIndexDB;
/** Holds the running change number indexer, or {@code null} if none. */
private final AtomicReference<ChangeNumberIndexer> cnIndexer =
    new AtomicReference<ChangeNumberIndexer>();
/** Used for protecting {@link ChangeNumberIndexDB} related state. */
private final Object cnIndexDBLock = new Object();
/**
 * The purge delay (in milliseconds). Records in the changelog DB that are
 * older than this delay might be removed.
 * <p>
 * Declared volatile because it is written by
 * {@link #setPurgeDelay(long)} and read by the purger thread.
 */
private volatile long purgeDelayInMillis;
/** Holds the running purger thread, or {@code null} if none. */
private final AtomicReference<ChangelogDBPurger> cnPurger =
    new AtomicReference<ChangelogDBPurger>();
/** The purge timestamp used by the last completed purge pass. */
private volatile long latestPurgeDate;
/** The local replication server. */
private final ReplicationServer replicationServer;
/** Set to {@code true} once {@link #shutdownDB()} has been initiated. */
private final AtomicBoolean shutdown = new AtomicBoolean();
/**
 * Shared cursor that never yields a record: {@code next()} always returns
 * {@code false}, {@code getRecord()} always returns {@code null} and
 * {@code close()} does nothing.
 */
private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
    new DBCursor<UpdateMsg>()
{
  @Override
  public boolean next()
  {
    return false;
  }

  @Override
  public UpdateMsg getRecord()
  {
    return null;
  }

  @Override
  public void close()
  {
    // nothing to release
  }

  @Override
  public String toString()
  {
    return "EmptyDBCursor";
  }
};
/**
* Builds an instance of this class.
*
* @param replicationServer
* the local replication server.
* @param config
* the replication server configuration
* @throws ConfigException
* if a problem occurs opening the supplied directory
*/
public JEChangelogDB(ReplicationServer replicationServer,
ReplicationServerCfg config) throws ConfigException
{
this.config = config;
this.replicationServer = replicationServer;
this.dbDirectory = makeDir(config.getReplicationDBDirectory());
}
/**
 * Resolves the supplied directory name and makes sure the directory exists,
 * creating it when necessary.
 *
 * @param dbDirName
 *          the name of the database directory, resolved through
 *          {@code getFileForPath}
 * @return the resolved directory
 * @throws ConfigException
 *           if the directory could not be checked or created
 */
private File makeDir(String dbDirName) throws ConfigException
{
  // Check that this path exists or create it.
  final File dbDirectory = getFileForPath(dbDirName);
  try
  {
    // File.mkdir() reports failure through its return value rather than an
    // exception. The isDirectory() re-check tolerates the directory having
    // been created concurrently between exists() and mkdir().
    if (!dbDirectory.exists()
        && !dbDirectory.mkdir()
        && !dbDirectory.isDirectory())
    {
      // caught just below and converted into a ConfigException
      throw new IllegalStateException("Could not create directory");
    }
    return dbDirectory;
  }
  catch (Exception e)
  {
    if (debugEnabled())
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }

    final MessageBuilder mb = new MessageBuilder();
    mb.append(e.getLocalizedMessage());
    mb.append(" ");
    mb.append(String.valueOf(dbDirectory));
    Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
    throw new ConfigException(msg, e);
  }
}
/**
 * Returns the replica DBs registered for the supplied domain, keyed by
 * serverId.
 *
 * @param baseDN
 *          the replication domain to look up
 * @return the registered map, or an empty map when the domain is unknown;
 *         never {@code null}
 */
private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
{
  final Map<Integer, JEReplicaDB> replicaDBs = domainToReplicaDBs.get(baseDN);
  // the explicit type witness is needed inside a conditional expression
  return replicaDBs != null
      ? replicaDBs
      : Collections.<Integer, JEReplicaDB> emptyMap();
}
/**
 * Looks up the replica DB of a serverId inside a replication domain.
 *
 * @param baseDN
 *          the replication domain
 * @param serverId
 *          the serverId of the replica
 * @return the matching replica DB, or {@code null} when none is registered
 */
private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
{
  final Map<Integer, JEReplicaDB> replicaDBs = getDomainMap(baseDN);
  return replicaDBs.get(serverId);
}
/**
 * Provisions resources for the specified serverId in the specified
 * replication domain by making sure its replica DB exists.
 *
 * @param baseDN
 *          the replication domain where to add the serverId
 * @param serverId
 *          the server Id to add to the replication domain
 * @param rs
 *          the replication server handed over to a newly created replica DB
 * @throws ChangelogException
 *           If a database error happened.
 */
private void commission(final DN baseDN, final int serverId,
    final ReplicationServer rs) throws ChangelogException
{
  // the returned pair is of no interest here, only the creation matters
  getOrCreateReplicaDB(baseDN, serverId, rs);
}
/**
 * Returns a {@link JEReplicaDB}, possibly creating it.
 * <p>
 * The lookup is retried for as long as the changelog DB is not shut down,
 * because the domain map may be removed concurrently between the moment it
 * is obtained and the moment the replica DB is registered in it.
 *
 * @param baseDN
 *          the baseDN for which to create a ReplicaDB
 * @param serverId
 *          the serverId for which to create a ReplicaDB
 * @param rs
 *          the ReplicationServer
 * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
 *         to be created
 * @throws ChangelogException
 *           if a problem occurred with the database, or if the changelog DB
 *           has been shut down
 */
Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
    int serverId, ReplicationServer rs) throws ChangelogException
{
  while (!shutdown.get())
  {
    final ConcurrentMap<Integer, JEReplicaDB> domainMap =
        getExistingOrNewDomainMap(baseDN);
    final Pair<JEReplicaDB, Boolean> result =
        getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
    if (result != null)
    {
      return result;
    }
    // null means the domain map was removed concurrently: try again
  }
  throw new ChangelogException(
      ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
/**
 * Returns the map of replica DBs registered for the supplied domain,
 * registering a new empty map when there is none yet.
 *
 * @param baseDN
 *          the replication domain
 * @return the map currently registered for the domain, never {@code null}
 */
private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
    DN baseDN)
{
  // happy path: the domainMap already exists
  final ConcurrentMap<Integer, JEReplicaDB> existing =
      domainToReplicaDBs.get(baseDN);
  if (existing != null)
  {
    return existing;
  }

  // The domainMap does not exist: build a candidate, accepting that another
  // thread may be doing the same thing at the same time.
  final ConcurrentMap<Integer, JEReplicaDB> candidate =
      new ConcurrentHashMap<Integer, JEReplicaDB>();
  final ConcurrentMap<Integer, JEReplicaDB> winner =
      domainToReplicaDBs.putIfAbsent(baseDN, candidate);
  // a non-null result means another thread registered its map first: use it
  return winner != null ? winner : candidate;
}
/**
 * Returns the replica DB registered for the serverId in the supplied domain
 * map, creating and registering it when there is none yet.
 *
 * @param domainMap
 *          the map of replica DBs previously obtained for the domain
 * @param serverId
 *          the serverId of the replica
 * @param baseDN
 *          the replication domain
 * @param rs
 *          the replication server handed over to a newly created replica DB
 * @return a Pair with the replica DB and a boolean telling whether it was
 *         created by this call, or {@code null} when {@code domainMap} is no
 *         longer the map registered for the domain
 * @throws ChangelogException
 *           if a problem occurred with the database
 */
private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
    final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
    DN baseDN, ReplicationServer rs) throws ChangelogException
{
  // happy path: the JEReplicaDB already exists
  final JEReplicaDB existing = domainMap.get(serverId);
  if (existing != null)
  {
    return Pair.of(existing, false);
  }

  // The JEReplicaDB does not exist: synchronize on the domainMap so that a
  // single thread creates it.
  synchronized (domainMap)
  {
    // check again now that the lock is held
    final JEReplicaDB rechecked = domainMap.get(serverId);
    if (rechecked != null)
    {
      return Pair.of(rechecked, false);
    }

    if (domainToReplicaDBs.get(baseDN) != domainMap)
    {
      // the domainMap could have been concurrently removed because
      // 1) a shutdown was initiated or 2) an initialize was called.
      // Returning null lets the caller either
      // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
      return null;
    }

    final JEReplicaDB created = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
    domainMap.put(serverId, created);
    return Pair.of(created, true);
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Opens the database environment, restores the persisted changelog state,
 * starts the change number indexer when the configuration asks for change
 * numbers, and applies the purge delay of the replication server. A
 * {@link ChangelogException} raised along the way is logged, not rethrown.
 */
@Override
public void initializeDB()
{
  try
  {
    final String dbDirName = config.getReplicationDBDirectory();
    final String dbPath = getFileForPath(dbDirName).getAbsolutePath();
    dbEnv = new ReplicationDbEnv(dbPath, replicationServer);

    final ChangelogState state = dbEnv.readChangelogState();
    initializeChangelogState(state);
    if (config.isComputeChangeNumber())
    {
      startIndexer(state);
    }

    setPurgeDelay(replicationServer.getPurgeDelay());
  }
  catch (ChangelogException e)
  {
    if (debugEnabled())
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }

    final String failedPath = this.dbDirectory.getAbsolutePath();
    logError(ERR_COULD_NOT_READ_DB.get(failedPath, e.getLocalizedMessage()));
  }
}
private void initializeChangelogState(final ChangelogState changelogState)
throws ChangelogException
{
for (Map.Entry entry :
changelogState.getDomainToGenerationId().entrySet())
{
replicationServer.getReplicationServerDomain(entry.getKey(), true)
.initGenerationID(entry.getValue());
}
for (Map.Entry> entry :
changelogState.getDomainToServerIds().entrySet())
{
for (int serverId : entry.getValue())
{
commission(entry.getKey(), serverId, replicationServer);
}
}
}
/**
 * Shuts down the change number index DB if it has been created. The field is
 * left untouched: resetting it is up to the caller.
 *
 * @throws ChangelogException
 *           if the change number index DB reports a problem while shutting
 *           down
 */
private void shutdownCNIndexDB() throws ChangelogException
{
  synchronized (cnIndexDBLock)
  {
    final JEChangeNumberIndexDB indexDB = cnIndexDB;
    if (indexDB == null)
    {
      return;
    }
    indexDB.shutdown();
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Only the first call has an effect. It asks the indexer and purger threads
 * to stop, shuts down the change number index DB, every replica DB and
 * finally the database environment.
 */
@Override
public void shutdownDB() throws ChangelogException
{
  if (!this.shutdown.compareAndSet(false, true))
  { // shutdown has already been initiated
    return;
  }

  // Remember the first exception because :
  // - we want to try to remove everything we want to remove
  // - then throw the first encountered exception
  ChangelogException firstException = null;

  final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
  if (indexer != null)
  {
    indexer.initiateShutdown();
  }
  final ChangelogDBPurger purger = cnPurger.getAndSet(null);
  if (purger != null)
  {
    purger.initiateShutdown();
  }

  try
  {
    shutdownCNIndexDB();
  }
  catch (ChangelogException e)
  {
    firstException = e;
  }

  for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
      this.domainToReplicaDBs.values().iterator(); it.hasNext();)
  {
    final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
    // same lock as the one taken to create a replica DB in this domain map
    synchronized (domainMap)
    {
      it.remove();
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        replicaDB.shutdown();
      }
    }
  }

  if (dbEnv != null)
  {
    dbEnv.shutdown();
  }

  if (firstException != null)
  {
    throw firstException;
  }
}
/**
 * Clears all content from the changelog database, but leaves its directory on
 * the filesystem.
 * <p>
 * Every domain is removed even when removing one of them fails; the first
 * exception encountered is thrown once everything has been attempted.
 *
 * @throws ChangelogException
 *           If a database problem happened
 */
public void clearDB() throws ChangelogException
{
  if (!dbDirectory.exists())
  {
    return;
  }

  // Remember the first exception because :
  // - we want to try to remove everything we want to remove
  // - then throw the first encountered exception
  ChangelogException firstException = null;

  final ChangeNumberIndexer indexer = cnIndexer.get();
  if (indexer != null)
  {
    indexer.clear();
  }

  for (DN baseDN : this.domainToReplicaDBs.keySet())
  {
    try
    {
      removeDomain(baseDN);
    }
    catch (ChangelogException e)
    {
      // keep going: a failing domain must not prevent clearing the others
      if (firstException == null)
      {
        firstException = e;
      }
      else if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
  }

  synchronized (cnIndexDBLock)
  {
    if (cnIndexDB != null)
    {
      try
      {
        cnIndexDB.clear();
      }
      catch (ChangelogException e)
      {
        if (firstException == null)
        {
          firstException = e;
        }
        else if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }

      try
      {
        shutdownCNIndexDB();
      }
      catch (ChangelogException e)
      {
        if (firstException == null)
        {
          firstException = e;
        }
        else if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }

      cnIndexDB = null;
    }
  }

  if (firstException != null)
  {
    throw firstException;
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Shuts the changelog DB down, then recursively deletes its directory.
 */
@Override
public void removeDB() throws ChangelogException
{
  // the directory is only deleted when the shutdown did not throw
  shutdownDB();
  StaticUtils.recursiveDelete(dbDirectory);
}
/**
 * {@inheritDoc}
 * <p>
 * Sums the changes count of every replica DB registered for the domain;
 * returns 0 when the domain has none.
 */
@Override
public long getDomainChangesCount(DN baseDN)
{
  final Map<Integer, JEReplicaDB> replicaDBs = getDomainMap(baseDN);
  long total = 0;
  for (JEReplicaDB db : replicaDBs.values())
  {
    total += db.getChangesCount();
  }
  return total;
}
/**
 * {@inheritDoc}
 * <p>
 * Builds a new {@link ServerState} updated with the oldest CSN of every
 * replica DB registered for the domain.
 */
@Override
public ServerState getDomainOldestCSNs(DN baseDN)
{
  final Map<Integer, JEReplicaDB> replicaDBs = getDomainMap(baseDN);
  final ServerState oldestCSNs = new ServerState();
  for (JEReplicaDB db : replicaDBs.values())
  {
    oldestCSNs.update(db.getOldestCSN());
  }
  return oldestCSNs;
}
/**
 * {@inheritDoc}
 * <p>
 * Builds a new {@link ServerState} updated with the newest CSN of every
 * replica DB registered for the domain.
 */
@Override
public ServerState getDomainNewestCSNs(DN baseDN)
{
  final Map<Integer, JEReplicaDB> replicaDBs = getDomainMap(baseDN);
  final ServerState newestCSNs = new ServerState();
  for (JEReplicaDB db : replicaDBs.values())
  {
    newestCSNs.update(db.getNewestCSN());
  }
  return newestCSNs;
}
/**
 * {@inheritDoc}
 * <p>
 * The information comes from the change number indexer. A new empty
 * {@link ServerState} is returned when no indexer is running or when the
 * indexer has nothing for the domain.
 */
@Override
public ServerState getDomainLastAliveCSNs(DN baseDN)
{
  final ChangeNumberIndexer indexer = this.cnIndexer.get();
  final ServerState lastAliveCSNs =
      indexer != null ? indexer.getDomainLastAliveCSNs(baseDN) : null;
  if (lastAliveCSNs == null)
  {
    return new ServerState();
  }
  // hand out a copy to protect against concurrent modifications
  return lastAliveCSNs.duplicate();
}
/**
 * {@inheritDoc}
 * <p>
 * Works in three steps: clears and shuts down the replica DBs of the domain,
 * removes the domain from the change number index DB, then clears the
 * generationId of the domain. All steps are attempted even when one fails;
 * the first exception encountered is thrown at the end.
 */
@Override
public void removeDomain(DN baseDN) throws ChangelogException
{
  // Remember the first exception because :
  // - we want to try to remove everything we want to remove
  // - then throw the first encountered exception
  ChangelogException firstException = null;

  // 1- clear the replica DBs
  final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
  if (domainMap != null)
  {
    synchronized (domainMap)
    {
      // Another thread may have removed the domain between the lookup above
      // and the acquisition of the lock, in which case nothing is left to do
      final Map<Integer, JEReplicaDB> removedMap =
          domainToReplicaDBs.remove(baseDN);
      if (removedMap != null)
      {
        for (JEReplicaDB replicaDB : removedMap.values())
        {
          try
          {
            replicaDB.clear();
          }
          catch (ChangelogException e)
          {
            // several replica DBs may fail: only the first failure is kept
            if (firstException == null)
            {
              firstException = e;
            }
            else if (debugEnabled())
            {
              TRACER.debugCaught(DebugLogLevel.ERROR, e);
            }
          }
          replicaDB.shutdown();
        }
      }
    }
  }

  // 2- clear the ChangeNumber index DB
  synchronized (cnIndexDBLock)
  {
    if (cnIndexDB != null)
    {
      try
      {
        cnIndexDB.removeDomain(baseDN);
      }
      catch (ChangelogException e)
      {
        if (firstException == null)
        {
          firstException = e;
        }
        else if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }

  // 3- clear the changelogstate DB
  try
  {
    dbEnv.clearGenerationId(baseDN);
  }
  catch (ChangelogException e)
  {
    if (firstException == null)
    {
      firstException = e;
    }
    else if (debugEnabled())
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }
  }

  if (firstException != null)
  {
    throw firstException;
  }
}
/**
 * {@inheritDoc}
 * <p>
 * A strictly positive delay starts a purger thread unless one is already
 * registered; any other value asks the registered purger, if any, to stop.
 */
@Override
public void setPurgeDelay(long purgeDelayInMillis)
{
  this.purgeDelayInMillis = purgeDelayInMillis;

  if (purgeDelayInMillis <= 0)
  {
    // purging is disabled: stop the purger if there is one
    final ChangelogDBPurger running = cnPurger.getAndSet(null);
    if (running != null)
    {
      running.initiateShutdown();
    }
    return;
  }

  final ChangelogDBPurger candidate = new ChangelogDBPurger();
  if (cnPurger.compareAndSet(null, candidate))
  {
    candidate.start();
  }
  // otherwise a purger was already running, the candidate is dropped
}
/**
 * {@inheritDoc}
 * <p>
 * Enabling starts an indexer from the changelog state currently persisted
 * (unless one is already registered); disabling asks the registered indexer,
 * if any, to stop.
 */
@Override
public void setComputeChangeNumber(boolean computeChangeNumber)
    throws ChangelogException
{
  if (computeChangeNumber)
  {
    startIndexer(dbEnv.readChangelogState());
    return;
  }

  final ChangeNumberIndexer running = cnIndexer.getAndSet(null);
  if (running != null)
  {
    running.initiateShutdown();
  }
}
/**
 * Creates a change number indexer for the supplied state and starts it,
 * unless an indexer is already registered.
 *
 * @param changelogState
 *          the changelog state handed over to the new indexer
 */
private void startIndexer(final ChangelogState changelogState)
{
  final ChangeNumberIndexer candidate =
      new ChangeNumberIndexer(this, changelogState);
  final boolean registered = cnIndexer.compareAndSet(null, candidate);
  if (registered)
  {
    candidate.start();
  }
  // otherwise an indexer was already registered, the candidate is dropped
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation ignores {@code baseDN}: the purge timestamp recorded by
 * the purger thread is shared by all the domains.
 */
@Override
public long getDomainLatestTrimDate(final DN baseDN)
{
  return this.latestPurgeDate;
}
/**
 * {@inheritDoc}
 * <p>
 * The change number index DB is created on first use. When its creation
 * fails, the error is logged and {@code null} is returned.
 */
@Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
  synchronized (cnIndexDBLock)
  {
    if (cnIndexDB != null)
    {
      return cnIndexDB;
    }

    try
    {
      cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
    }
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
    }
    // still null here when the creation above failed
    return cnIndexDB;
  }
}
/**
 * {@inheritDoc}
 * <p>
 * This object is itself the replication domain DB.
 */
@Override
public ReplicationDomainDB getReplicationDomainDB()
{
  final ReplicationDomainDB self = this;
  return self;
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to the {@link ServerState} based variant with a state derived
 * from {@code startAfterCSN} for every serverId known in the domain.
 */
@Override
public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN)
    throws ChangelogException
{
  // Builds a new serverState for all the serverIds in the replication domain
  // to ensure we get cursors starting after the provided CSN.
  final ServerState startAfterServerState =
      buildServerState(baseDN, startAfterCSN);
  return getCursorFrom(baseDN, startAfterServerState);
}
/** {@inheritDoc} */
@Override
public DBCursor getCursorFrom(DN baseDN,
ServerState startAfterServerState) throws ChangelogException
{
final Set serverIds = getDomainMap(baseDN).keySet();
final Map, Void> cursors =
new HashMap, Void>(serverIds.size());
for (int serverId : serverIds)
{
// get the last already sent CSN from that server to get a cursor
final CSN lastCSN = startAfterServerState != null ?
startAfterServerState.getCSN(serverId) : null;
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
}
return new CompositeDBCursor(cursors);
}
/**
 * {@inheritDoc}
 * <p>
 * {@code next()} is called once on the cursor before it is returned. The
 * shared empty cursor is returned when no replica DB is registered for the
 * serverId in the domain.
 */
@Override
public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
    CSN startAfterCSN) throws ChangelogException
{
  final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
  if (replicaDB == null)
  {
    return EMPTY_CURSOR;
  }

  final DBCursor<UpdateMsg> cursor =
      replicaDB.generateCursorFrom(startAfterCSN);
  // the result is deliberately ignored, the call only advances the cursor
  cursor.next();
  return cursor;
}
/**
 * Builds a server state holding one CSN per serverId known in the domain,
 * all derived from the supplied CSN.
 *
 * @param baseDN
 *          the replication domain
 * @param startAfterCSN
 *          the CSN to derive the state from, may be {@code null}
 * @return a new server state, empty when {@code startAfterCSN} is
 *         {@code null} or when the domain has no known serverId
 */
private ServerState buildServerState(DN baseDN, CSN startAfterCSN)
{
  final ServerState state = new ServerState();
  if (startAfterCSN != null)
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    for (int serverId : serverIds)
    {
      // The provided CSN is the most accurate for its own serverId. For any
      // other serverId only the time is kept: the seqNum is irrelevant for
      // a different serverId.
      final CSN csn = serverId == startAfterCSN.getServerId()
          ? startAfterCSN
          : new CSN(startAfterCSN.getTime(), 0, serverId);
      state.update(csn);
    }
  }
  return state;
}
/**
 * {@inheritDoc}
 * <p>
 * The message is added to the replica DB of the serverId carried by its CSN
 * (created on the fly if needed), then forwarded to the change number
 * indexer when one is running.
 *
 * @return {@code true} if the replica DB had to be created by this call
 */
@Override
public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
    throws ChangelogException
{
  final int serverId = updateMsg.getCSN().getServerId();
  final Pair<JEReplicaDB, Boolean> pair =
      getOrCreateReplicaDB(baseDN, serverId, replicationServer);
  final JEReplicaDB replicaDB = pair.getFirst();
  final boolean wasCreated = pair.getSecond();

  replicaDB.add(updateMsg);

  final ChangeNumberIndexer indexer = cnIndexer.get();
  if (indexer != null)
  {
    indexer.publishUpdateMsg(baseDN, updateMsg);
  }
  return wasCreated;
}
/**
 * {@inheritDoc}
 * <p>
 * The heartbeat is forwarded to the change number indexer; it is dropped when
 * no indexer is running.
 */
@Override
public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
{
  final ChangeNumberIndexer indexer = cnIndexer.get();
  if (indexer == null)
  {
    return;
  }
  indexer.publishHeartbeat(baseDN, heartbeatCSN);
}
/**
 * {@inheritDoc}
 * <p>
 * The notification is forwarded to the change number indexer; it is dropped
 * when no indexer is running.
 */
@Override
public void replicaOffline(DN baseDN, CSN offlineCSN)
{
  // TODO save this state in the changelogStateDB?
  final ChangeNumberIndexer indexer = cnIndexer.get();
  if (indexer == null)
  {
    return;
  }
  indexer.replicaOffline(baseDN, offlineCSN);
}
/**
* The thread purging the changelogDB on a regular interval. Records are
* purged from the changelogDB is they are older than a delay specified in
* seconds. The purge process works in two steps:
*
* - first purge the changeNumberIndexDB and retrieve information to drive
* replicaDBs purging
* - proceed to purge each replicaDBs based on the information collected
* when purging the changeNumberIndexDB
*
*/
private final class ChangelogDBPurger extends DirectoryThread
{
protected ChangelogDBPurger()
{
super("changelog DB purger");
}
/** {@inheritDoc} */
@Override
public void run()
{
// initialize CNIndexDB
getChangeNumberIndexDB();
while (!isShutdownInitiated())
{
try
{
final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
if (localCNIndexDB == null)
{ // shutdown has been called
return;
}
final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
final MultiDomainServerState purgeUpToCookie =
localCNIndexDB.purgeUpTo(purgeTimestamp);
if (purgeUpToCookie == null)
{ // this can happen when the change number index DB is empty
continue;
}
/*
* Drive purge of the replica DBs by the oldest non purged cookie in
* the change number index DB.
*/
for (Entry> entry1
: domainToReplicaDBs.entrySet())
{
final DN baseDN = entry1.getKey();
final Map domainMap = entry1.getValue();
for (Entry entry2 : domainMap.entrySet())
{
final Integer serverId = entry2.getKey();
final JEReplicaDB replicaDB = entry2.getValue();
replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
}
}
latestPurgeDate = purgeTimestamp;
// purge delay is specified in seconds so it should not be a problem
// to sleep for 500 millis
sleep(500);
}
catch (Exception e)
{
logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
.get(stackTraceToSingleLineString(e)));
if (replicationServer != null)
{
replicationServer.shutdown();
}
}
}
}
}
}