From 6fde75c6138d9f81009935d8d782209d8426e4de Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 29 Apr 2014 12:06:03 +0000
Subject: [PATCH] Added unit tests for encoding/decoding changelog state DB entries.
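
Refactored ReplicationDbEnv so that the encoding/decoding of changelog state DB
entries can be exercised in isolation: openJEEnvironment() and openDatabase() are
now protected hooks that tests can override, while decodeChangelogState(),
toReplicaEntry() and toGenIdEntry() are package-private helpers that work on plain
(key, data) pairs without requiring a live JE environment.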
---
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 341 ++++++++++++++++++++++++++++++++++++--------------------
1 file changed, 220 insertions(+), 121 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index fb11b44..ba63c39 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -28,7 +28,9 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
-import java.util.List;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,58 +89,7 @@
try
{
- EnvironmentConfig envConfig = new EnvironmentConfig();
-
- /*
- * Create the DB Environment that will be used for all the
- * ReplicationServer activities related to the db
- */
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam(STATS_COLLECT, "false");
- envConfig.setConfigParam(CLEANER_THREADS, "2");
- envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
- /*
- * Tests have shown that since the parsing of the Replication log is
- * always done sequentially, it is not necessary to use a large DB cache.
- */
- if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
- {
- /*
- * If the JVM is reasonably large then we can safely default to bigger
- * read buffers. This will result in more scalable checkpointer and
- * cleaner performance.
- */
- envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
- envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
- envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
-
- /*
- * The cache size must be bigger in order to accommodate the larger
- * buffers - see OPENDJ-943.
- */
- envConfig.setConfigParam(MAX_MEMORY, mb(16));
- }
- else
- {
- /*
- * Use 5M so that the replication can be used with 64M total for the
- * JVM.
- */
- envConfig.setConfigParam(MAX_MEMORY, mb(5));
- }
-
- // Since records are always added at the end of the Replication log and
- // deleted at the beginning of the Replication log, this should never
- // cause any deadlock.
- envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
- envConfig.setLockTimeout(0, TimeUnit.SECONDS);
-
- // Since replication provides durability, we can reduce the DB durability
- // level so that we are immune to application / JVM crashes.
- envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
-
- dbEnvironment = new Environment(new File(path), envConfig);
+ dbEnvironment = openJEEnvironment(path);
/*
* One database is created to store the update from each LDAP server in
@@ -153,6 +104,71 @@
}
}
+ /**
+ * Opens a JE environment.
+ * <p>
+ * Protected so tests can override it.
+ *
+ * @param path
+ * the path to the JE environment in the filesystem
+ * @return the opened JE environment
+ */
+ protected Environment openJEEnvironment(String path)
+ {
+ final EnvironmentConfig envConfig = new EnvironmentConfig();
+
+ /*
+ * Create the DB Environment that will be used for all the
+ * ReplicationServer activities related to the db
+ */
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setConfigParam(STATS_COLLECT, "false");
+ envConfig.setConfigParam(CLEANER_THREADS, "2");
+ envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
+ /*
+ * Tests have shown that since the parsing of the Replication log is
+ * always done sequentially, it is not necessary to use a large DB cache.
+ */
+ if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
+ {
+ /*
+ * If the JVM is reasonably large then we can safely default to bigger
+ * read buffers. This will result in more scalable checkpointer and
+ * cleaner performance.
+ */
+ envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
+ envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
+ envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
+
+ /*
+ * The cache size must be bigger in order to accommodate the larger
+ * buffers - see OPENDJ-943.
+ */
+ envConfig.setConfigParam(MAX_MEMORY, mb(16));
+ }
+ else
+ {
+ /*
+ * Use 5M so that the replication can be used with 64M total for the
+ * JVM.
+ */
+ envConfig.setConfigParam(MAX_MEMORY, mb(5));
+ }
+
+ // Since records are always added at the end of the Replication log and
+ // deleted at the beginning of the Replication log, this should never
+ // cause any deadlock.
+ envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
+ envConfig.setLockTimeout(0, TimeUnit.SECONDS);
+
+ // Since replication provides durability, we can reduce the DB durability
+ // level so that we are immune to application / JVM crashes.
+ envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
+
+ return new Environment(new File(path), envConfig);
+ }
+
private String kb(int sizeInKb)
{
return String.valueOf(sizeInKb * 1024);
@@ -163,8 +179,21 @@
return String.valueOf(sizeInMb * 1024 * 1024);
}
- private Database openDatabase(String databaseName) throws ChangelogException,
- RuntimeException
+ /**
+ * Opens a JE database.
+ * <p>
+ * Protected so tests can override it.
+ *
+ * @param databaseName
+ * the name of the database to open
+ * @return the opened JE database
+ * @throws ChangelogException
+ * if a problem occurred while opening the database
+ * @throws RuntimeException
+ * if a problem occurred in the JE database
+ */
+ protected Database openDatabase(String databaseName)
+ throws ChangelogException, RuntimeException
{
if (isShuttingDown.get())
{
@@ -197,59 +226,90 @@
*/
public ChangelogState readChangelogState() throws ChangelogException
{
+ return decodeChangelogState(readWholeState());
+ }
+
+ /**
+ * Decodes the whole changelog state DB.
+ *
+ * @param wholeState
+ * the whole changelog state DB as a Map,
+ * used only as a convenient collection of (key, data) pairs
+ * @return the decoded changelog state
+ * @throws ChangelogException
+ * if a problem occurred while decoding
+ */
+ ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState)
+ throws ChangelogException
+ {
+ try
+ {
+ final ChangelogState result = new ChangelogState();
+ for (Entry<byte[], byte[]> entry : wholeState.entrySet())
+ {
+ final String stringKey = toString(entry.getKey());
+ final String stringData = toString(entry.getValue());
+
+ if (debugEnabled())
+ {
+ debug("read (key, value)=(" + stringKey + ", " + stringData + ")");
+ }
+
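+ // The value is encoded either as GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
+ // + FIELD_SEPARATOR + baseDN (see toGenIdEntry()), or as serverId + FIELD_SEPARATOR
+ // + baseDN (see toReplicaEntry()).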
+ final String[] str = stringData.split(FIELD_SEPARATOR, 3);
+ if (str[0].equals(GENERATION_ID_TAG))
+ {
+ final long generationId = toLong(str[1]);
+ final DN baseDN = DN.decode(str[2]);
+ if (debugEnabled())
+ {
+ debug("has read generationId: baseDN=" + baseDN + " generationId="
+ + generationId);
+ }
+ result.setDomainGenerationId(baseDN, generationId);
+ }
+ else
+ {
+ final int serverId = toInt(str[0]);
+ final DN baseDN = DN.decode(str[1]);
+ if (debugEnabled())
+ {
+ debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
+ }
+ result.addServerIdToDomain(serverId, baseDN);
+ }
+ }
+ return result;
+ }
+ catch (DirectoryException e)
+ {
+ throw new ChangelogException(e.getMessageObject(), e);
+ }
+ }
+
+ private Map<byte[], byte[]> readWholeState() throws ChangelogException
+ {
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
Cursor cursor = changelogStateDb.openCursor(null, null);
try
{
- final ChangelogState result = new ChangelogState();
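+ // Only collect the raw (key, data) byte arrays here; decoding them into a
+ // ChangelogState is done by decodeChangelogState(), which can be unit tested
+ // without a JE environment.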
+ final Map<byte[], byte[]> results = new LinkedHashMap<byte[], byte[]>();
OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
while (status == OperationStatus.SUCCESS)
{
- final String stringData = toString(data.getData());
-
- if (debugEnabled())
- debug("read (" + GENERATION_ID_TAG + " generationId baseDN) OR "
- + "(serverId baseDN): " + stringData);
-
- final String[] str = stringData.split(FIELD_SEPARATOR, 3);
- if (str[0].equals(GENERATION_ID_TAG))
- {
- long generationId = toLong(str[1]);
- DN baseDN = DN.decode(str[2]);
-
- if (debugEnabled())
- debug("has read baseDN=" + baseDN + " generationId=" +generationId);
-
- result.setDomainGenerationId(baseDN, generationId);
- }
- else
- {
- int serverId = toInt(str[0]);
- DN baseDN = DN.decode(str[1]);
-
- if (debugEnabled())
- debug("has read: baseDN=" + baseDN + " serverId=" + serverId);
-
- result.addServerIdToDomain(serverId, baseDN);
- }
-
+ results.put(key.getData(), data.getData());
status = cursor.getNext(key, data, LockMode.DEFAULT);
}
- return result;
+ return results;
}
catch (RuntimeException e)
{
final Message message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage());
throw new ChangelogException(message, e);
}
- catch (DirectoryException e)
- {
- throw new ChangelogException(e.getMessageObject(), e);
- }
finally
{
close(cursor);
@@ -261,7 +321,8 @@
try
{
return Integer.parseInt(data);
- } catch (NumberFormatException e)
+ }
+ catch (NumberFormatException e)
{
// should never happen
// TODO: i18n
@@ -301,7 +362,14 @@
}
}
- private byte[] toBytes(String s)
+ /**
+ * Converts the string to a UTF-8 encoded byte array.
+ *
+ * @param s
+ * the string to convert
+ * @return the UTF-8 encoded bytes of the string
+ */
+ static byte[] toBytes(String s)
{
try
{
@@ -315,8 +383,8 @@
}
/**
- * Finds or creates the database used to store changes from the server with
- * the given serverId and the given baseDN.
+ * Finds or creates the database used to store changes for a replica with the
+ * given baseDN and serverId.
*
* @param serverId
* The server id that identifies the server.
@@ -328,26 +396,27 @@
* @throws ChangelogException
* in case of underlying Exception.
*/
- public Database getOrAddDb(int serverId, DN baseDN, long generationId)
+ public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId)
throws ChangelogException
{
if (debugEnabled())
+ {
debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", "
+ generationId + ")");
+ }
try
{
// JNR: redundant info is stored between the key and data down below.
// It is probably ok since "changelogstate" DB does not receive a high
// volume of inserts.
- final String serverIdToBaseDn = buildServerIdKey(baseDN, serverId);
+ Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId);
// Opens the DB for the changes received from this server on this domain.
- Database db = openDatabase(serverIdToBaseDn);
+ final Database replicaDB = openDatabase(replicaEntry.getKey());
- putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
- putInChangelogStateDBIfNotExist(buildGenIdKey(baseDN),
- buildGenIdData(baseDN, generationId));
- return db;
+ putInChangelogStateDBIfNotExist(replicaEntry);
+ putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+ return replicaDB;
}
catch (RuntimeException e)
{
@@ -355,36 +424,57 @@
}
}
- private String buildGenIdKey(DN baseDN)
+ /**
+ * Returns an entry to store in the changelog state database, representing a
+ * replica in the topology.
+ *
+ * @param baseDN
+ * the replica's baseDN
+ * @param serverId
+ * the replica's serverId
+ * @return a database entry for the replica
+ */
+ Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
{
- return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedString();
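+ // For a replica, the key doubles as the data:
+ // both contain serverId + FIELD_SEPARATOR + baseDN.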
+ final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
+ return new SimpleImmutableEntry<String, String>(key, key);
}
- private String buildServerIdKey(DN baseDN, int serverId)
+ /**
+ * Returns an entry to store in the changelog state database, representing the
+ * domain generation id.
+ *
+ * @param baseDN
+ * the domain's baseDN
+ * @param generationId
+ * the domain's generationId
+ * @return a database entry for the generationId
+ */
+ Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
{
- return serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
+ final String normDn = baseDN.toNormalizedString();
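+ // The generationId only appears in the data, not in the key, so the key remains
+ // stable when the generation id changes.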
+ final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
+ final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
+ + FIELD_SEPARATOR + normDn;
+ return new SimpleImmutableEntry<String, String>(key, data);
}
- private String buildGenIdData(DN baseDN, long generationId)
+ private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
+ throws RuntimeException
{
- return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
- + baseDN.toNormalizedString();
- }
-
- private void putInChangelogStateDBIfNotExist(String keyString,
- String dataString) throws RuntimeException
- {
- DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
+ DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
DatabaseEntry data = new DatabaseEntry();
if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
{
Transaction txn = dbEnvironment.beginTransaction(null, null);
try
{
- data.setData(toBytes(dataString));
+ data.setData(toBytes(entry.getValue()));
if (debugEnabled())
- debug("putting record in the changelogstate Db key=[" + keyString
- + "] value=[" + dataString + "]");
+ {
+ debug("putting record in the changelogstate Db key=["
+ + entry.getKey() + "] value=[" + entry.getValue() + "]");
+ }
changelogStateDb.put(txn, key, data);
txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
}
@@ -475,7 +565,8 @@
*/
public void clearGenerationId(DN baseDN) throws ChangelogException
{
- deleteFromChangelogStateDB(buildGenIdKey(baseDN),
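+ // Any generationId value works here: deletion only uses the entry's key,
+ // which does not include the generationId.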
+ final int unusedGenId = 0;
+ deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
"clearGenerationId(baseDN=" + baseDN + ")");
}
@@ -492,19 +583,21 @@
*/
public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
+ deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
"clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
}
- private void deleteFromChangelogStateDB(String keyString,
+ private void deleteFromChangelogStateDB(Entry<String, ?> entry,
String methodInvocation) throws ChangelogException
{
if (debugEnabled())
+ {
debug(methodInvocation + " starting");
+ }
try
{
- final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
+ final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
final DatabaseEntry data = new DatabaseEntry();
if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
{
@@ -514,7 +607,9 @@
changelogStateDb.delete(txn, key);
txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
if (debugEnabled())
+ {
debug(methodInvocation + " succeeded");
+ }
}
catch (RuntimeException dbe)
{
@@ -526,8 +621,10 @@
else
{
if (debugEnabled())
- debug(methodInvocation + " failed: key=[ " + keyString
+ {
+ debug(methodInvocation + " failed: key=[ " + entry.getKey()
+ "] not found");
+ }
}
}
catch (RuntimeException dbe)
@@ -570,7 +667,9 @@
try
{
if (txn != null)
+ {
txn.abort();
+ }
}
catch(Exception e)
{ /* do nothing */ }
--
Gitblit v1.10.0