From de36fa06856d8d04652401bb24e49c3259aef154 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 10:26:42 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 142 ++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 121 insertions(+), 21 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index e4d7283..c259985 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -38,9 +38,12 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -67,6 +70,7 @@
private ReplicationServer replicationServer;
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private static final String GENERATION_ID_TAG = "GENID";
+ private static final String OFFLINE_TAG = "OFFLINE";
private static final String FIELD_SEPARATOR = " ";
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
@@ -268,6 +272,19 @@
}
result.setDomainGenerationId(baseDN, generationId);
}
+ else if (prefix.equals(OFFLINE_TAG))
+ {
+ final String[] str = stringKey.split(FIELD_SEPARATOR, 3);
+ final int serverId = toInt(str[1]);
+ final DN baseDN = DN.decode(str[2]);
+ long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong();
+ if (debugEnabled())
+ {
+ debug("has read replica offline: baseDN=" + baseDN + " serverId="
+ + serverId);
+ }
+ result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
+ }
else
{
final String[] str = stringData.split(FIELD_SEPARATOR, 2);
@@ -275,7 +292,8 @@
final DN baseDN = DN.decode(str[1]);
if (debugEnabled())
{
- debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
+ debug("has read replica: baseDN=" + baseDN + " serverId="
+ + serverId);
}
result.addServerIdToDomain(serverId, baseDN);
}
@@ -416,7 +434,7 @@
// Opens the DB for the changes received from this server on this domain.
final Database replicaDB = openDatabase(replicaEntry.getKey());
- putInChangelogStateDBIfNotExist(replicaEntry);
+ putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
return replicaDB;
}
@@ -436,7 +454,7 @@
* the replica's serverId
* @return a database entry for the replica
*/
- Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
+ static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
{
final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
return new SimpleImmutableEntry<String, String>(key, key);
@@ -452,30 +470,65 @@
* the domain's generationId
* @return a database entry for the generationId
*/
- Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
+ static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
{
final String normDn = baseDN.toNormalizedString();
final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
+ FIELD_SEPARATOR + normDn;
- return new SimpleImmutableEntry<String, String>(key, data);
+ return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data));
}
- private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
- throws RuntimeException
+ /**
+ * Converts an Entry<String, String> to an Entry<byte[], byte[]>.
+ *
+ * @param entry
+ * the entry to convert
+ * @return the converted entry
+ */
+ static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
{
- DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+ return new SimpleImmutableEntry<byte[], byte[]>(
+ toBytes(entry.getKey()),
+ toBytes(entry.getValue()));
+ }
+
+ /**
+ * Return an entry to store in the changelog state database representing the
+ * time a replica went offline.
+ *
+ * @param baseDN
+ * the replica's baseDN
+ * @param offlineCSN
+ * the replica's serverId and offline timestamp
+ * @return a database entry representing the time a replica went offline
+ */
+ static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
+ {
+ final byte[] key =
+ toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
+ + FIELD_SEPARATOR + baseDN.toNormalizedString());
+ final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
+ data.append(offlineCSN.getTime());
+ return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
+ }
+
+ private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
+ throws ChangelogException, RuntimeException
+ {
+ DatabaseEntry key = new DatabaseEntry(entry.getKey());
DatabaseEntry data = new DatabaseEntry();
if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
{
Transaction txn = dbEnvironment.beginTransaction(null, null);
try
{
- data.setData(toBytes(entry.getValue()));
+ data.setData(entry.getValue());
if (debugEnabled())
{
debug("putting record in the changelogstate Db key=["
- + entry.getKey() + "] value=[" + entry.getValue() + "]");
+ + toString(entry.getKey()) + "] value=["
+ + toString(entry.getValue()) + "]");
}
changelogStateDb.put(txn, key, data);
txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
@@ -585,11 +638,11 @@
*/
public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
+ deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
"clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
}
- private void deleteFromChangelogStateDB(Entry<String, ?> entry,
+ private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
String methodInvocation) throws ChangelogException
{
if (debugEnabled())
@@ -599,7 +652,7 @@
try
{
- final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+ final DatabaseEntry key = new DatabaseEntry(entry.getKey());
final DatabaseEntry data = new DatabaseEntry();
if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
{
@@ -620,18 +673,65 @@
throw dbe;
}
}
- else
+ else if (debugEnabled())
{
- if (debugEnabled())
- {
- debug(methodInvocation + " failed: key=[ " + entry.getKey()
- + "] not found");
- }
+ debug(methodInvocation + " failed: key not found");
}
}
- catch (RuntimeException dbe)
+ catch (RuntimeException e)
{
- throw new ChangelogException(dbe);
+ if (debugEnabled())
+ {
+ debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+ }
+ throw new ChangelogException(e);
+ }
+ }
+
+ /**
+ * Add the information about an offline replica to the changelog state DB.
+ *
+ * @param baseDN
+ * the domain of the offline replica
+ * @param offlineCSN
+ * the offline replica serverId and offline timestamp
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+ throws ChangelogException
+ {
+ // just overwrite any older entry as it is assumed a newly received offline
+ // CSN is newer than the previous one
+ putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+ "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+ }
+
+ private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
+ String methodInvocation) throws ChangelogException
+ {
+ if (debugEnabled())
+ {
+ debug(methodInvocation + " starting");
+ }
+
+ try
+ {
+ final DatabaseEntry key = new DatabaseEntry(entry.getKey());
+ final DatabaseEntry data = new DatabaseEntry(entry.getValue());
+ changelogStateDb.put(null, key, data);
+ if (debugEnabled())
+ {
+ debug(methodInvocation + " succeeded");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ if (debugEnabled())
+ {
+ debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+ }
+ throw new ChangelogException(e);
}
}
--
Gitblit v1.10.0