From 7ea8ac48d10e033ba0d6ca0ec0d66ace144062a0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Jul 2014 15:29:48 +0000
Subject: [PATCH] OPENDJ-1453 Replica offline messages should be synced with updates
---
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 72 ++++++++++++++++++++++++++++-------
1 files changed, 57 insertions(+), 15 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index cc9e286..3f5aa10 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -66,6 +66,16 @@
{
private Environment dbEnvironment;
private Database changelogStateDb;
+ /**
+ * The current changelogState. This is in-memory version of what is inside the
+ * on-disk changelogStateDB. It improves performances in case the
+ * changelogState is read often.
+ *
+ * @GuardedBy("stateLock")
+ */
+ private final ChangelogState changelogState;
+ /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
+ private final Object stateLock = new Object();
private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
private ReplicationServer replicationServer;
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -101,6 +111,7 @@
* of all the servers that have been seen in the past.
*/
changelogStateDb = openDatabase("changelogstate");
+ changelogState = readOnDiskChangelogState();
}
catch (RuntimeException e)
{
@@ -222,13 +233,23 @@
}
/**
- * Read and return the list of known servers from the database.
+ * Return the current changelog state.
+ *
+ * @return the current {@link ChangelogState}
+ */
+ public ChangelogState getChangelogState()
+ {
+ return changelogState;
+ }
+
+ /**
+ * Read and return the changelog state from the database.
*
* @return the {@link ChangelogState} read from the changelogState DB
* @throws ChangelogException
* if a database problem occurs
*/
- public ChangelogState readChangelogState() throws ChangelogException
+ protected ChangelogState readOnDiskChangelogState() throws ChangelogException
{
return decodeChangelogState(readWholeState());
}
@@ -434,8 +455,13 @@
// Opens the DB for the changes received from this server on this domain.
final Database replicaDB = openDatabase(replicaEntry.getKey());
- putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
- putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+ synchronized (stateLock)
+ {
+ putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
+ changelogState.addServerIdToDomain(serverId, baseDN);
+ putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+ changelogState.setDomainGenerationId(baseDN, generationId);
+ }
return replicaDB;
}
catch (RuntimeException e)
@@ -638,9 +664,13 @@
*/
public void clearGenerationId(DN baseDN) throws ChangelogException
{
- final int unusedGenId = 0;
- deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
- "clearGenerationId(baseDN=" + baseDN + ")");
+ synchronized (stateLock)
+ {
+ final int unusedGenId = 0;
+ deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
+ "clearGenerationId(baseDN=" + baseDN + ")");
+ changelogState.setDomainGenerationId(baseDN, unusedGenId);
+ }
}
/**
@@ -656,8 +686,12 @@
*/
public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
- "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+ synchronized (stateLock)
+ {
+ deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
+ "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+ changelogState.setDomainGenerationId(baseDN, -1);
+ }
}
private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -721,10 +755,14 @@
public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
throws ChangelogException
{
- // just overwrite any older entry as it is assumed a newly received offline
- // CSN is newer than the previous one
- putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
- "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+ synchronized (stateLock)
+ {
+ // just overwrite any older entry as it is assumed a newly received offline
+ // CSN is newer than the previous one
+ putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+ "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+ changelogState.addOfflineReplica(baseDN, offlineCSN);
+ }
}
/**
@@ -742,8 +780,12 @@
*/
public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
- "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+ synchronized (stateLock)
+ {
+ deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+ "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+ changelogState.removeOfflineReplica(baseDN, serverId);
+ }
}
private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
--
Gitblit v1.10.0