/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2014-2016 ForgeRock AS */ package org.opends.server.replication.server.changelog.file; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; import java.io.Writer; import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import net.jcip.annotations.GuardedBy; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.util.time.TimeService; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ChangelogStateProvider; import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.util.StaticUtils; import static org.opends.messages.ReplicationMessages.*; /** * Represents the replication environment, which allows to manage the lifecycle * of the replication changelog. *
* A changelog has a root directory, under which the following directories and files are * created : *
* Each domain directory contains the following directories and files : *
* Layout example with two domains "o=test1" and "o=test2", each having server * ids 22 and 33, with server id 33 for domain "o=test1" being offline : * *
* +---changelog * | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"] * | \---changenumberindex * | \--- head.log [contains last records written] * | \--- 1_50.log [contains records with keys in interval [1, 50]] * | \--- rotationtime198745512.last * | \---1.domain * | \---generation1.id * | \---22.server * | \---head.log [contains last records written] * | \---33.server * | \---head.log [contains last records written] * \---offline.state * | \---2.domain * | \---generation1.id * | \---22.server * | \---head.log [contains last records written] * | \---33.server * | \---head.log [contains last records written] **/ class ReplicationEnvironment implements ChangelogStateProvider { private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private static final long CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 1024 * 1024; private static final long REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 10 * CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES; private static final int NO_GENERATION_ID = -1; /** Extension for the temporary file used when modifying an environment file. */ private static final String FILE_EXTENSION_TEMP = ".tmp"; private static final String CN_INDEX_DB_DIRNAME = "changenumberindex"; private static final String DOMAINS_STATE_FILENAME = "domains.state"; static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state"; static final String LAST_ROTATION_TIME_FILE_PREFIX = "rotationtime"; static final String LAST_ROTATION_TIME_FILE_SUFFIX = ".ms"; private static final String DOMAIN_STATE_SEPARATOR = ":"; private static final String DOMAIN_SUFFIX = ".dom"; private static final String SERVER_ID_SUFFIX = ".server"; private static final String GENERATION_ID_FILE_PREFIX = "generation"; private static final String GENERATION_ID_FILE_SUFFIX = ".id"; private static final String UTF8_ENCODING = "UTF-8"; private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isDirectory() && file.getName().endsWith(DOMAIN_SUFFIX); } }; private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isDirectory() && file.getName().endsWith(SERVER_ID_SUFFIX); } }; private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isFile() && file.getName().startsWith(GENERATION_ID_FILE_PREFIX) && file.getName().endsWith(GENERATION_ID_FILE_SUFFIX); } }; private static final FileFilter LAST_ROTATION_TIME_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isFile() && file.getName().startsWith(LAST_ROTATION_TIME_FILE_PREFIX) && file.getName().endsWith(LAST_ROTATION_TIME_FILE_SUFFIX); } }; /** Root path where the replication log is stored. */ private final String replicationRootPath; /** * The current changelogState. This is in-memory version of what is inside the * on-disk changelogStateDB. It improves performances in case the * changelogState is read often. */ @GuardedBy("domainsLock") private final ChangelogState changelogState; /** The list of logs that are in use for Replica DBs. */ private final List
* It is disabled if the interval is equals to zero.
* The interval can be modified at any time.
*/
private long cnIndexDBRotationInterval;
/**
* For CN Index DB, the last time a log file was rotated.
* It is persisted to file each time it changes and read at server start. */
private long cnIndexDBLastRotationTime;
/**
* Creates the replication environment.
*
* @param rootPath
* Root path where replication log is stored.
* @param replicationServer
* The underlying replication server.
* @param timeService
* Time service to use for timing.
* @throws ChangelogException
* If an error occurs during initialization.
*/
ReplicationEnvironment(final String rootPath,
final ReplicationServer replicationServer, final TimeService timeService) throws ChangelogException
{
this.replicationRootPath = rootPath;
this.replicationServer = replicationServer;
this.timeService = timeService;
this.changelogState = readOnDiskChangelogState();
this.cnIndexDBLastRotationTime = readOnDiskLastRotationTime();
}
/**
* Sets the rotation time interval of a log file for the CN Index DB.
*
* @param timeInterval
* time interval for rotation of a log file.
*/
void setCNIndexDBRotationInterval(long timeInterval)
{
cnIndexDBRotationInterval = timeInterval;
for (Log
* TODO: ECL how to manage compatibility of this db
* with new domains added or removed ?
*
* @return the log.
* @throws ChangelogException
* when a problem occurs.
*/
Log
* The log DBs are not closed by this method. It assumes they are already
* closed.
*/
void shutdown()
{
if (isShuttingDown.compareAndSet(false, true))
{
logsReplicaDB.clear();
logsCNIndexDB.clear();
}
}
/**
* Clears the generated id associated to the provided domain DN from the state
* Db.
*
* If generation id can't be found, it is not considered as an error, the
* method will just return.
*
* @param domainDN
* The domain DN for which the generationID must be cleared.
* @throws ChangelogException
* If a problem occurs during clearing.
*/
void clearGenerationId(final DN domainDN) throws ChangelogException
{
synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
return; // unknown domain => no-op
}
final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
if (idFile != null)
{
final boolean isDeleted = idFile.delete();
if (!isDeleted)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
}
}
changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
}
}
/**
* Reset the generationId to the default value used when there is no
* generation id.
*
* @param baseDN
* The baseDN for which the generationID must be reset.
* @throws ChangelogException
* If a problem occurs during reset.
*/
void resetGenerationId(final DN baseDN) throws ChangelogException
{
synchronized (domainsLock)
{
clearGenerationId(baseDN);
final String domainId = domains.get(baseDN);
if (domainId == null)
{
return; // unknown domain => no-op
}
final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
ensureGenerationIdFileExists(generationIdPath);
changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
}
}
/**
* Notify that log file has been rotated for provided log.
*
* The last rotation time is persisted to a file and read at startup time.
*
* @param log
* the log that has a file rotated.
* @throws ChangelogException
* If a problem occurs
*/
void notifyLogFileRotation(Log, ?> log) throws ChangelogException
{
// only CN Index DB log rotation time is persisted
if (logsCNIndexDB.contains(log))
{
updateCNIndexDBLastRotationTime(timeService.now());
}
}
/**
* Notify that the replica corresponding to provided domain and provided CSN
* is offline.
*
* @param domainDN
* the domain of the offline replica
* @param offlineCSN
* the offline replica serverId and offline timestamp
* @throws ChangelogException
* if a problem occurs
*/
void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
{
synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
return; // unknown domain => no-op
}
final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
if (!serverIdPath.exists())
{
return; // no serverId anymore => no-op
}
final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
try (Writer writer = newTempFileWriter(offlineFile))
{
// Only the last sent offline CSN is kept
writer.write(offlineCSN.toString());
StaticUtils.close(writer);
changelogState.addOfflineReplica(domainDN, offlineCSN);
commitFile(offlineFile);
}
catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
}
}
}
/**
* Notify that the replica corresponding to provided domain and server id
* is online.
*
* @param domainDN
* the domain of the replica
* @param serverId
* the replica serverId
* @throws ChangelogException
* if a problem occurs
*/
void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
{
synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
return; // unknown domain => no-op
}
final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
if (offlineFile.exists())
{
final boolean isDeleted = offlineFile.delete();
if (!isDeleted)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
offlineFile.getPath(), domainDN.toString(), serverId));
}
}
changelogState.removeOfflineReplica(domainDN, serverId);
}
}
/** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
private void readDomainsStateFile() throws ChangelogException
{
final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
if (domainsStateFile.exists())
{
BufferedReader reader = null;
String line = null;
try
{
reader = newFileReader(domainsStateFile);
while ((line = reader.readLine()) != null)
{
final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
final String domainId = line.substring(0, separatorPos);
final DN domainDN = DN.valueOf(line.substring(separatorPos+1));
domains.put(domainDN, domainId);
}
}
catch(DirectoryException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get(
domainsStateFile.getPath(), line), e);
}
catch(Exception e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get(
domainsStateFile.getPath()), e);
}
finally {
StaticUtils.close(reader);
}
}
}
/**
* Checks that domain directories in file system are consistent with
* information from domains mapping.
*/
private void checkDomainDirectories(final File changelogPath) throws ChangelogException
{
final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
if (dnDirectories != null)
{
final Set
* Once writes are finished, the {@code commitFile()} method should be called to finish the update
* of the provided file.
*/
private BufferedWriter newTempFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
{
File tempFile = getTempFileFor(file);
return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tempFile), UTF8_ENCODING));
}
/**
* "Commit" the provided file by moving the ".tmp" file to its final location.
*
* In order to prevent partially written environment files, update of files is always
* performed by writing first a ".tmp" version and then switching the ".tmp" version to
* the final version once update is finished.
*
* This method effectively moves the ".tmp" version to the final version.
*
* @param file
* the final file location.
*/
private void commitFile(final File file) throws IOException
{
File tempFile = getTempFileFor(file);
try
{
Files.move(tempFile.toPath(), file.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
catch (FileAlreadyExistsException | AtomicMoveNotSupportedException e)
{
// The atomic move could fail depending on OS (mostly on old Windows versions)
// See OPENDJ-1811 for details
// Try to proceed with a non-atomic move
if (file.exists())
{
file.delete();
}
Files.move(tempFile.toPath(), file.toPath());
}
}
/** Returns a temporary file from provided file, by adding the ".tmp" suffix. */
private File getTempFileFor(File file) {
return new File(file.getParentFile(), file.getName() + FILE_EXTENSION_TEMP);
}
/** Returns a buffered reader on the provided file. */
private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
{
return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
}
}