From 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Jul 2014 15:29:48 +0000
Subject: [PATCH] OPENDJ-1453 Replica offline messages should be synced with updates
---
opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 100 +++++++++++++++++++++++++++++++++-----------------
1 files changed, 66 insertions(+), 34 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 65cd39c..c04fc94 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -173,15 +173,33 @@
/** Root path where the replication log is stored. */
private final String replicationRootPath;
+ /**
+ * The current changelogState. This is in-memory version of what is inside the
+ * on-disk changelogStateDB. It improves performances in case the
+ * changelogState is read often.
+ *
+ * @GuardedBy("domainsLock")
+ */
+ private final ChangelogState changelogState;
/** The list of logs that are in use. */
private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
- /** Maps each domain DN to a domain id that is used to name directory in file system. */
+ /**
+ * Maps each domain DN to a domain id that is used to name directory in file system.
+ *
+ * @GuardedBy("domainsLock")
+ */
private final Map<DN, String> domains = new HashMap<DN, String>();
- /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
- private final Object domainLock = new Object();
+ /**
+ * Exclusive lock to synchronize:
+ * <ul>
+ * <li>the domains mapping</li>
+ * <li>changes to the in-memory changelogState</li>
+ * <li>changes to the on-disk state of a domain</li>
+ */
+ private final Object domainsLock = new Object();
/** The underlying replication server. */
private final ReplicationServer replicationServer;
@@ -203,21 +221,21 @@
{
this.replicationRootPath = rootPath;
this.replicationServer = replicationServer;
+ this.changelogState = readOnDiskChangelogState();
}
/**
- * Returns the state of the replication changelog, which includes the list of
- * known servers and the generation id.
+ * Returns the state of the replication changelog.
*
- * @return the {@link ChangelogState}
+ * @return the {@link ChangelogState} read from the changelogState DB
* @throws ChangelogException
- * if a problem occurs while retrieving the state.
+ * if a database problem occurs
*/
- ChangelogState readChangelogState() throws ChangelogException
+ ChangelogState readOnDiskChangelogState() throws ChangelogException
{
final ChangelogState state = new ChangelogState();
final File changelogPath = new File(replicationRootPath);
- synchronized (domainLock)
+ synchronized (domainsLock)
{
readDomainsStateFile();
checkDomainDirectories(changelogPath);
@@ -230,6 +248,16 @@
}
/**
+ * Returns the current state of the replication changelog.
+ *
+ * @return the current {@link ChangelogState}
+ */
+ ChangelogState getChangelogState()
+ {
+ return changelogState;
+ }
+
+ /**
* Finds or creates the log used to store changes from the replication server
* with the given serverId and the given baseDN.
*
@@ -256,7 +284,7 @@
ensureRootDirectoryExists();
String domainId = null;
- synchronized (domainLock)
+ synchronized (domainsLock)
{
domainId = domains.get(domainDN);
if (domainId == null)
@@ -266,9 +294,11 @@
final File serverIdPath = getServerIdPath(domainId, serverId);
ensureServerIdDirectoryExists(serverIdPath);
+ changelogState.addServerIdToDomain(serverId, domainDN);
final File generationIdPath = getGenerationIdPath(domainId, generationId);
ensureGenerationIdFileExists(generationIdPath);
+ changelogState.setDomainGenerationId(domainDN, generationId);
return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
}
@@ -333,12 +363,12 @@
*/
void clearGenerationId(final DN domainDN) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
if (idFile != null)
@@ -350,6 +380,7 @@
ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
}
}
+ changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
}
}
@@ -364,16 +395,17 @@
*/
void resetGenerationId(final DN baseDN) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
clearGenerationId(baseDN);
final String domainId = domains.get(baseDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
ensureGenerationIdFileExists(generationIdPath);
+ changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
}
}
@@ -390,12 +422,12 @@
*/
void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
if (!serverIdPath.exists())
@@ -409,6 +441,7 @@
// Overwrite file, only the last sent offline CSN is kept
writer = newFileWriter(offlineFile);
writer.write(offlineCSN.toString());
+ changelogState.addOfflineReplica(domainDN, offlineCSN);
}
catch (IOException e)
{
@@ -435,12 +468,12 @@
*/
void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
if (offlineFile.exists())
@@ -452,6 +485,7 @@
offlineFile.getPath(), domainDN.toString(), serverId));
}
}
+ changelogState.removeOfflineReplica(domainDN, serverId);
}
}
@@ -497,24 +531,22 @@
private void checkDomainDirectories(final File changelogPath) throws ChangelogException
{
final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
- if (dnDirectories == null)
+ if (dnDirectories != null)
{
- throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
- }
+ final Set<String> domainIdsFromFileSystem = new HashSet<String>();
+ for (final File dnDir : dnDirectories)
+ {
+ final String fileName = dnDir.getName();
+ final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+ domainIdsFromFileSystem.add(domainId);
+ }
- Set<String> domainIdsFromFileSystem = new HashSet<String>();
- for (final File dnDir : dnDirectories)
- {
- final String fileName = dnDir.getName();
- final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
- domainIdsFromFileSystem.add(domainId);
- }
-
- Set<String> expectedDomainIds = new HashSet<String>(domains.values());
- if (!domainIdsFromFileSystem.equals(expectedDomainIds))
- {
- throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
- domainIdsFromFileSystem.toString()));
+ final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+ if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+ domainIdsFromFileSystem.toString()));
+ }
}
}
--
Gitblit v1.10.0