From 2d735189c834108a2e5f7a795610372eb6d00aed Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 03 Jun 2014 08:45:08 +0000
Subject: [PATCH] OPENDJ-1467 : File Based Changelog must support replicas temporarily leaving the topology
---
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 159 insertions(+), 18 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index ee69dd7..1e658c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,15 +79,21 @@
* <ul>
* <li>A "generation_[id].id" file, where [id] is the generation id</li>
* <li>One directory per server id, named after "[id].server" where [id] is the
- * id of the server. Each directory contains the log files for the given server
- * id.</li>
+ * id of the server.</li>
* </ul>
- * All log files end with the ".log" suffix. Log files always include the "head.log"
- * file and optionally zero to many read-only log files named after the lowest key
- * and highest key present in the log file.
+ * Each server id directory contains the following files :
+ * <ul>
+ * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
+ * <li>Zero to many read-only log files named after the lowest key
+ * and highest key present in the log file (they all end with the ".log" suffix.</li>
+ * <li>Optionally, a "offline.state" file that indicates that this particular server id
+ * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
+ * </ul>
+ * See {@code Log} class for details on the log files.
+ *
* <p>
* Layout example with two domains "o=test1" and "o=test2", each having server
- * ids 22 and 33 :
+ * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
*
* <pre>
* +---changelog
@@ -96,15 +104,16 @@
* | \---1.domain
* | \---generation1.id
* | \---22.server
- * | \---head.log
+ * | \---head.log [contains last records written]
* | \---33.server
- * | \---head.log
+ * | \---head.log [contains last records written]
+ * \---offline.state
* | \---2.domain
* | \---generation1.id
* | \---22.server
- * | \---head.log
+ * | \---head.log [contains last records written]
* | \---33.server
- * | \---head.log
+ * | \---head.log [contains last records written]
* </pre>
*/
class ReplicationEnvironment
@@ -120,6 +129,8 @@
private static final String DOMAINS_STATE_FILENAME = "domains.state";
+ static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+
private static final String DOMAIN_STATE_SEPARATOR = ":";
private static final String DOMAIN_SUFFIX = ".domain";
@@ -130,6 +141,8 @@
private static final String GENERATION_ID_FILE_SUFFIX = ".id";
+ private static final String UTF8_ENCODING = "UTF-8";
+
private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
{
@Override
@@ -169,7 +182,6 @@
private final Map<DN, String> domains = new HashMap<DN, String>();
/** Exclusive lock to guard the domains mapping and change of state to a domain.*/
- // TODO : review the usefulness of this lock
private final Object domainLock = new Object();
/** The underlying replication server. */
@@ -358,6 +370,77 @@
}
}
+ /**
+ * Notify that the replica corresponding to provided domain and provided CSN
+ * is offline.
+ *
+ * @param domainDN
+ * the domain of the offline replica
+ * @param offlineCSN
+ * the offline replica serverId and offline timestamp
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
+ {
+ synchronized (domainLock)
+ {
+ final String domainId = domains.get(domainDN);
+ final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
+ if (!serverIdPath.exists())
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
+ domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
+ }
+ final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+ Writer writer = null;
+ try
+ {
+ // Overwrite file, only the last sent offline CSN is kept
+ writer = newFileWriter(offlineFile);
+ writer.write(offlineCSN.toString());
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ }
+ }
+
+ /**
+ * Notify that the replica corresponding to provided domain and server id
+ * is online.
+ *
+ * @param domainDN
+ * the domain of the replica
+ * @param serverId
+ * the replica serverId
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
+ {
+ synchronized (domainLock)
+ {
+ final String domainId = domains.get(domainDN);
+ final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
+ if (offlineFile.exists())
+ {
+ final boolean isDeleted = offlineFile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
+ offlineFile.getPath(), domainDN.toString(), serverId));
+ }
+ }
+ }
+ }
+
/** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
private void readDomainsStateFile() throws ChangelogException
{
@@ -368,7 +451,7 @@
String line = null;
try
{
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
+ reader = newFileReader(domainsStateFile);
while ((line = reader.readLine()) != null)
{
final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
@@ -425,7 +508,7 @@
* Update the changelog state with the state corresponding to the provided
* domain DN.
*/
- private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
+ private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
throws ChangelogException
{
final File domainDirectory = getDomainPath(domainEntry.getValue());
@@ -436,7 +519,7 @@
replicationRootPath, domainDirectory.getPath()));
}
final DN domainDN = domainEntry.getKey();
- result.setDomainGenerationId(domainDN, toGenerationId(generationId));
+ state.setDomainGenerationId(domainDN, toGenerationId(generationId));
final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
if (serverIds == null)
@@ -446,7 +529,43 @@
}
for (final File serverId : serverIds)
{
- result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
+ readStateForServerId(domainDN, serverId, state);
+ }
+ }
+
+ private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
+ {
+ state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
+
+ final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+ if (offlineFile.exists())
+ {
+ final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
+ state.addOfflineReplica(domainDN, offlineCSN);
+ }
+ }
+
+ private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
+ {
+ BufferedReader reader = null;
+ try
+ {
+ reader = newFileReader(offlineFile);
+ String line = reader.readLine();
+ if (line == null || reader.readLine() != null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineFile.getPath()));
+ }
+ return new CSN(line);
+ }
+ catch(IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineFile.getPath()), e);
+ }
+ finally {
+ StaticUtils.close(reader);
}
}
@@ -458,13 +577,13 @@
Writer writer = null;
try
{
- writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
+ writer = newFileWriter(domainsStateFile);
for (final Entry<DN, String> entry : domains.entrySet())
{
writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
}
}
- catch (Exception e)
+ catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
domainDN.toString(), domainsStateFile.getPath()), e);
@@ -561,7 +680,17 @@
return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
}
- private File getServerIdPath(final String domainId, final int serverId)
+ /**
+ * Return the path for the provided domain id and server id.
+ * Package private to be usable in tests.
+ *
+ * @param domainId
+ * The id corresponding to a domain DN
+ * @param serverId
+ * The server id to retrieve
+ * @return the path
+ */
+ File getServerIdPath(final String domainId, final int serverId)
{
return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
}
@@ -673,4 +802,16 @@
throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
}
}
+
+ /** Returns a buffered writer on the provided file. */
+ private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
+ {
+ return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
+ }
+
+ /** Returns a buffered reader on the provided file. */
+ private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
+ {
+ return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
+ }
}
--
Gitblit v1.10.0