From 9d5b1c7a628471604be4768f97fcdaf13cf0639f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Oct 2013 13:45:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 46 +++++----
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 150 ++++++++++++++++++++---------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 21 +--
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java | 4
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 4
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java | 11 -
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 11 -
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 41 ++++++-
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 12 ++
9 files changed, 191 insertions(+), 109 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 69028a0..59b7486 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1348,7 +1348,7 @@
* if a database problem occurs.
*/
private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
- throws DirectoryException, ChangelogException
+ throws ChangelogException
{
// We also need to check if the draftCNdb is consistent with
// the changelogdb.
@@ -1451,7 +1451,7 @@
}
private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
- throws DirectoryException, ChangelogException
+ throws ChangelogException
{
ChangeNumberIndexDB cnIndexDB = replicationServer.getChangeNumberIndexDB();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 66d17ef..5961572 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2202,7 +2202,17 @@
*/
public void clearDbs()
{
- changelogDB.removeDomain(baseDN);
+ try
+ {
+ changelogDB.removeDomain(baseDN);
+ }
+ catch (ChangelogException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLEARING_DB.get(baseDN.toString(), e.getMessage()
+ + " " + stackTraceToSingleLineString(e)));
+ logError(mb.toMessage());
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index 57489c9..cab92e2 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -172,8 +172,10 @@
*
* @param baseDN
* the replication domain baseDN
+ * @throws ChangelogException
+ * If a database problem happened
*/
- void removeDomain(DN baseDN);
+ void removeDomain(DN baseDN) throws ChangelogException;
// serverId methods
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
index fb5faf6..a8f14f9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -653,14 +653,9 @@
return;
}
- String dbName = db.getDatabaseName();
-
- // Closing is requested by the Berkeley DB before truncate
- db.close();
+ final Database oldDb = db;
db = null; // In case there's a failure between here and recreation.
-
- // Clears the changes
- dbenv.clearDb(dbName);
+ dbenv.clearDb(oldDb);
// RE-create the db
db = dbenv.getOrCreateDraftCNDb();
@@ -687,6 +682,6 @@
*/
private boolean isDBClosed()
{
- return db == null;
+ return db == null || !db.getEnvironment().isValid();
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 7360012..dff2b0b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,7 +33,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.config.ConfigException;
-import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -43,13 +42,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.DN;
-import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.Pair;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -58,9 +55,6 @@
public class JEChangelogDB implements ChangelogDB
{
- /** The tracer object for the debug logger. */
- private static final DebugTracer TRACER = getTracer();
-
/**
* This map contains the List of updates received from each LDAP server.
*/
@@ -313,6 +307,11 @@
*/
public void clearDB() throws ChangelogException
{
+ if (!dbDirectory.exists())
+ {
+ return;
+ }
+
// Remember the first exception because :
// - we want to try to remove everything we want to remove
// - then throw the first encountered exception
@@ -402,6 +401,7 @@
public void shutdownDomain(DN baseDN)
{
shutdownDbHandlers(getDomainMap(baseDN));
+ sourceDbHandlers.remove(baseDN);
}
private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
@@ -446,8 +446,13 @@
/** {@inheritDoc} */
@Override
- public void removeDomain(DN baseDN)
+ public void removeDomain(DN baseDN) throws ChangelogException
{
+ // Remember the first exception because :
+ // - we want to try to remove everything we want to remove
+ // - then throw the first encountered exception
+ ChangelogException firstException = null;
+
// 1- clear the replica DBs
final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
synchronized (domainMap)
@@ -458,17 +463,13 @@
{
dbHandler.clear();
}
- catch (Exception e)
+ catch (ChangelogException e)
{
- // TODO: i18n
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
- .getMessage()
- + " " + stackTraceToSingleLineString(e)));
- logError(mb.toMessage());
+ firstException = e;
}
}
shutdownDbHandlers(domainMap);
+ sourceDbHandlers.remove(baseDN);
}
// 2- clear the ChangeNumber index DB
@@ -480,11 +481,11 @@
{
cnIndexDB.clear(baseDN);
}
- catch (Exception ignored)
+ catch (ChangelogException e)
{
- if (debugEnabled())
+ if (firstException == null)
{
- TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
+ firstException = e;
}
}
}
@@ -495,13 +496,18 @@
{
dbEnv.clearGenerationId(baseDN);
}
- catch (Exception ignored)
+ catch (ChangelogException e)
{
- if (debugEnabled())
+ if (firstException == null)
{
- TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
+ firstException = e;
}
}
+
+ if (firstException != null)
+ {
+ throw firstException;
+ }
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index d2fe3b8..56b2a46 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -874,17 +874,12 @@
return;
}
- String dbName = db.getDatabaseName();
-
// Clears the reference to this serverID
dbenv.clearServerId(baseDN, serverId);
- // Closing is requested by the Berkeley DB before truncate
- db.close();
+ final Database oldDb = db;
db = null; // In case there's a failure between here and recreation.
-
- // Clears the changes
- dbenv.clearDb(dbName);
+ dbenv.clearDb(oldDb);
// RE-create the db
db = dbenv.getOrAddDb(serverId, baseDN, -1);
@@ -1190,7 +1185,7 @@
*/
private boolean isDBClosed()
{
- return db == null;
+ return db == null || !db.getEnvironment().isValid();
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index 8db0837..e10248f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -41,7 +44,9 @@
import org.opends.server.types.DirectoryException;
import com.sleepycat.je.*;
+import com.sleepycat.je.config.EnvironmentParams;
+import static com.sleepycat.je.EnvironmentConfig.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -51,19 +56,19 @@
import static org.opends.server.util.StaticUtils.*;
/**
- * This class is used to represent a Db environment that can be used
- * to create ReplicationDB.
+ * This class represents a DB environment that acts as a factory for
+ * ReplicationDBs.
*/
public class ReplicationDbEnv
{
private Environment dbEnvironment;
private Database changelogStateDb;
+ private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
private ReplicationServer replicationServer;
+ private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private static final String GENERATION_ID_TAG = "GENID";
private static final String FIELD_SEPARATOR = " ";
- /**
- * The tracer object for the debug logger.
- */
+ /** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
/**
@@ -92,9 +97,9 @@
*/
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
- envConfig.setConfigParam("je.cleaner.threads", "2");
- envConfig.setConfigParam("je.checkpointer.highPriority", "true");
-
+ envConfig.setConfigParam(STATS_COLLECT, "false");
+ envConfig.setConfigParam(CLEANER_THREADS, "2");
+ envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
/*
* Tests have shown that since the parsing of the Replication log is
* always done sequentially, it is not necessary to use a large DB cache.
@@ -106,15 +111,15 @@
* read buffers. This will result in more scalable checkpointer and
* cleaner performance.
*/
- envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
- envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
- envConfig.setConfigParam("je.log.faultReadSize", kb(4));
+ envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
+ envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
+ envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
/*
* The cache size must be bigger in order to accommodate the larger
* buffers - see OPENDJ-943.
*/
- envConfig.setConfigParam("je.maxMemory", mb(16));
+ envConfig.setConfigParam(MAX_MEMORY, mb(16));
}
else
{
@@ -122,7 +127,7 @@
* Use 5M so that the replication can be used with 64M total for the
* JVM.
*/
- envConfig.setConfigParam("je.maxMemory", mb(5));
+ envConfig.setConfigParam(MAX_MEMORY, mb(5));
}
// Since records are always added at the end of the Replication log and
@@ -160,12 +165,27 @@
return String.valueOf(sizeInMb * 1024 * 1024);
}
- private Database openDatabase(String databaseName) throws RuntimeException
+ private Database openDatabase(String databaseName) throws ChangelogException,
+ RuntimeException
{
+ if (isShuttingDown.get())
+ {
+ // TODO JNR i18n
+ throw new ChangelogException(Message.raw("DB is closing"));
+ }
final DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(true);
- return dbEnvironment.openDatabase(null, databaseName, dbConfig);
+ final Database db =
+ dbEnvironment.openDatabase(null, databaseName, dbConfig);
+ if (isShuttingDown.get())
+ {
+ closeDB(db);
+ // TODO JNR i18n
+ throw new ChangelogException(Message.raw("DB is closing"));
+ }
+ allDbs.add(db);
+ return db;
}
/**
@@ -395,44 +415,69 @@
}
}
- /**
- * Shutdown the Db environment.
- */
- public void shutdown()
+ /**
+ * Shutdown the Db environment.
+ */
+ public void shutdown()
+ {
+ isShuttingDown.set(true);
+ // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
+ // This code rely on openDatabase() to close databases opened concurrently
+ // with this code
+ final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
+ allDbs.clear();
+ for (Database db : allDbsCopy)
{
- try
- {
- changelogStateDb.close();
- } catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- }
-
- try
- {
- dbEnvironment.close();
- } catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- }
+ closeDB(db);
}
+ try
+ {
+ dbEnvironment.close();
+ }
+ catch (DatabaseException e)
+ {
+ logError(newErrorMessage(e));
+ }
+ }
+
+ private void closeDB(Database db)
+ {
+ allDbs.remove(db);
+ try
+ {
+ db.close();
+ }
+ catch (DatabaseException e)
+ {
+ logError(newErrorMessage(e));
+ }
+ }
+
+ private Message newErrorMessage(DatabaseException e)
+ {
+ if (!isShuttingDown.get())
+ {
+ return NOTE_EXCEPTION_CLOSING_DATABASE
+ .get(stackTraceToSingleLineString(e));
+ }
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
+ mb.append(" ");
+ mb.append(stackTraceToSingleLineString(e));
+ return mb.toMessage();
+ }
+
/**
* Clears the provided generationId associated to the provided baseDN from the
* state Db.
*
* @param baseDN
* The baseDN for which the generationID must be cleared.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void clearGenerationId(DN baseDN)
+ public void clearGenerationId(DN baseDN) throws ChangelogException
{
deleteFromChangelogStateDB(buildGenIdKey(baseDN),
"clearGenerationId(baseDN=" + baseDN + ")");
@@ -446,15 +491,17 @@
* The baseDN for which the serverId must be cleared.
* @param serverId
* The serverId to remove from the Db.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void clearServerId(DN baseDN, int serverId)
+ public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
"clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
}
private void deleteFromChangelogStateDB(String keyString,
- String methodInvocation)
+ String methodInvocation) throws ChangelogException
{
if (debugEnabled())
debug(methodInvocation + " starting");
@@ -490,18 +537,23 @@
}
catch (RuntimeException dbe)
{
- // FIXME can actually happen (see catch above)
- // what should we do about it?
+ throw new ChangelogException(dbe);
}
}
/**
* Clears the database.
*
- * @param databaseName The name of the database to clear.
+ * @param db
+ * The database to clear.
*/
- public final void clearDb(String databaseName)
+ public final void clearDb(Database db)
{
+ String databaseName = db.getDatabaseName();
+
+ // Closing is requested by Berkeley JE before truncate
+ db.close();
+
Transaction txn = null;
try
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 4304f01..31a6447 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,10 +28,7 @@
package org.opends.server.replication;
import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
@@ -406,9 +403,6 @@
logError(Message.raw(Category.SYNC, Severity.NOTICE,
" ##### Calling ReplicationTestCase.classCleanUp ##### "));
- // Clean RS databases
- cleanUpReplicationServersDB();
-
removeReplicationServerDB();
cleanConfigEntries();
@@ -487,15 +481,18 @@
*/
protected void removeReplicationServerDB() throws Exception
{
- for (ReplicationServer rs : ReplicationServer.getAllInstances())
- {
- clearChangelogDB(rs);
- rs.getChangelogDB().removeDB();
- }
+ // avoid ConcurrentModificationException
+ remove(new ArrayList<ReplicationServer>(ReplicationServer.getAllInstances()));
}
protected void remove(ReplicationServer... replicationServers) throws Exception
{
+ remove(Arrays.asList(replicationServers));
+ }
+
+ protected void remove(Collection<ReplicationServer> replicationServers)
+ throws Exception
+ {
for (ReplicationServer rs : replicationServers)
{
if (rs != null)
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index e401945..20c856d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -29,7 +29,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -168,7 +167,6 @@
@Test(enabled=true, dependsOnMethods = { "searchBackend"})
public void replicationServerTest() throws Exception
{
- clearChangelogDB(replicationServer);
changelogBasic();
newClientLateServer1();
newClient();
@@ -192,7 +190,6 @@
@Test(enabled=false, dependsOnMethods = { "searchBackend"})
public void replicationServerTestLoop() throws Exception
{
- clearChangelogDB(replicationServer);
changelogBasic();
while (true)
{
@@ -286,7 +283,36 @@
"ReplicationServer basic : incorrect message type received: "
+ receivedMsg.getClass() + ": content: " + receivedMsg);
assertEquals(receivedMsg.toString(), sentMsg.toString(),
- "ReplicationServer basic : incorrect message body received.");
+ "ReplicationServer basic : incorrect message body received. CSN is same as \""
+ + getCSNFieldName(((DeleteMsg) receivedMsg).getCSN()) + "\" field.");
+ }
+
+ private String getCSNFieldName(CSN csn)
+ {
+ if (csn == null) {
+ return "";
+ }
+ if (csn.equals(firstCSNServer1))
+ {
+ return "firstCSNServer1";
+ }
+ else if (csn.equals(secondCSNServer1))
+ {
+ return "secondCSNServer1";
+ }
+ else if (csn.equals(firstCSNServer2))
+ {
+ return "firstCSNServer2";
+ }
+ else if (csn.equals(secondCSNServer2))
+ {
+ return "secondCSNServer2";
+ }
+ else if (csn.equals(unknownCSNServer1))
+ {
+ return "unknownCSNServer1";
+ }
+ return null;
}
private ServerState newServerState(CSN... csns)
@@ -855,7 +881,6 @@
@Test(enabled=true, dependsOnMethods = { "searchBackend"})
public void windowProbeTest() throws Exception
{
-
debugInfo("Starting windowProbeTest");
final int WINDOW = 10;
@@ -877,13 +902,13 @@
*/
// open the first session to the replication server
- InetSocketAddress ServerAddr = new InetSocketAddress(
- InetAddress.getByName("localhost"), replicationServerPort);
+ InetSocketAddress serverAddr =
+ new HostPort("localhost", replicationServerPort).toInetSocketAddress();
Socket socket = new Socket();
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
- socket.connect(ServerAddr, timeoutMS);
+ socket.connect(serverAddr, timeoutMS);
ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
Session session = replSessionSecurity.createClientSession(socket, timeoutMS);
--
Gitblit v1.10.0