From a670c31c686f19c2e8619602de6518cde032f61f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Sep 2013 21:41:58 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 235 +++++++++++++++++-----------------------------------------
1 files changed, 68 insertions(+), 167 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index c4e4eee..eef08e4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -46,9 +47,9 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
-import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -117,17 +118,26 @@
private final Queue<MessageHandler> otherHandlers =
new ConcurrentLinkedQueue<MessageHandler>();
- /**
- * This map contains the List of updates received from each LDAP server.
- */
- private final Map<Integer, DbHandler> sourceDbHandlers =
- new ConcurrentHashMap<Integer, DbHandler>();
+ private final ChangelogDB changelogDB;
/** The ReplicationServer that created the current instance. */
private ReplicationServer localReplicationServer;
- /** GenerationId management. */
+ /**
+ * The generationId of the current replication domain. The generationId is
+ * computed by hashing the first 1000 entries in the DB.
+ */
private volatile long generationId = -1;
- private boolean generationIdSavedStatus = false;
+ /**
+ * JNR, this is legacy code, hard to follow logic. I think what this field
+ * tries to say is: "is the generationId in use anywhere?", i.e. is there a
+ * replication topology in place? As soon as an answer to any of these
+ * question comes true, then it is set to true.
+ * <p>
+ * It looks like the only use of this field is to prevent the
+ * {@link #generationId} from being reset by
+ * {@link #resetGenerationIdIfPossible()}.
+ */
+ private volatile boolean generationIdSavedStatus = false;
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
@@ -177,6 +187,7 @@
this.assuredTimeoutTimer = new Timer("Replication server RS("
+ localReplicationServer.getServerId()
+ ") assured timer for domain \"" + baseDn + "\"", true);
+ this.changelogDB = localReplicationServer.getChangelogDB();
DirectoryServer.registerMonitorProvider(this);
}
@@ -252,7 +263,7 @@
}
}
- if (!publishMessage(update, serverId))
+ if (!publishUpdateMsg(update, serverId))
{
return;
}
@@ -390,43 +401,46 @@
}
}
- private boolean publishMessage(UpdateMsg update, int serverId)
+ private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId)
{
- // look for the dbHandler that is responsible for the LDAP server which
- // generated the change.
- DbHandler dbHandler;
- synchronized (sourceDbHandlers)
+ try
{
- dbHandler = sourceDbHandlers.get(serverId);
- if (dbHandler == null)
+ if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
{
- try
- {
- dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
- generationIdSavedStatus = true;
- } catch (ChangelogException e)
+ /*
+ * JNR: Matt and I had a hard time figuring out where to put this
+ * synchronized block. We elected to put it here, but without a strong
+ * conviction.
+ */
+ synchronized (generationIDLock)
{
/*
- * Because of database problem we can't save any more changes
- * from at least one LDAP server.
- * This replicationServer therefore can't do it's job properly anymore
- * and needs to close all its connections and shutdown itself.
+ * JNR: I think the generationIdSavedStatus is set to true because
+ * method above created a ReplicaDB which assumes the generationId was
+ * communicated to another server. Hence setting true on this field
+ * prevent the generationId from being reset.
*/
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- localReplicationServer.shutdown();
- return false;
+ generationIdSavedStatus = true;
}
- sourceDbHandlers.put(serverId, dbHandler);
}
+ return true;
}
-
- // Publish the messages to the source handler
- dbHandler.add(update);
- return true;
+ catch (ChangelogException e)
+ {
+ /*
+ * Because of database problem we can't save any more changes from at
+ * least one LDAP server. This replicationServer therefore can't do it's
+ * job properly anymore and needs to close all its connections and
+ * shutdown itself.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(" ");
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ localReplicationServer.shutdown();
+ return false;
+ }
}
private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
@@ -1261,7 +1275,7 @@
*/
public Set<Integer> getServerIds()
{
- return sourceDbHandlers.keySet();
+ return changelogDB.getDomainServerIds(baseDn);
}
/**
@@ -1278,29 +1292,7 @@
*/
public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
{
- DbHandler dbHandler = sourceDbHandlers.get(serverId);
- if (dbHandler == null)
- {
- return null;
- }
-
- ReplicaDBCursor cursor;
- try
- {
- cursor = dbHandler.generateCursorFrom(startAfterCSN);
- }
- catch (Exception e)
- {
- return null;
- }
-
- if (!cursor.next())
- {
- close(cursor);
- return null;
- }
-
- return cursor;
+ return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
}
/**
@@ -1313,12 +1305,7 @@
*/
public long getCount(int serverId, CSN from, CSN to)
{
- DbHandler dbHandler = sourceDbHandlers.get(serverId);
- if (dbHandler != null)
- {
- return dbHandler.getCount(from, to);
- }
- return 0;
+ return changelogDB.getCount(baseDn, serverId, from, to);
}
/**
@@ -1328,12 +1315,7 @@
*/
public long getChangesCount()
{
- long entryCount = 0;
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- entryCount += dbHandler.getChangesCount();
- }
- return entryCount;
+ return changelogDB.getDomainChangesCount(baseDn);
}
/**
@@ -1346,24 +1328,6 @@
}
/**
- * Sets the provided DbHandler associated to the provided serverId.
- *
- * @param serverId the serverId for the server to which is
- * associated the DbHandler.
- * @param dbHandler the dbHandler associated to the serverId.
- *
- * @throws ChangelogException If a database error happened.
- */
- public void setDbHandler(int serverId, DbHandler dbHandler)
- throws ChangelogException
- {
- synchronized (sourceDbHandlers)
- {
- sourceDbHandlers.put(serverId, dbHandler);
- }
- }
-
- /**
* Retrieves the destination handlers for a routable message.
*
* @param msg The message to route.
@@ -1734,20 +1698,7 @@
stopAllServers(true);
- shutdownDbHandlers();
- }
-
- /** Shutdown all the dbHandlers. */
- private void shutdownDbHandlers()
- {
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- dbHandler.shutdown();
- }
- sourceDbHandlers.clear();
- }
+ changelogDB.shutdownDomain(baseDn);
}
/**
@@ -1758,9 +1709,9 @@
public ServerState getDbServerState()
{
ServerState serverState = new ServerState();
- for (DbHandler db : sourceDbHandlers.values())
+ for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
{
- serverState.update(db.getLastChange());
+ serverState.update(lastCSN);
}
return serverState;
}
@@ -2235,24 +2186,7 @@
public void clearDbs()
{
// Reset the localchange and state db for the current domain
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- try
- {
- dbHandler.clear();
- } catch (Exception e)
- {
- // TODO: i18n
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
- e.getMessage() + " " + stackTraceToSingleLineString(e)));
- logError(mb.toMessage());
- }
- }
- shutdownDbHandlers();
- }
+ changelogDB.clearDomain(baseDn);
try
{
localReplicationServer.clearGenerationId(baseDn);
@@ -2397,20 +2331,6 @@
}
/**
- * Set the purge delay on all the db Handlers for this Domain
- * of Replication.
- *
- * @param delay The new purge delay to use.
- */
- public void setPurgeDelay(long delay)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- dbHandler.setPurgeDelay(delay);
- }
- }
-
- /**
* Get the map of connected DSs.
* @return The map of connected DSs
*/
@@ -2667,7 +2587,6 @@
{
for (int serverId : dbState)
{
- DbHandler h = sourceDbHandlers.get(serverId);
CSN mostRecentDbCSN = dbState.getCSN(serverId);
try {
// Is the most recent change in the Db newer than eligible CSN ?
@@ -2676,19 +2595,8 @@
if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
{
// let's try to seek the first change <= eligibleCSN
- ReplicaDBCursor cursor = null;
- try {
- cursor = h.generateCursorFrom(eligibleCSN);
- if (cursor != null && cursor.getChange() != null) {
- CSN newCSN = cursor.getChange().getCSN();
- result.update(newCSN);
- }
- } catch (ChangelogException e) {
- // there's no change older than eligibleCSN (case of s3/csn31)
- result.update(new CSN(0, 0, serverId));
- } finally {
- close(cursor);
- }
+ CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
+ result.update(newCSN);
} else {
// for this serverId, all changes in the ChangelogDb are holder
// than eligibleCSN, the most recent in the db is our guy.
@@ -2721,9 +2629,9 @@
public ServerState getStartState()
{
ServerState domainStartState = new ServerState();
- for (DbHandler dbHandler : sourceDbHandlers.values())
+ for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
{
- domainStartState.update(dbHandler.getFirstChange());
+ domainStartState.update(firstCSN);
}
return domainStartState;
}
@@ -2741,10 +2649,12 @@
{
CSN eligibleCSN = null;
- for (DbHandler db : sourceDbHandlers.values())
+ for (Entry<Integer, CSN> entry :
+ changelogDB.getDomainLastCSNs(baseDn).entrySet())
{
// Consider this producer (DS/db).
- int serverId = db.getServerId();
+ final int serverId = entry.getKey();
+ final CSN changelogLastCSN = entry.getValue();
// Should it be considered for eligibility ?
CSN heartbeatLastCSN =
@@ -2774,7 +2684,6 @@
continue;
}
- CSN changelogLastCSN = db.getLastChange();
if (changelogLastCSN != null
&& (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
{
@@ -2935,15 +2844,7 @@
*/
public long getLatestDomainTrimDate()
{
- long latest = 0;
- for (DbHandler db : sourceDbHandlers.values())
- {
- if (latest == 0 || latest < db.getLatestTrimDate())
- {
- latest = db.getLatestTrimDate();
- }
- }
- return latest;
+ return changelogDB.getDomainLatestTrimDate(baseDn);
}
/**
--
Gitblit v1.10.0