From 2d735189c834108a2e5f7a795610372eb6d00aed Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 03 Jun 2014 08:45:08 +0000
Subject: [PATCH] OPENDJ-1467 : File Based Changelog must support replicas temporarily leaving the topology
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 17 +
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 49 ++++
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 177 +++++++++++++++-
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 15 +
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 27 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java | 256 +++++++++++++++++++++----
opends/src/messages/messages/replication.properties | 16 +
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 2
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 17 +
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 6
10 files changed, 494 insertions(+), 88 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index e030d37..1ec6984 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -572,8 +572,8 @@
to file system for log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
on log file '%s'
-SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
- log file
+SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \
+ directory '%s' for log file
SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
from domain state file '%s', from line '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
@@ -604,4 +604,14 @@
SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
head log file from '%s' to '%s'
INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
- size of head log file is %d bytes
\ No newline at end of file
+ size of head log file is %d bytes
+SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
+ offline for domain %s and server id %d because the path '%s' does not exist
+SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
+ replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
+SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
+ state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN
+SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
+ replica offline state file '%s' for domain %s
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
+ offline state file '%s' for domain %s and server id %d
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3b9eab7..aa70fb2 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2507,7 +2507,7 @@
{
if (msg.isReplicaOfflineMsg())
{
- domainDB.replicaOffline(baseDN, msg.getCSN());
+ domainDB.notifyReplicaOffline(baseDN, msg.getCSN());
}
else
{
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index f2ba666..0c8df53 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -166,8 +166,10 @@
* @param heartbeatCSN
* The CSN heartbeat sent by this replica (contains the serverId and
* timestamp of the heartbeat)
+ * @throws ChangelogException
+ * If a database problem happened
*/
- void replicaHeartbeat(DN baseDN, CSN heartbeatCSN);
+ void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException;
/**
* Let the DB know this replica is going down.
@@ -186,5 +188,5 @@
* @throws ChangelogException
* If a database problem happened
*/
- void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
+ void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 36e8a7a..db0c9f3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -672,6 +672,7 @@
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
+ notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
return wasCreated;
@@ -679,25 +680,35 @@
/** {@inheritDoc} */
@Override
- public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN)
+ public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
{
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
+ notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
indexer.publishHeartbeat(baseDN, heartbeatCSN);
}
}
+ private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+ throws ChangelogException
+ {
+ if (indexer.isReplicaOffline(baseDN, serverId))
+ {
+ replicationEnv.notifyReplicaOnline(baseDN, serverId);
+ }
+ }
+
/** {@inheritDoc} */
@Override
- public void replicaOffline(final DN baseDN, final CSN offlineCSN)
+ public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
{
+ replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
indexer.replicaOffline(baseDN, offlineCSN);
}
- // TODO save this state in the changelogStateDB?
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 200c084..350b70e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@
/**
* The last key appended to the log. In order to keep the ordering of the keys
* in the log, any attempt to append a record with a key lower or equal to
- * this key will silently fail.
+ * this is rejected (no error but an event is logged).
*/
private K lastAppendedKey;
@@ -342,6 +342,10 @@
/**
* Add the provided record at the end of this log.
* <p>
+ * The record must have a key strictly higher than the key
+ * of the last record added. If it is not the case, the record is not
+ * appended and the method returns immediately.
+ * <p>
* In order to ensure that record is written out of buffers and persisted
* to file system, it is necessary to explicitely call the
* {@code syncToFileSystem()} method.
@@ -349,7 +353,7 @@
* @param record
* The record to add.
* @throws ChangelogException
- * If the record can't be added to the log.
+ * If an error occurs while adding the record to the log.
*/
public void append(final Record<K, V> record) throws ChangelogException
{
@@ -766,7 +770,7 @@
catch (IOException e)
{
throw new ChangelogException(
- ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
+ ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
}
}
@@ -842,8 +846,8 @@
private void openHeadLogFile() throws ChangelogException
{
final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser);
- Record<K,V> newestRecord = head.getNewestRecord();
- lastAppendedKey = newestRecord == null ? null : newestRecord.getKey();
+ final Record<K,V> newestRecord = head.getNewestRecord();
+ lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
logFiles.put(recordParser.getMaxKey(), head);
}
@@ -897,7 +901,7 @@
/**
* Represents a cursor than can be repositioned on a given key.
*/
- static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+ static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
{
/**
* Position the cursor to the record corresponding to the provided key or to
@@ -1011,9 +1015,9 @@
if (key != null)
{
boolean isFound = currentCursor.positionTo(key, findNearest);
- if (isFound && getRecord() == null)
+ if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile))
{
- // The key to position to may be in the next file, force the switch
+ // The key to position is probably in the next file, force the switch
isFound = next();
}
return isFound;
@@ -1047,6 +1051,13 @@
currentLogFile = logFile;
currentCursor = currentLogFile.getCursor();
}
+
+ /** {@inheritDoc} */
+ public String toString()
+ {
+ return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+ log.logPath, currentLogFile.getFile().getName(), currentCursor);
+ }
}
/** An empty cursor, that always return null records and false to {@code next()} method. */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index ee69dd7..1e658c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -30,10 +30,12 @@
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
@@ -77,15 +79,21 @@
* <ul>
* <li>A "generation_[id].id" file, where [id] is the generation id</li>
* <li>One directory per server id, named after "[id].server" where [id] is the
- * id of the server. Each directory contains the log files for the given server
- * id.</li>
+ * id of the server.</li>
* </ul>
- * All log files end with the ".log" suffix. Log files always include the "head.log"
- * file and optionally zero to many read-only log files named after the lowest key
- * and highest key present in the log file.
+ * Each server id directory contains the following files :
+ * <ul>
+ * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
+ * <li>Zero to many read-only log files named after the lowest key
+ * and highest key present in the log file (they all end with the ".log" suffix.</li>
+ * <li>Optionally, a "offline.state" file that indicates that this particular server id
+ * of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
+ * </ul>
+ * See {@code Log} class for details on the log files.
+ *
* <p>
* Layout example with two domains "o=test1" and "o=test2", each having server
- * ids 22 and 33 :
+ * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
*
* <pre>
* +---changelog
@@ -96,15 +104,16 @@
* | \---1.domain
* | \---generation1.id
* | \---22.server
- * | \---head.log
+ * | \---head.log [contains last records written]
* | \---33.server
- * | \---head.log
+ * | \---head.log [contains last records written]
+ * \---offline.state
* | \---2.domain
* | \---generation1.id
* | \---22.server
- * | \---head.log
+ * | \---head.log [contains last records written]
* | \---33.server
- * | \---head.log
+ * | \---head.log [contains last records written]
* </pre>
*/
class ReplicationEnvironment
@@ -120,6 +129,8 @@
private static final String DOMAINS_STATE_FILENAME = "domains.state";
+ static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+
private static final String DOMAIN_STATE_SEPARATOR = ":";
private static final String DOMAIN_SUFFIX = ".domain";
@@ -130,6 +141,8 @@
private static final String GENERATION_ID_FILE_SUFFIX = ".id";
+ private static final String UTF8_ENCODING = "UTF-8";
+
private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
{
@Override
@@ -169,7 +182,6 @@
private final Map<DN, String> domains = new HashMap<DN, String>();
/** Exclusive lock to guard the domains mapping and change of state to a domain.*/
- // TODO : review the usefulness of this lock
private final Object domainLock = new Object();
/** The underlying replication server. */
@@ -358,6 +370,77 @@
}
}
+ /**
+ * Notify that the replica corresponding to provided domain and provided CSN
+ * is offline.
+ *
+ * @param domainDN
+ * the domain of the offline replica
+ * @param offlineCSN
+ * the offline replica serverId and offline timestamp
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
+ {
+ synchronized (domainLock)
+ {
+ final String domainId = domains.get(domainDN);
+ final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
+ if (!serverIdPath.exists())
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get(
+ domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath()));
+ }
+ final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+ Writer writer = null;
+ try
+ {
+ // Overwrite file, only the last sent offline CSN is kept
+ writer = newFileWriter(offlineFile);
+ writer.write(offlineCSN.toString());
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ }
+ }
+
+ /**
+ * Notify that the replica corresponding to provided domain and server id
+ * is online.
+ *
+ * @param domainDN
+ * the domain of the replica
+ * @param serverId
+ * the replica serverId
+ * @throws ChangelogException
+ * if a problem occurs
+ */
+ void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
+ {
+ synchronized (domainLock)
+ {
+ final String domainId = domains.get(domainDN);
+ final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
+ if (offlineFile.exists())
+ {
+ final boolean isDeleted = offlineFile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
+ offlineFile.getPath(), domainDN.toString(), serverId));
+ }
+ }
+ }
+ }
+
/** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
private void readDomainsStateFile() throws ChangelogException
{
@@ -368,7 +451,7 @@
String line = null;
try
{
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(domainsStateFile), "UTF-8"));
+ reader = newFileReader(domainsStateFile);
while ((line = reader.readLine()) != null)
{
final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
@@ -425,7 +508,7 @@
* Update the changelog state with the state corresponding to the provided
* domain DN.
*/
- private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState result)
+ private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
throws ChangelogException
{
final File domainDirectory = getDomainPath(domainEntry.getValue());
@@ -436,7 +519,7 @@
replicationRootPath, domainDirectory.getPath()));
}
final DN domainDN = domainEntry.getKey();
- result.setDomainGenerationId(domainDN, toGenerationId(generationId));
+ state.setDomainGenerationId(domainDN, toGenerationId(generationId));
final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
if (serverIds == null)
@@ -446,7 +529,43 @@
}
for (final File serverId : serverIds)
{
- result.addServerIdToDomain(toServerId(serverId.getName()), domainDN);
+ readStateForServerId(domainDN, serverId, state);
+ }
+ }
+
+ private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
+ {
+ state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
+
+ final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+ if (offlineFile.exists())
+ {
+ final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
+ state.addOfflineReplica(domainDN, offlineCSN);
+ }
+ }
+
+ private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
+ {
+ BufferedReader reader = null;
+ try
+ {
+ reader = newFileReader(offlineFile);
+ String line = reader.readLine();
+ if (line == null || reader.readLine() != null)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineFile.getPath()));
+ }
+ return new CSN(line);
+ }
+ catch(IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
+ domainDN.toString(), offlineFile.getPath()), e);
+ }
+ finally {
+ StaticUtils.close(reader);
}
}
@@ -458,13 +577,13 @@
Writer writer = null;
try
{
- writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(domainsStateFile), "UTF-8"));
+ writer = newFileWriter(domainsStateFile);
for (final Entry<DN, String> entry : domains.entrySet())
{
writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
}
}
- catch (Exception e)
+ catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
domainDN.toString(), domainsStateFile.getPath()), e);
@@ -561,7 +680,17 @@
return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
}
- private File getServerIdPath(final String domainId, final int serverId)
+ /**
+ * Return the path for the provided domain id and server id.
+ * Package private to be usable in tests.
+ *
+ * @param domainId
+ * The id corresponding to a domain DN
+ * @param serverId
+ * The server id to retrieve
+ * @return the path
+ */
+ File getServerIdPath(final String domainId, final int serverId)
{
return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
}
@@ -673,4 +802,16 @@
throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
}
}
+
+ /** Returns a buffered writer on the provided file. */
+ private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
+ {
+ return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
+ }
+
+ /** Returns a buffered reader on the provided file. */
+ private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
+ {
+ return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 628ca99..2419dc4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -187,6 +187,21 @@
}
/**
+ * Indicates if the replica corresponding to provided domain DN and server id
+ * is offline.
+ *
+ * @param domainDN
+ * base DN of the replica
+ * @param serverId
+ * server id of the replica
+ * @return {@code true} if replica is offline, {@code false} otherwise
+ */
+ public boolean isReplicaOffline(DN domainDN, int serverId)
+ {
+ return replicasOffline.getCSN(domainDN, serverId) != null;
+ }
+
+ /**
* Ensures the medium consistency point is updated by UpdateMsg.
*
* @param baseDN
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index a9f69ba..e1dbcd3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -756,6 +756,7 @@
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
+ notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
return wasCreated;
@@ -763,21 +764,31 @@
/** {@inheritDoc} */
@Override
- public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
+ public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN) throws ChangelogException
{
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
+ notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
indexer.publishHeartbeat(baseDN, heartbeatCSN);
}
}
+ private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+ throws ChangelogException
+ {
+ if (indexer.isReplicaOffline(baseDN, serverId))
+ {
+ dbEnv.notifyReplicaOnline(baseDN, serverId);
+ }
+ }
+
/** {@inheritDoc} */
@Override
- public void replicaOffline(DN baseDN, CSN offlineCSN)
+ public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
throws ChangelogException
{
- dbEnv.addOfflineReplica(baseDN, offlineCSN);
+ dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index c259985..df5d0c6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -505,14 +505,32 @@
*/
static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
{
- final byte[] key =
- toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
- + FIELD_SEPARATOR + baseDN.toNormalizedString());
+ final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId());
final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
data.append(offlineCSN.getTime());
return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
}
+ /**
+ * Return the key for a replica offline entry in the changelog state database.
+ *
+ * @param baseDN
+ * the replica's baseDN
+ * @param serverId
+ * the replica's serverId
+ * @return the key used in the database to store offline time of the replica
+ */
+ private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
+ {
+ return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString());
+ }
+
+ /** Returns an entry with the provided key and a null value. */
+ private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
+ {
+ return new SimpleImmutableEntry<byte[], byte[]>(key, null);
+ }
+
private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
throws ChangelogException, RuntimeException
{
@@ -689,7 +707,9 @@
}
/**
- * Add the information about an offline replica to the changelog state DB.
+ * Notify that replica is offline.
+ * <p>
+ * This information is stored in the changelog state DB.
*
* @param baseDN
* the domain of the offline replica
@@ -698,7 +718,7 @@
* @throws ChangelogException
* if a database problem occurred
*/
- public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+ public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
throws ChangelogException
{
// just overwrite any older entry as it is assumed a newly received offline
@@ -707,6 +727,25 @@
"replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
}
+ /**
+ * Notify that replica is online.
+ * <p>
+ * Update the changelog state DB if necessary (ie, replica was known to be
+ * offline).
+ *
+ * @param baseDN
+ * the domain of replica
+ * @param serverId
+ * the serverId of replica
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
+ {
+ deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+ "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+ }
+
private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
String methodInvocation) throws ChangelogException
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index 8f3e173..de8c1f5 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -26,8 +26,8 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
-import java.io.Closeable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,12 +37,14 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -50,6 +52,9 @@
@SuppressWarnings("javadoc")
public class ReplicationEnvironmentTest extends DirectoryServerTestCase
{
+ private static final int SERVER_ID_1 = 1;
+ private static final int SERVER_ID_2 = 2;
+
private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
@@ -74,70 +79,231 @@
}
@Test
- public void testCreateThenReadChangelogStateWithSingleDN() throws Exception
+ public void testReadChangelogStateWithSingleDN() throws Exception
{
- final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
- final DN domainDN = DN.decode(DN1_AS_STRING);
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+ replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
- Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
- Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
- Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
- StaticUtils.close(cnDB, replicaDB, replicaDB2);
+ ChangelogState state = environment.readChangelogState();
- ChangelogState state = environment.readChangelogState();
-
- assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
- assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2);
- assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+ assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
+ assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB, replicaDB2);
+ }
}
@Test
- public void testCreateThenReadChangelogStateWithMultipleDN() throws Exception
+ public void testReadChangelogStateWithMultipleDN() throws Exception
{
- File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
- List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
-
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
- Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
- for (int i = 0; i <= 2 ; i++)
+ try
{
- for (int j = 1; j <= 10; j++)
+ File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ for (int i = 0; i <= 2 ; i++)
{
- replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+ for (int j = 1; j <= 10; j++)
+ {
+ // 3 domains, 10 server id each, generation id is different for each domain
+ replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+ }
}
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
+ for (int i = 0; i <= 2 ; i++)
+ {
+ assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ }
+ assertThat(state.getDomainToGenerationId()).containsOnly(
+ MapEntry.entry(domainDNs.get(0), 1L),
+ MapEntry.entry(domainDNs.get(1), 2L),
+ MapEntry.entry(domainDNs.get(2), 3L));
}
- StaticUtils.close(cnDB);
- StaticUtils.close(replicaDBs.toArray(new Closeable[] {}));
-
- ChangelogState state = environment.readChangelogState();
-
- assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
- for (int i = 0; i <= 2 ; i++)
+ finally
{
- assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ StaticUtils.close(cnDB);
+ StaticUtils.close(replicaDBs);
}
- assertThat(state.getDomainToGenerationId()).containsOnly(
- MapEntry.entry(domainDNs.get(0), 1L),
- MapEntry.entry(domainDNs.get(1), 2L),
- MapEntry.entry(domainDNs.get(2), 3L));
+ }
+
+ @Test
+ public void testReadChangelogStateWithReplicaOffline() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // put server id 1 offline
+ CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+ environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ @Test(expectedExceptions=ChangelogException.class)
+ public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
+ offlineStateFile.createNewFile();
+
+ environment.readChangelogState();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ @Test
+ public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // put server id 1 offline twice
+ CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
+ environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
+ CSN lastOfflineCSN = csnGenerator.newCSN();
+ environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ @Test
+ public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // put server id 1 offline
+ environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
+ // put server id 1 online again
+ environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getOfflineReplicas()).isEmpty();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ @Test
+ public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.decode(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+ CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+ environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+ ChangelogState state = environment.readChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+ assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
+ assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+ assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
}
@Test(expectedExceptions=ChangelogException.class)
public void testMissingDomainDirectory() throws Exception
{
- File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
- DN domainDN = DN.decode(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
- Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
- Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
- StaticUtils.close(replicaDB, replicaDB2);
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+ try
+ {
+ File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ DN domainDN = DN.decode(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+ replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
- // delete the domain directory created for the 2 replica DBs to break the
- // consistency with domain state file
- StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+ // delete the domain directory created for the 2 replica DBs to break the
+ // consistency with domain state file
+ StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
- environment.readChangelogState();
+ environment.readChangelogState();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB, replicaDB2);
+ }
}
}
--
Gitblit v1.10.0