/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.file; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import 
org.opends.server.util.StaticUtils; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.messages.ReplicationMessages.*; /** * Represents the replication environment, which allows to manage the lifecycle * of the replication changelog. *
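* A changelog has a root directory, under which the following directories and files are
* created:
* <ul>
* <li>A "domains.state" file, which contains the mapping between each domain id and its domain DN.</li>
* <li>A "changenumberindex" directory, which contains the log files of the change number index.</li>
* <li>One "[domainId].domain" directory per domain.</li>
* </ul>
* <p>
* Each domain directory contains the following directories and files:
* <ul>
* <li>A generation id file, named "generation[id].id".</li>
* <li>One "[serverId].server" directory per server id, which contains the log files for that server id.</li>
* </ul>
* <p>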
* Layout example with two domains "o=test1" and "o=test2", each having server * ids 22 and 33 : * *
* +---changelog * | \---domains.state [contains mapping: 1 => "o=test1", 2 => "o=test2"] * | \---changenumberindex * | \--- head.log [contains last records written] * | \--- 1_50.log [contains records with keys in interval [1, 50]] * | \---1.domain * | \---generation1.id * | \---22.server * | \---head.log * | \---33.server * | \---head.log * | \---2.domain * | \---generation1.id * | \---22.server * | \---head.log * | \---33.server * | \---head.log **/ class ReplicationEnvironment { private static final DebugTracer TRACER = getTracer(); // TODO : to replace by configurable value private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024; private static final int NO_GENERATION_ID = -1; private static final String CN_INDEX_DB_DIRNAME = "changenumberindex"; private static final String DOMAINS_STATE_FILENAME = "domains.state"; private static final String DOMAIN_STATE_SEPARATOR = ":"; private static final String DOMAIN_SUFFIX = ".domain"; private static final String SERVER_ID_SUFFIX = ".server"; private static final String GENERATION_ID_FILE_PREFIX = "generation"; private static final String GENERATION_ID_FILE_SUFFIX = ".id"; private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isDirectory() && file.getName().endsWith(DOMAIN_SUFFIX); } }; private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isDirectory() && file.getName().endsWith(SERVER_ID_SUFFIX); } }; private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return file.isFile() && file.getName().startsWith(GENERATION_ID_FILE_PREFIX) && file.getName().endsWith(GENERATION_ID_FILE_SUFFIX); } }; /** Root path where the replication log is stored. */ private final String replicationRootPath; /** The list of logs that are in use. */ private final List
* TODO: ECL - how should compatibility of this DB be managed when domains are
* added or removed?
*
* @return the log.
* @throws ChangelogException
* when a problem occurs.
*/
Log
* The log DBs are not closed by this method. It assumes they are already
* closed.
*/
void shutdown()
{
  // Only the first caller manages to flip the flag; any later call is a no-op.
  final boolean isFirstShutdownRequest = isShuttingDown.compareAndSet(false, true);
  if (!isFirstShutdownRequest)
  {
    return;
  }
  // Forget the logs that were in use; this method does not close them.
  logs.clear();
}
/**
 * Clears the generation id associated to the provided domain DN from the
 * state Db.
 * <p>
 * If the generation id can't be found, it is not considered as an error, the
 * method will just return. This includes the case where the provided domain
 * DN has no known domain id.
 *
 * @param domainDN
 *          The domain DN for which the generationID must be cleared.
 * @throws ChangelogException
 *           If the generation id file exists but can't be deleted.
 */
void clearGenerationId(final DN domainDN) throws ChangelogException
{
  synchronized (domainLock)
  {
    final String domainId = domains.get(domainDN);
    if (domainId == null)
    {
      // Unknown domain: no domain id, hence no generation id file to look for.
      return;
    }
    final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
    // A null file means no generation id is stored: nothing to clear.
    if (idFile != null && !idFile.delete())
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
    }
  }
}
/**
 * Reset the generationId to the default value used when there is no
 * generation id.
 * <p>
 * Any existing generation id file of the domain is removed, then a generation
 * id file for the default value is ensured to exist. If the provided baseDN
 * has no known domain id, the method just returns without touching the file
 * system.
 *
 * @param baseDN
 *          The baseDN for which the generationID must be reset.
 * @throws ChangelogException
 *           If a problem occurs during reset.
 */
void resetGenerationId(final DN baseDN) throws ChangelogException
{
  synchronized (domainLock)
  {
    final String domainId = domains.get(baseDN);
    if (domainId == null)
    {
      // Unknown domain: without a domain id no valid generation id path can
      // be built, so do not create anything.
      return;
    }
    clearGenerationId(baseDN);
    final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
    ensureGenerationIdFileExists(generationIdPath);
  }
}
/**
 * Loads the domains state file, if it exists, and records in the domains
 * mapping the domainId associated to each domainDN it contains.
 * <p>
 * Each line is split on the first occurrence of the separator: the text
 * before it is the domainId, the text after it is the domainDN.
 *
 * @throws ChangelogException
 *           If a domainDN can't be decoded, or if any other problem occurs
 *           while reading the file.
 */
private void readDomainsStateFile() throws ChangelogException
{
  final File stateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
  if (!stateFile.exists())
  {
    // No state file: there is no mapping to load.
    return;
  }

  // Declared outside the try block so that the line being processed can be reported on decoding failure.
  String currentLine = null;
  BufferedReader stateReader = null;
  try
  {
    final FileInputStream stateStream = new FileInputStream(stateFile);
    stateReader = new BufferedReader(new InputStreamReader(stateStream, "UTF-8"));
    for (currentLine = stateReader.readLine(); currentLine != null; currentLine = stateReader.readLine())
    {
      final int splitAt = currentLine.indexOf(DOMAIN_STATE_SEPARATOR);
      final String id = currentLine.substring(0, splitAt);
      final String rawDN = currentLine.substring(splitAt + 1);
      final DN decodedDN = DN.decode(rawDN);
      domains.put(decodedDN, id);
    }
  }
  catch (DirectoryException e)
  {
    throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get(
        stateFile.getPath(), currentLine), e);
  }
  catch (Exception e)
  {
    throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get(
        stateFile.getPath()), e);
  }
  finally
  {
    StaticUtils.close(stateReader);
  }
}
/**
* Checks that domain directories in file system are consistent with
* information from domains mapping.
*/
private void checkDomainDirectories(final File changelogPath) throws ChangelogException
{
final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
if (dnDirectories == null)
{
throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
}
Set