From 46e1cd9b8a361103d3591fda2ba687de8c51964c Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 22 Sep 2014 09:55:15 +0000
Subject: [PATCH] OPENDJ-1467 : File Based Changelog must support replicas temporarily leaving the topology
---
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 2
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 15 ++++++-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 2
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 15 +++++++
opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties | 16 ++++++-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 49 ++++++++++++++++++++++--
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 16 ++++----
7 files changed, 95 insertions(+), 20 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 4178833..b6b55c2 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -574,8 +574,8 @@
to file system for log file '%s'
ERR_CHANGELOG_UNABLE_TO_SEEK_260=Could not seek to position %d for reader \
on log file '%s'
-ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root directory '%s' for \
- log file
+ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY_261=Could not create root \
+ directory '%s' for log file
ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE_262=Could not decode DN \
from domain state file '%s', from line '%s'
ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE_263=Could not read domain state \
@@ -606,4 +606,14 @@
ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
head log file from '%s' to '%s'
INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
- size of head log file is %d bytes
\ No newline at end of file
+ size of head log file is %d bytes
+ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \
+ offline for domain %s and server id %d because the path '%s' does not exist
+ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \
+ replica information for domain %s and server id %d, using path '%s' (offline CSN is %s)
+ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \
+ state file '%s' for domain %s, it should contain exactly one line corresponding to the offline CSN
+ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
+ replica offline state file '%s' for domain %s
+ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
+ offline state file '%s' for domain %s and server id %d
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 644b994..749b0e1 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -496,7 +496,7 @@
if (updateMsg instanceof ReplicaOfflineMsg)
{
final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
- this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN());
+ this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN());
return true;
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 011a7c4..e275020 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -225,5 +225,5 @@
* @throws ChangelogException
* If a database problem happened
*/
- void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
+ void notifyReplicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index b28a4bd..ad0f2fe 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -138,14 +138,6 @@
/** {@inheritDoc} */
@Override
- public void replicaOffline(DN baseDN, CSN offlineCSN)
- throws ChangelogException
- {
- throw new RuntimeException("Not implemented");
- }
-
- /** {@inheritDoc} */
- @Override
public void initializeDB()
{
throw new RuntimeException("Not implemented");
@@ -182,6 +174,14 @@
/** {@inheritDoc} */
@Override
+ public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
+ throws ChangelogException
+ {
+ throw new RuntimeException("Not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
throw new RuntimeException("Not implemented");
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 48ebc5f..ac97e20 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -174,6 +174,21 @@
}
/**
+ * Indicates if the replica corresponding to provided domain DN and server id
+ * is offline.
+ *
+ * @param domainDN
+ * base DN of the replica
+ * @param serverId
+ * server id of the replica
+ * @return {@code true} if replica is offline, {@code false} otherwise
+ */
+ public boolean isReplicaOffline(DN domainDN, int serverId)
+ {
+ return replicasOffline.getCSN(domainDN, serverId) != null;
+ }
+
+ /**
* Ensures the medium consistency point is updated by UpdateMsg.
*
* @param baseDN
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index ceca244..b39b6d5 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -827,6 +827,7 @@
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
+ notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
indexer.publishUpdateMsg(baseDN, updateMsg);
}
return pair.getSecond(); // replica DB was created
@@ -839,15 +840,25 @@
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
+ notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
indexer.publishHeartbeat(baseDN, heartbeatCSN);
}
}
+ private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+ throws ChangelogException
+ {
+ if (indexer.isReplicaOffline(baseDN, serverId))
+ {
+ replicationEnv.notifyReplicaOnline(baseDN, serverId);
+ }
+ }
+
/** {@inheritDoc} */
@Override
- public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
+ public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
{
- replicationEnv.addOfflineReplica(baseDN, offlineCSN);
+ replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index 217dd56..33330bf 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -530,14 +530,32 @@
*/
static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
{
- final byte[] key =
- toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
- + FIELD_SEPARATOR + baseDN.toNormalizedString());
+ final byte[] key = toReplicaOfflineKey(baseDN, offlineCSN.getServerId());
final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
data.append(offlineCSN.getTime());
return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
}
+ /**
+ * Return the key for a replica offline entry in the changelog state database.
+ *
+ * @param baseDN
+ * the replica's baseDN
+ * @param serverId
+ * the replica's serverId
+ * @return the key used in the database to store offline time of the replica
+ */
+ private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
+ {
+ return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedString());
+ }
+
+ /** Returns an entry with the provided key and a null value. */
+ private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
+ {
+ return new SimpleImmutableEntry<byte[], byte[]>(key, null);
+ }
+
private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
throws ChangelogException, RuntimeException
{
@@ -722,7 +740,9 @@
}
/**
- * Add the information about an offline replica to the changelog state DB.
+ * Notify that replica is offline.
+ * <p>
+ * This information is stored in the changelog state DB.
*
* @param baseDN
* the domain of the offline replica
@@ -731,7 +751,7 @@
* @throws ChangelogException
* if a database problem occurred
*/
- public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+ public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
throws ChangelogException
{
synchronized (stateLock)
@@ -744,6 +764,25 @@
}
}
+ /**
+ * Notify that replica is online.
+ * <p>
+ * Update the changelog state DB if necessary (ie, replica was known to be
+ * offline).
+ *
+ * @param baseDN
+ * the domain of replica
+ * @param serverId
+ * the serverId of replica
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
+ {
+ deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+ "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+ }
+
private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
String methodInvocation) throws ChangelogException
{
--
Gitblit v1.10.0