From 2f54bf6d307ffcdd3c65937e181b16bffb277a2c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Jul 2014 15:29:48 +0000
Subject: [PATCH] OPENDJ-1453 Replica offline messages should be synced with updates
---
opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 27 +++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 19 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 72 +++++++-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java | 18 +
opendj-sdk/opends/src/messages/messages/replication.properties | 2
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 100 ++++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 13 -
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java | 52 ++++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 19 +-
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java | 14 +
opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java | 89 ++++++++--
11 files changed, 305 insertions(+), 120 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index d049647..e8a0ac6 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -531,8 +531,6 @@
change %s to replicaDB %s %s because: %s
SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
change %s to replicaDB %s %s because flushing thread is shutting down
-SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \
- state from root path '%s' : directory might not exist
SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \
changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \
list of server ids
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index f451cab..705a9a2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -26,7 +26,10 @@
*/
package org.opends.server.replication.common;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -168,7 +171,31 @@
}
/**
+ * Returns a snapshot of this object.
+ *
+ * @return an unmodifiable Map representing a snapshot of this object.
+ */
+ public Map<DN, List<CSN>> getSnapshot()
+ {
+ if (list.isEmpty())
+ {
+ return Collections.emptyMap();
+ }
+ final Map<DN, List<CSN>> map = new HashMap<DN, List<CSN>>();
+ for (Entry<DN, ServerState> entry : list.entrySet())
+ {
+ final List<CSN> l = entry.getValue().getSnapshot();
+ if (!l.isEmpty())
+ {
+ map.put(entry.getKey(), l);
+ }
+ }
+ return Collections.unmodifiableMap(map);
+ }
+
+ /**
* Returns a string representation of this object.
+ *
* @return The string representation.
*/
@Override
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 73d9714..c084dec 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -250,6 +250,20 @@
}
/**
+ * Returns a snapshot of this object.
+ *
+ * @return an unmodifiable List representing a snapshot of this object.
+ */
+ public List<CSN> getSnapshot()
+ {
+ if (serverIdToCSN.isEmpty())
+ {
+ return Collections.emptyList();
+ }
+ return Collections.unmodifiableList(new ArrayList<CSN>(serverIdToCSN.values()));
+ }
+
+ /**
* Return the text representation of ServerState.
* @return the text representation of ServerState
*/
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java
index a0cb04b..503851e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -25,12 +25,13 @@
*/
package org.opends.server.replication.server;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.types.DN;
/**
@@ -43,15 +44,16 @@
* <p>
* This class is used during replication initialization to decouple the code
* that reads the changelogStateDB from the code that makes use of its data.
+ *
+ * @ThreadSafe
*/
public class ChangelogState
{
- private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
- private final Map<DN, List<Integer>> domainToServerIds =
- new HashMap<DN, List<Integer>>();
- private final Map<DN, List<CSN>> offlineReplicas =
- new HashMap<DN, List<CSN>>();
+ private final ConcurrentSkipListMap<DN, Long> domainToGenerationId = new ConcurrentSkipListMap<DN, Long>();
+ private final ConcurrentSkipListMap<DN, Set<Integer>> domainToServerIds =
+ new ConcurrentSkipListMap<DN, Set<Integer>>();
+ private final MultiDomainServerState offlineReplicas = new MultiDomainServerState();
/**
* Sets the generationId for the supplied replication domain.
@@ -76,11 +78,16 @@
*/
public void addServerIdToDomain(int serverId, DN baseDN)
{
- List<Integer> serverIds = domainToServerIds.get(baseDN);
+ Set<Integer> serverIds = domainToServerIds.get(baseDN);
if (serverIds == null)
{
- serverIds = new LinkedList<Integer>();
- domainToServerIds.put(baseDN, serverIds);
+ serverIds = new HashSet<Integer>();
+ final Set<Integer> existingServerIds =
+ domainToServerIds.putIfAbsent(baseDN, serverIds);
+ if (existingServerIds != null)
+ {
+ serverIds = existingServerIds;
+ }
}
serverIds.add(serverId);
}
@@ -95,13 +102,25 @@
*/
public void addOfflineReplica(DN baseDN, CSN offlineCSN)
{
- List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
- if (offlineCSNs == null)
+ offlineReplicas.update(baseDN, offlineCSN);
+ }
+
+ /**
+ * Removes the following replica information from the offline list.
+ *
+ * @param baseDN
+ * the baseDN of the offline replica
+ * @param serverId
+ * the serverId that is not offline anymore
+ */
+ public void removeOfflineReplica(DN baseDN, int serverId)
+ {
+ CSN csn;
+ do
{
- offlineCSNs = new LinkedList<CSN>();
- offlineReplicas.put(baseDN, offlineCSNs);
+ csn = offlineReplicas.getCSN(baseDN, serverId);
}
- offlineCSNs.add(offlineCSN);
+ while (csn != null && !offlineReplicas.removeCSN(baseDN, csn));
}
/**
@@ -119,21 +138,51 @@
*
* @return a Map of domainBaseDN => List<serverId>.
*/
- public Map<DN, List<Integer>> getDomainToServerIds()
+ public Map<DN, Set<Integer>> getDomainToServerIds()
{
return domainToServerIds;
}
/**
- * Returns the Map of domainBaseDN => List<offlineCSN>.
+ * Returns the internal MultiDomainServerState for offline replicas.
*
- * @return a Map of domainBaseDN => List<offlineCSN>.
+ * @return the MultiDomainServerState for offline replicas.
*/
- public Map<DN, List<CSN>> getOfflineReplicas()
+ public MultiDomainServerState getOfflineReplicas()
{
return offlineReplicas;
}
+ /**
+ * Returns whether the current ChangelogState is equal to the provided
+ * ChangelogState.
+ * <p>
+ * Note: Only use for tests!!<br>
+ * This method should only be used by tests because it creates a lot of
+ * intermediate objects which is not suitable for production.
+ *
+ * @param other
+ * the ChangelogState to compare with
+ * @return true if the current ChangelogState is equal to the provided
+ * ChangelogState, false otherwise.
+ */
+ public boolean isEqualTo(ChangelogState other)
+ {
+ if (other == null)
+ {
+ return false;
+ }
+ if (this == other)
+ {
+ return true;
+ }
+ return domainToGenerationId.equals(other.domainToGenerationId)
+ && domainToServerIds.equals(other.domainToServerIds)
+ // Note: next line is not suitable for production
+ // because it creates lots of Lists and Maps
+ && offlineReplicas.getSnapshot().equals(other.offlineReplicas.getSnapshot());
+ }
+
/** {@inheritDoc} */
@Override
public String toString()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 19e68b6..7592fd9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -38,6 +38,7 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -259,7 +260,7 @@
{
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
- final ChangelogState changelogState = replicationEnv.readChangelogState();
+ final ChangelogState changelogState = replicationEnv.getChangelogState();
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
@@ -284,7 +285,7 @@
{
replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
}
- for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+ for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
for (int serverId : entry.getValue())
{
@@ -568,7 +569,7 @@
{
if (computeChangeNumber)
{
- startIndexer(replicationEnv.readChangelogState());
+ startIndexer(replicationEnv.getChangelogState());
}
else
{
@@ -632,7 +633,7 @@
throws ChangelogException
{
final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final ChangelogState state = replicationEnv.readChangelogState();
+ final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
for (int serverId : serverIds)
{
@@ -640,7 +641,7 @@
final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+ final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
}
// recycle exhausted cursors,
@@ -648,13 +649,13 @@
return new CompositeDBCursor<Void>(cursors, true);
}
- private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+ private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
ServerState startAfterServerState)
{
- final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
- if (domain != null)
+ final ServerState domainState = offlineReplicas.getServerState(baseDN);
+ if (domainState != null)
{
- for (CSN offlineCSN : domain)
+ for (CSN offlineCSN : domainState)
{
if (serverId == offlineCSN.getServerId()
&& !startAfterServerState.cover(offlineCSN))
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 65cd39c..c04fc94 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -173,15 +173,33 @@
/** Root path where the replication log is stored. */
private final String replicationRootPath;
+ /**
+ * The current changelogState. This is in-memory version of what is inside the
+ * on-disk changelogStateDB. It improves performances in case the
+ * changelogState is read often.
+ *
+ * @GuardedBy("domainsLock")
+ */
+ private final ChangelogState changelogState;
/** The list of logs that are in use. */
private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
- /** Maps each domain DN to a domain id that is used to name directory in file system. */
+ /**
+ * Maps each domain DN to a domain id that is used to name directory in file system.
+ *
+ * @GuardedBy("domainsLock")
+ */
private final Map<DN, String> domains = new HashMap<DN, String>();
- /** Exclusive lock to guard the domains mapping and change of state to a domain.*/
- private final Object domainLock = new Object();
+ /**
+ * Exclusive lock to synchronize:
+ * <ul>
+ * <li>the domains mapping</li>
+ * <li>changes to the in-memory changelogState</li>
+ * <li>changes to the on-disk state of a domain</li>
+ */
+ private final Object domainsLock = new Object();
/** The underlying replication server. */
private final ReplicationServer replicationServer;
@@ -203,21 +221,21 @@
{
this.replicationRootPath = rootPath;
this.replicationServer = replicationServer;
+ this.changelogState = readOnDiskChangelogState();
}
/**
- * Returns the state of the replication changelog, which includes the list of
- * known servers and the generation id.
+ * Returns the state of the replication changelog.
*
- * @return the {@link ChangelogState}
+ * @return the {@link ChangelogState} read from the changelogState DB
* @throws ChangelogException
- * if a problem occurs while retrieving the state.
+ * if a database problem occurs
*/
- ChangelogState readChangelogState() throws ChangelogException
+ ChangelogState readOnDiskChangelogState() throws ChangelogException
{
final ChangelogState state = new ChangelogState();
final File changelogPath = new File(replicationRootPath);
- synchronized (domainLock)
+ synchronized (domainsLock)
{
readDomainsStateFile();
checkDomainDirectories(changelogPath);
@@ -230,6 +248,16 @@
}
/**
+ * Returns the current state of the replication changelog.
+ *
+ * @return the current {@link ChangelogState}
+ */
+ ChangelogState getChangelogState()
+ {
+ return changelogState;
+ }
+
+ /**
* Finds or creates the log used to store changes from the replication server
* with the given serverId and the given baseDN.
*
@@ -256,7 +284,7 @@
ensureRootDirectoryExists();
String domainId = null;
- synchronized (domainLock)
+ synchronized (domainsLock)
{
domainId = domains.get(domainDN);
if (domainId == null)
@@ -266,9 +294,11 @@
final File serverIdPath = getServerIdPath(domainId, serverId);
ensureServerIdDirectoryExists(serverIdPath);
+ changelogState.addServerIdToDomain(serverId, domainDN);
final File generationIdPath = getGenerationIdPath(domainId, generationId);
ensureGenerationIdFileExists(generationIdPath);
+ changelogState.setDomainGenerationId(domainDN, generationId);
return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
}
@@ -333,12 +363,12 @@
*/
void clearGenerationId(final DN domainDN) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
if (idFile != null)
@@ -350,6 +380,7 @@
ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
}
}
+ changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
}
}
@@ -364,16 +395,17 @@
*/
void resetGenerationId(final DN baseDN) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
clearGenerationId(baseDN);
final String domainId = domains.get(baseDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
ensureGenerationIdFileExists(generationIdPath);
+ changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
}
}
@@ -390,12 +422,12 @@
*/
void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
if (!serverIdPath.exists())
@@ -409,6 +441,7 @@
// Overwrite file, only the last sent offline CSN is kept
writer = newFileWriter(offlineFile);
writer.write(offlineCSN.toString());
+ changelogState.addOfflineReplica(domainDN, offlineCSN);
}
catch (IOException e)
{
@@ -435,12 +468,12 @@
*/
void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
{
- synchronized (domainLock)
+ synchronized (domainsLock)
{
final String domainId = domains.get(domainDN);
if (domainId == null)
{
- return; // unknow domain => no-op
+ return; // unknown domain => no-op
}
final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
if (offlineFile.exists())
@@ -452,6 +485,7 @@
offlineFile.getPath(), domainDN.toString(), serverId));
}
}
+ changelogState.removeOfflineReplica(domainDN, serverId);
}
}
@@ -497,24 +531,22 @@
private void checkDomainDirectories(final File changelogPath) throws ChangelogException
{
final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
- if (dnDirectories == null)
+ if (dnDirectories != null)
{
- throw new ChangelogException(ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH.get(replicationRootPath));
- }
+ final Set<String> domainIdsFromFileSystem = new HashSet<String>();
+ for (final File dnDir : dnDirectories)
+ {
+ final String fileName = dnDir.getName();
+ final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+ domainIdsFromFileSystem.add(domainId);
+ }
- Set<String> domainIdsFromFileSystem = new HashSet<String>();
- for (final File dnDir : dnDirectories)
- {
- final String fileName = dnDir.getName();
- final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
- domainIdsFromFileSystem.add(domainId);
- }
-
- Set<String> expectedDomainIds = new HashSet<String>(domains.values());
- if (!domainIdsFromFileSystem.equals(expectedDomainIds))
- {
- throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
- domainIdsFromFileSystem.toString()));
+ final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+ if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+ {
+ throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+ domainIdsFromFileSystem.toString()));
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 052b0d0..deacf2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -33,9 +33,9 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -378,8 +378,7 @@
// initialize the DB cursor and the last seen updates
// to ensure the medium consistency CSN can move forward
final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
- for (Entry<DN, List<Integer>> entry
- : changelogState.getDomainToServerIds().entrySet())
+ for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
if (!isECLEnabledDomain(baseDN))
@@ -422,12 +421,10 @@
nextChangeForInsertDBCursor.next();
}
- for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
- .entrySet())
+ final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
+ for (DN baseDN : offlineReplicas)
{
- final DN baseDN = entry.getKey();
- final List<CSN> offlineCSNs = entry.getValue();
- for (CSN offlineCSN : offlineCSNs)
+ for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
{
if (isECLEnabledDomain(baseDN))
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 9d575dc..723b682 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -39,6 +39,7 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -316,7 +317,7 @@
{
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
- final ChangelogState changelogState = dbEnv.readChangelogState();
+ final ChangelogState changelogState = dbEnv.getChangelogState();
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
@@ -341,7 +342,7 @@
{
replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
}
- for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+ for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
for (int serverId : entry.getValue())
{
@@ -643,7 +644,7 @@
{
if (computeChangeNumber)
{
- startIndexer(dbEnv.readChangelogState());
+ startIndexer(dbEnv.getChangelogState());
}
else
{
@@ -707,7 +708,7 @@
throws ChangelogException
{
final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
- final ChangelogState state = dbEnv.readChangelogState();
+ final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
for (int serverId : serverIds)
{
@@ -715,7 +716,7 @@
final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
replicaDBCursor.next();
- final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+ final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
}
// recycle exhausted cursors,
@@ -723,13 +724,13 @@
return new CompositeDBCursor<Void>(cursors, true);
}
- private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+ private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
ServerState startAfterServerState)
{
- final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
- if (domain != null)
+ final ServerState domainState = offlineReplicas.getServerState(baseDN);
+ if (domainState != null)
{
- for (CSN offlineCSN : domain)
+ for (CSN offlineCSN : domainState)
{
if (serverId == offlineCSN.getServerId()
&& !startAfterServerState.cover(offlineCSN))
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index cc9e286..3f5aa10 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -66,6 +66,16 @@
{
private Environment dbEnvironment;
private Database changelogStateDb;
+ /**
+ * The current changelogState. This is in-memory version of what is inside the
+ * on-disk changelogStateDB. It improves performances in case the
+ * changelogState is read often.
+ *
+ * @GuardedBy("stateLock")
+ */
+ private final ChangelogState changelogState;
+ /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
+ private final Object stateLock = new Object();
private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
private ReplicationServer replicationServer;
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -101,6 +111,7 @@
* of all the servers that have been seen in the past.
*/
changelogStateDb = openDatabase("changelogstate");
+ changelogState = readOnDiskChangelogState();
}
catch (RuntimeException e)
{
@@ -222,13 +233,23 @@
}
/**
- * Read and return the list of known servers from the database.
+ * Return the current changelog state.
+ *
+ * @return the current {@link ChangelogState}
+ */
+ public ChangelogState getChangelogState()
+ {
+ return changelogState;
+ }
+
+ /**
+ * Read and return the changelog state from the database.
*
* @return the {@link ChangelogState} read from the changelogState DB
* @throws ChangelogException
* if a database problem occurs
*/
- public ChangelogState readChangelogState() throws ChangelogException
+ protected ChangelogState readOnDiskChangelogState() throws ChangelogException
{
return decodeChangelogState(readWholeState());
}
@@ -434,8 +455,13 @@
// Opens the DB for the changes received from this server on this domain.
final Database replicaDB = openDatabase(replicaEntry.getKey());
- putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
- putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+ synchronized (stateLock)
+ {
+ putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
+ changelogState.addServerIdToDomain(serverId, baseDN);
+ putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+ changelogState.setDomainGenerationId(baseDN, generationId);
+ }
return replicaDB;
}
catch (RuntimeException e)
@@ -638,9 +664,13 @@
*/
public void clearGenerationId(DN baseDN) throws ChangelogException
{
- final int unusedGenId = 0;
- deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
- "clearGenerationId(baseDN=" + baseDN + ")");
+ synchronized (stateLock)
+ {
+ final int unusedGenId = 0;
+ deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
+ "clearGenerationId(baseDN=" + baseDN + ")");
+ changelogState.setDomainGenerationId(baseDN, unusedGenId);
+ }
}
/**
@@ -656,8 +686,12 @@
*/
public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
- "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+ synchronized (stateLock)
+ {
+ deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
+ "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
+ changelogState.setDomainGenerationId(baseDN, -1);
+ }
}
private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -721,10 +755,14 @@
public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
throws ChangelogException
{
- // just overwrite any older entry as it is assumed a newly received offline
- // CSN is newer than the previous one
- putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
- "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+ synchronized (stateLock)
+ {
+ // just overwrite any older entry as it is assumed a newly received offline
+ // CSN is newer than the previous one
+ putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+ "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+ changelogState.addOfflineReplica(baseDN, offlineCSN);
+ }
}
/**
@@ -742,8 +780,12 @@
*/
public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
- "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+ synchronized (stateLock)
+ {
+ deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
+ "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
+ changelogState.removeOfflineReplica(baseDN, serverId);
+ }
}
private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index de8c1f5..dab5f44 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -25,9 +25,6 @@
*/
package org.opends.server.replication.server.changelog.file;
-import static org.assertj.core.api.Assertions.*;
-import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
-
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,10 +42,14 @@
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
+
@SuppressWarnings("javadoc")
public class ReplicationEnvironmentTest extends DirectoryServerTestCase
{
@@ -65,7 +66,13 @@
public void setUp() throws Exception
{
// This test suite depends on having the schema available for DN decoding.
- TestCaseUtils.startServer();
+ TestCaseUtils.startFakeServer();
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception
+ {
+ TestCaseUtils.shutdownFakeServer();
}
@AfterMethod
@@ -92,11 +99,13 @@
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
- ChangelogState state = environment.readChangelogState();
+ final ChangelogState state = environment.readOnDiskChangelogState();
assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+
+ assertThat(state).isEqualTo(environment.getChangelogState());
}
finally
{
@@ -124,7 +133,7 @@
}
}
- ChangelogState state = environment.readChangelogState();
+ final ChangelogState state = environment.readOnDiskChangelogState();
assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
for (int i = 0; i <= 2 ; i++)
@@ -135,6 +144,8 @@
MapEntry.entry(domainDNs.get(0), 1L),
MapEntry.entry(domainDNs.get(1), 2L),
MapEntry.entry(domainDNs.get(2), 3L));
+
+ assertThat(state).isEqualTo(environment.getChangelogState());
}
finally
{
@@ -160,9 +171,12 @@
CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
environment.notifyReplicaOffline(domainDN, offlineCSN);
- ChangelogState state = environment.readChangelogState();
+ final ChangelogState state = environment.readOnDiskChangelogState();
- assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+ assertThat(state).isEqualTo(environment.getChangelogState());
}
finally
{
@@ -186,7 +200,7 @@
File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
offlineStateFile.createNewFile();
- environment.readChangelogState();
+ environment.readOnDiskChangelogState();
}
finally
{
@@ -214,9 +228,10 @@
CSN lastOfflineCSN = csnGenerator.newCSN();
environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
- ChangelogState state = environment.readChangelogState();
-
- assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+ final ChangelogState state = environment.readOnDiskChangelogState();
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+ assertThat(state).isEqualTo(environment.getChangelogState());
}
finally
{
@@ -243,9 +258,9 @@
// put server id 1 online again
environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
- ChangelogState state = environment.readChangelogState();
-
+ final ChangelogState state = environment.readOnDiskChangelogState();
assertThat(state.getOfflineReplicas()).isEmpty();
+ assertThat(state).isEqualTo(environment.getChangelogState());
}
finally
{
@@ -269,12 +284,15 @@
CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
environment.notifyReplicaOffline(domainDN, offlineCSN);
- ChangelogState state = environment.readChangelogState();
+ final ChangelogState state = environment.readOnDiskChangelogState();
assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
- assertThat(state.getOfflineReplicas()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+ assertThat(state).isEqualTo(environment.getChangelogState());
}
finally
{
@@ -299,7 +317,7 @@
// consistency with domain state file
StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
- environment.readChangelogState();
+ environment.readOnDiskChangelogState();
}
finally
{
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
index 6aac87b..6af0702 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.HashSet;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -70,11 +71,16 @@
}
@Override
- protected Database openDatabase(String databaseName)
- throws ChangelogException, RuntimeException
+ protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException
{
return null;
}
+
+ @Override
+ protected ChangelogState readOnDiskChangelogState() throws ChangelogException
+ {
+ return new ChangelogState();
+ }
}
@BeforeClass
@@ -130,8 +136,8 @@
entry(baseDN, generationId));
if (!replicas.isEmpty())
{
- assertThat(state.getDomainToServerIds()).containsExactly(
- entry(baseDN, replicas));
+ assertThat(state.getDomainToServerIds())
+ .containsExactly(entry(baseDN, new HashSet<Integer>(replicas)));
}
else
{
@@ -139,8 +145,8 @@
}
if (!offlineReplicas.isEmpty())
{
- assertThat(state.getOfflineReplicas()).containsExactly(
- entry(baseDN, offlineReplicas));
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(entry(baseDN, offlineReplicas));
}
else
{
--
Gitblit v1.10.0