From 8ed297692b7674b67b8d05a26fa9b04c20930e37 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Sep 2013 21:41:58 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 147 +----
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 452 ++++++++++++++++++
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 19
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java | 229 +++++++++
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 36
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 235 ++------
opends/src/server/org/opends/server/util/Pair.java | 162 ++++++
opends/src/server/org/opends/server/replication/server/ChangelogState.java | 104 ++++
9 files changed, 1,070 insertions(+), 316 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ChangelogState.java b/opends/src/server/org/opends/server/replication/server/ChangelogState.java
new file mode 100644
index 0000000..18ea636
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -0,0 +1,104 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the changelog state stored in the changelogStateDB. For each
+ * replication domain, it contains:
+ * <ul>
+ * <li>its generationId</li>
+ * <li>the list of serverIds composing it</li>
+ * </ul>
+ * <p>
+ * This class is used during replication initialization to decouple the code
+ * that reads the changelogStateDB from the code that makes use of its data.
+ */
+public class ChangelogState
+{
+
+ private final Map<String, Long> domainToGenerationId =
+ new HashMap<String, Long>();
+ private final Map<String, List<Integer>> domainToServerIds =
+ new HashMap<String, List<Integer>>();
+
+ /**
+ * Sets the generationId for the supplied replication domain.
+ *
+ * @param baseDn
+ * the targeted replication domain baseDN
+ * @param generationId
+ * the generation Id to set
+ */
+ public void setDomainGenerationId(String baseDn, long generationId)
+ {
+ domainToGenerationId.put(baseDn, generationId);
+ }
+
+ /**
+ * Adds the serverId to the serverIds list of the supplied replication domain.
+ *
+ * @param serverId
+ * the serverId to add
+ * @param baseDn
+ * the targeted replication domain baseDN
+ */
+ public void addServerIdToDomain(int serverId, String baseDn)
+ {
+ List<Integer> serverIds = domainToServerIds.get(baseDn);
+ if (serverIds == null)
+ {
+ serverIds = new LinkedList<Integer>();
+ domainToServerIds.put(baseDn, serverIds);
+ }
+ serverIds.add(serverId);
+ }
+
+ /**
+ * Returns the Map of domainBaseDN => generationId.
+ *
+ * @return a Map of domainBaseDN => generationId
+ */
+ public Map<String, Long> getDomainToGenerationId()
+ {
+ return domainToGenerationId;
+ }
+
+ /**
+ * Returns the Map of domainBaseDN => List<serverId>.
+ *
+ * @return a Map of domainBaseDN => List<serverId>.
+ */
+ public Map<String, List<Integer>> getDomainToServerIds()
+ {
+ return domainToServerIds;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 62fee3a..f351728 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -27,7 +27,6 @@
*/
package org.opends.server.replication.server;
-import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.*;
@@ -54,10 +53,9 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.CNIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.je.DbHandler;
-import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
-import org.opends.server.replication.server.changelog.je.ReplicationDbEnv;
+import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.types.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.ServerConstants;
@@ -101,10 +99,9 @@
new HashMap<String, ReplicationServerDomain>();
private volatile boolean shutdown = false;
- private ReplicationDbEnv dbEnv;
private int rcvWindow;
private int queueSize;
- private String dbDirname = null;
+ private final ChangelogDB changelogDB = new JEChangelogDB(this);
/**
* The delay (in sec) after which the changes must be deleted from the
@@ -225,30 +222,11 @@
replicationServerUrls = new ArrayList<String>();
queueSize = configuration.getQueueSize();
purgeDelay = configuration.getReplicationPurgeDelay();
- dbDirname = configuration.getReplicationDBDirectory();
rcvWindow = configuration.getWindowSize();
- if (dbDirname == null)
- {
- dbDirname = "changelogDb";
- }
- // Check that this path exists or create it.
- File f = getFileForPath(dbDirname);
- try
- {
- if (!f.exists())
- {
- f.mkdir();
- }
- }
- catch (Exception e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(e.getLocalizedMessage());
- mb.append(" ");
- mb.append(String.valueOf(getFileForPath(dbDirname)));
- Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
- throw new ConfigException(msg, e);
- }
+
+ this.changelogDB.setReplicationDBDirectory(configuration
+ .getReplicationDBDirectory());
+
groupId = (byte)configuration.getGroupId();
weight = configuration.getWeight();
assuredTimeout = configuration.getAssuredTimeout();
@@ -504,10 +482,7 @@
try
{
- // Initialize the replicationServer database.
- dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
- this);
- dbEnv.initializeFromChangelogStateDB();
+ this.changelogDB.initializeDB();
setServerURL();
listenSocket = new ServerSocket();
@@ -539,16 +514,9 @@
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
" successfully initialized");
- } catch (ChangelogException e)
- {
- Message message = ERR_COULD_NOT_READ_DB.get(
- getFileForPath(dbDirname).getAbsolutePath(),
- e.getLocalizedMessage());
- logError(message);
} catch (UnknownHostException e)
{
- Message message = ERR_UNKNOWN_HOSTNAME.get();
- logError(message);
+ logError(ERR_UNKNOWN_HOSTNAME.get());
} catch (IOException e)
{
Message message =
@@ -827,37 +795,12 @@
shutdownECL();
- if (dbEnv != null)
- {
- dbEnv.shutdown();
- }
+ this.changelogDB.shutdownDB();
// Remove this instance from the global instance list
allInstances.remove(this);
}
-
- /**
- * Creates a new DB handler for this ReplicationServer and the serverId and DN
- * given in parameter.
- *
- * @param serverId
- * The serverId for which the dbHandler must be created.
- * @param baseDn
- * The DN for which the dbHandler must be created.
- * @return The new DB handler for this ReplicationServer and the serverId and
- * DN given in parameter.
- * @throws ChangelogException
- * in case of underlying database problem.
- */
- public DbHandler newDbHandler(int serverId, String baseDn)
- throws ChangelogException
- {
- return new DbHandler(serverId, baseDn, this, dbEnv, queueSize);
- }
-
-
-
/**
* Clears the generationId for the replicationServerDomain related to the
* provided baseDn.
@@ -867,18 +810,6 @@
*/
public void clearGenerationId(String baseDn)
{
- try
- {
- dbEnv.clearGenerationId(baseDn);
- }
- catch (Exception ignored)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
- }
- }
-
synchronized (cnIndexDBLock)
{
if (cnIndexDB != null)
@@ -962,11 +893,7 @@
if (newPurgeDelay != purgeDelay)
{
purgeDelay = newPurgeDelay;
- // propagate
- for (ReplicationServerDomain domain : getReplicationServerDomains())
- {
- domain.setPurgeDelay(purgeDelay*1000);
- }
+ this.changelogDB.setPurgeDelay(purgeDelay * 1000);
}
rcvWindow = configuration.getWindowSize();
@@ -1047,7 +974,7 @@
}
final String newDir = configuration.getReplicationDBDirectory();
- if (newDir != null && !dbDirname.equals(newDir))
+ if (newDir != null && !this.changelogDB.getDBDirName().equals(newDir))
{
return new ConfigChangeResult(ResultCode.SUCCESS, true);
}
@@ -1597,7 +1524,7 @@
* @throws DirectoryException
* when needed.
*/
- public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
+ ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException
{
synchronized (cnIndexDBLock)
{
@@ -1605,9 +1532,9 @@
{
if (cnIndexDB == null)
{
- cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
+ cnIndexDB = this.changelogDB.newChangeNumberIndexDB();
final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord();
- // initialization of the lastGeneratedChangeNumebr from the DB content
+ // initialization of the lastGeneratedChangeNumber from the DB content
// if DB is empty => last record does not exist => default to 0
lastGeneratedChangeNumber =
(lastCNRecord != null) ? lastCNRecord.getChangeNumber() : 0;
@@ -1617,7 +1544,8 @@
catch (Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- Message message = ERR_CHANGENUMBER_DATABASE.get(e.getMessage());
+ Message message =
+ ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage());
throw new DirectoryException(OPERATIONS_ERROR, message, e);
}
}
@@ -1816,6 +1744,16 @@
}
/**
+ * Returns the changelogDB.
+ *
+ * @return the changelogDB.
+ */
+ ChangelogDB getChangelogDB()
+ {
+ return this.changelogDB;
+ }
+
+ /**
* Get the replication server DB directory.
* This is useful for tests to be able to do some cleanup. Might even be
* useful for the server some day.
@@ -1824,7 +1762,7 @@
*/
public String getDbDirName()
{
- return dbDirname;
+ return this.changelogDB.getDBDirName();
}
/*
@@ -1896,33 +1834,4 @@
+ baseDNs.keySet();
}
- /**
- * Initializes the generationId for the specified replication domain.
- *
- * @param baseDn
- * the replication domain
- * @param generationId
- * the the generationId value for initialization
- */
- public void initDomainGenerationID(String baseDn, long generationId)
- {
- getReplicationServerDomain(baseDn, true).initGenerationID(generationId);
- }
-
- /**
- * Adds the specified serverId to the specified replication domain.
- *
- * @param serverId
- * the server Id to add to the replication domain
- * @param baseDn
- * the replication domain where to add the serverId
- * @throws ChangelogException
- * If a database error happened.
- */
- public void addServerIdToDomain(int serverId, String baseDn)
- throws ChangelogException
- {
- DbHandler dbHandler = newDbHandler(serverId, baseDn);
- getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler);
- }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index c4e4eee..eef08e4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -46,9 +47,9 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
-import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -117,17 +118,26 @@
private final Queue<MessageHandler> otherHandlers =
new ConcurrentLinkedQueue<MessageHandler>();
- /**
- * This map contains the List of updates received from each LDAP server.
- */
- private final Map<Integer, DbHandler> sourceDbHandlers =
- new ConcurrentHashMap<Integer, DbHandler>();
+ private final ChangelogDB changelogDB;
/** The ReplicationServer that created the current instance. */
private ReplicationServer localReplicationServer;
- /** GenerationId management. */
+ /**
+ * The generationId of the current replication domain. The generationId is
+ * computed by hashing the first 1000 entries in the DB.
+ */
private volatile long generationId = -1;
- private boolean generationIdSavedStatus = false;
+ /**
+ * JNR, this is legacy code, hard to follow logic. I think what this field
+ * tries to say is: "is the generationId in use anywhere?", i.e. is there a
+ * replication topology in place? As soon as an answer to any of these
+ * question comes true, then it is set to true.
+ * <p>
+ * It looks like the only use of this field is to prevent the
+ * {@link #generationId} from being reset by
+ * {@link #resetGenerationIdIfPossible()}.
+ */
+ private volatile boolean generationIdSavedStatus = false;
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
@@ -177,6 +187,7 @@
this.assuredTimeoutTimer = new Timer("Replication server RS("
+ localReplicationServer.getServerId()
+ ") assured timer for domain \"" + baseDn + "\"", true);
+ this.changelogDB = localReplicationServer.getChangelogDB();
DirectoryServer.registerMonitorProvider(this);
}
@@ -252,7 +263,7 @@
}
}
- if (!publishMessage(update, serverId))
+ if (!publishUpdateMsg(update, serverId))
{
return;
}
@@ -390,43 +401,46 @@
}
}
- private boolean publishMessage(UpdateMsg update, int serverId)
+ private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId)
{
- // look for the dbHandler that is responsible for the LDAP server which
- // generated the change.
- DbHandler dbHandler;
- synchronized (sourceDbHandlers)
+ try
{
- dbHandler = sourceDbHandlers.get(serverId);
- if (dbHandler == null)
+ if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
{
- try
- {
- dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
- generationIdSavedStatus = true;
- } catch (ChangelogException e)
+ /*
+ * JNR: Matt and I had a hard time figuring out where to put this
+ * synchronized block. We elected to put it here, but without a strong
+ * conviction.
+ */
+ synchronized (generationIDLock)
{
/*
- * Because of database problem we can't save any more changes
- * from at least one LDAP server.
- * This replicationServer therefore can't do it's job properly anymore
- * and needs to close all its connections and shutdown itself.
+ * JNR: I think the generationIdSavedStatus is set to true because
+ * method above created a ReplicaDB which assumes the generationId was
+ * communicated to another server. Hence setting true on this field
+ * prevent the generationId from being reset.
*/
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- localReplicationServer.shutdown();
- return false;
+ generationIdSavedStatus = true;
}
- sourceDbHandlers.put(serverId, dbHandler);
}
+ return true;
}
-
- // Publish the messages to the source handler
- dbHandler.add(update);
- return true;
+ catch (ChangelogException e)
+ {
+ /*
+ * Because of database problem we can't save any more changes from at
+ * least one LDAP server. This replicationServer therefore can't do it's
+ * job properly anymore and needs to close all its connections and
+ * shutdown itself.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(" ");
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ localReplicationServer.shutdown();
+ return false;
+ }
}
private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
@@ -1261,7 +1275,7 @@
*/
public Set<Integer> getServerIds()
{
- return sourceDbHandlers.keySet();
+ return changelogDB.getDomainServerIds(baseDn);
}
/**
@@ -1278,29 +1292,7 @@
*/
public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
{
- DbHandler dbHandler = sourceDbHandlers.get(serverId);
- if (dbHandler == null)
- {
- return null;
- }
-
- ReplicaDBCursor cursor;
- try
- {
- cursor = dbHandler.generateCursorFrom(startAfterCSN);
- }
- catch (Exception e)
- {
- return null;
- }
-
- if (!cursor.next())
- {
- close(cursor);
- return null;
- }
-
- return cursor;
+ return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
}
/**
@@ -1313,12 +1305,7 @@
*/
public long getCount(int serverId, CSN from, CSN to)
{
- DbHandler dbHandler = sourceDbHandlers.get(serverId);
- if (dbHandler != null)
- {
- return dbHandler.getCount(from, to);
- }
- return 0;
+ return changelogDB.getCount(baseDn, serverId, from, to);
}
/**
@@ -1328,12 +1315,7 @@
*/
public long getChangesCount()
{
- long entryCount = 0;
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- entryCount += dbHandler.getChangesCount();
- }
- return entryCount;
+ return changelogDB.getDomainChangesCount(baseDn);
}
/**
@@ -1346,24 +1328,6 @@
}
/**
- * Sets the provided DbHandler associated to the provided serverId.
- *
- * @param serverId the serverId for the server to which is
- * associated the DbHandler.
- * @param dbHandler the dbHandler associated to the serverId.
- *
- * @throws ChangelogException If a database error happened.
- */
- public void setDbHandler(int serverId, DbHandler dbHandler)
- throws ChangelogException
- {
- synchronized (sourceDbHandlers)
- {
- sourceDbHandlers.put(serverId, dbHandler);
- }
- }
-
- /**
* Retrieves the destination handlers for a routable message.
*
* @param msg The message to route.
@@ -1734,20 +1698,7 @@
stopAllServers(true);
- shutdownDbHandlers();
- }
-
- /** Shutdown all the dbHandlers. */
- private void shutdownDbHandlers()
- {
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- dbHandler.shutdown();
- }
- sourceDbHandlers.clear();
- }
+ changelogDB.shutdownDomain(baseDn);
}
/**
@@ -1758,9 +1709,9 @@
public ServerState getDbServerState()
{
ServerState serverState = new ServerState();
- for (DbHandler db : sourceDbHandlers.values())
+ for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
{
- serverState.update(db.getLastChange());
+ serverState.update(lastCSN);
}
return serverState;
}
@@ -2235,24 +2186,7 @@
public void clearDbs()
{
// Reset the localchange and state db for the current domain
- synchronized (sourceDbHandlers)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- try
- {
- dbHandler.clear();
- } catch (Exception e)
- {
- // TODO: i18n
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
- e.getMessage() + " " + stackTraceToSingleLineString(e)));
- logError(mb.toMessage());
- }
- }
- shutdownDbHandlers();
- }
+ changelogDB.clearDomain(baseDn);
try
{
localReplicationServer.clearGenerationId(baseDn);
@@ -2397,20 +2331,6 @@
}
/**
- * Set the purge delay on all the db Handlers for this Domain
- * of Replication.
- *
- * @param delay The new purge delay to use.
- */
- public void setPurgeDelay(long delay)
- {
- for (DbHandler dbHandler : sourceDbHandlers.values())
- {
- dbHandler.setPurgeDelay(delay);
- }
- }
-
- /**
* Get the map of connected DSs.
* @return The map of connected DSs
*/
@@ -2667,7 +2587,6 @@
{
for (int serverId : dbState)
{
- DbHandler h = sourceDbHandlers.get(serverId);
CSN mostRecentDbCSN = dbState.getCSN(serverId);
try {
// Is the most recent change in the Db newer than eligible CSN ?
@@ -2676,19 +2595,8 @@
if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
{
// let's try to seek the first change <= eligibleCSN
- ReplicaDBCursor cursor = null;
- try {
- cursor = h.generateCursorFrom(eligibleCSN);
- if (cursor != null && cursor.getChange() != null) {
- CSN newCSN = cursor.getChange().getCSN();
- result.update(newCSN);
- }
- } catch (ChangelogException e) {
- // there's no change older than eligibleCSN (case of s3/csn31)
- result.update(new CSN(0, 0, serverId));
- } finally {
- close(cursor);
- }
+ CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
+ result.update(newCSN);
} else {
// for this serverId, all changes in the ChangelogDb are holder
// than eligibleCSN, the most recent in the db is our guy.
@@ -2721,9 +2629,9 @@
public ServerState getStartState()
{
ServerState domainStartState = new ServerState();
- for (DbHandler dbHandler : sourceDbHandlers.values())
+ for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
{
- domainStartState.update(dbHandler.getFirstChange());
+ domainStartState.update(firstCSN);
}
return domainStartState;
}
@@ -2741,10 +2649,12 @@
{
CSN eligibleCSN = null;
- for (DbHandler db : sourceDbHandlers.values())
+ for (Entry<Integer, CSN> entry :
+ changelogDB.getDomainLastCSNs(baseDn).entrySet())
{
// Consider this producer (DS/db).
- int serverId = db.getServerId();
+ final int serverId = entry.getKey();
+ final CSN changelogLastCSN = entry.getValue();
// Should it be considered for eligibility ?
CSN heartbeatLastCSN =
@@ -2774,7 +2684,6 @@
continue;
}
- CSN changelogLastCSN = db.getLastChange();
if (changelogLastCSN != null
&& (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
{
@@ -2935,15 +2844,7 @@
*/
public long getLatestDomainTrimDate()
{
- long latest = 0;
- for (DbHandler db : sourceDbHandlers.values())
- {
- if (latest == 0 || latest < db.getLatestTrimDate())
- {
- latest = db.getLatestTrimDate();
- }
- }
- return latest;
+ return changelogDB.getDomainLatestTrimDate(baseDn);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
new file mode 100644
index 0000000..9686ee0
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -0,0 +1,229 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opends.server.config.ConfigException;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+
+/**
+ * The changelogDB stores the replication data on persistent storage.
+ * <p>
+ * This interface allows to:
+ * <ul>
+ * <li>set the storage directory and the purge interval</li>
+ * <li>get access to the {@link ChangeNumberIndexDB}</li>
+ * <li>query or control the replication domain database(s) (composed of one or
+ * more ReplicaDBs)</li>
+ * <li>query/update each ReplicaDB</li>
+ * </ul>
+ */
+public interface ChangelogDB
+{
+
+ // DB control methods
+
+ /**
+ * Set the directory to be used by the replication database.
+ *
+ * @param dbDirName
+ * the directory for use by the replication database
+ * @throws ConfigException
+ * if a problem occurs opening the directory
+ */
+ void setReplicationDBDirectory(String dbDirName) throws ConfigException;
+
+ /**
+ * Get the replication server database directory. This is used by tests to do
+ * some cleanup.
+ *
+ * @return the database directory name
+ */
+ String getDBDirName();
+
+ /**
+ * Initializes the replication database.
+ */
+ void initializeDB();
+
+ /**
+ * Sets the purge delay for the replication database. This purge delay is a
+ * best effort.
+ *
+ * @param delayInMillis
+ * the purge delay in milliseconds
+ */
+ void setPurgeDelay(long delayInMillis);
+
+ /**
+ * Shutdown the replication database.
+ */
+ void shutdownDB();
+
+ /**
+ * Returns a new {@link ChangeNumberIndexDB} object.
+ *
+ * @return a new {@link ChangeNumberIndexDB} object
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException;
+
+ // Domain methods
+
+ /**
+ * Returns the serverIds for the servers that are or have been part of the
+ * provided replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @return a set of integers holding the serverIds
+ */
+ Set<Integer> getDomainServerIds(String baseDn);
+
+ /**
+ * Get the number of changes for the specified replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @return the number of changes.
+ */
+ long getDomainChangesCount(String baseDn);
+
+ /**
+ * Returns the FIRST {@link CSN}s of each serverId for the specified
+ * replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @return a {serverId => FIRST CSN} Map
+ */
+ Map<Integer, CSN> getDomainFirstCSNs(String baseDn);
+
+ /**
+ * Returns the LAST {@link CSN}s of each serverId for the specified
+ * replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @return a {serverId => LAST CSN} Map
+ */
+ Map<Integer, CSN> getDomainLastCSNs(String baseDn);
+
+ /**
+ * Retrieves the latest trim date for the specified replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @return the domain latest trim date
+ */
+ long getDomainLatestTrimDate(String baseDn);
+
+ /**
+ * Shutdown the specified replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ */
+ void shutdownDomain(String baseDn);
+
+ /**
+ * Clear DB and shutdown for the specified replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ */
+ void clearDomain(String baseDn);
+
+ // serverId methods
+
+ /**
+ * Return the number of changes between 2 provided {@link CSN}s for the
+ * specified serverId and replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @param serverId
+ * the serverId on which to act
+ * @param from
+ * The lower (older) CSN
+ * @param to
+ * The upper (newer) CSN
+ * @return The computed number of changes
+ */
+ long getCount(String baseDn, int serverId, CSN from, CSN to);
+
+ /**
+ * Returns the {@link CSN} situated immediately after the specified
+ * {@link CSN} for the specified serverId and replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @param serverId
+ * the serverId for which we want the information
+ * @param startAfterCSN
+ * The position where the iterator must start
+ * @return a new ReplicationIterator that allows to browse the db managed by
+ * this dbHandler and starting at the position defined by a given CSN.
+ */
+ CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN);
+
+ /**
+ * Generates a non empty {@link ReplicaDBCursor} for the specified serverId
+ * and replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @param serverId
+ * the serverId on which to act
+ * @param startAfterCSN
+ * The position where the iterator must start
+ * @return a {@link ReplicaDBCursor} if the ReplicaDB is not empty, null
+ * otherwise
+ */
+ ReplicaDBCursor getCursorFrom(String baseDn, int serverId, CSN startAfterCSN);
+
+ /**
+ * for the specified serverId and replication domain.
+ *
+ * @param baseDn
+ * the replication domain baseDn
+ * @param serverId
+ * the serverId on which to act
+ * @param updateMsg
+ * the update message to publish to the replicaDB
+ * @return true if a db had to be created to publish this message
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ boolean publishUpdateMsg(String baseDn, int serverId, UpdateMsg updateMsg)
+ throws ChangelogException;
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
index cc6f328..aec222a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
@@ -70,7 +70,7 @@
* @param cause
* The underlying cause that triggered this exception.
*/
- protected ChangelogException(Message message, Throwable cause)
+ public ChangelogException(Message message, Throwable cause)
{
super(message, cause);
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
new file mode 100644
index 0000000..bea6ac5
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -0,0 +1,452 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.io.File;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.server.config.ConfigException;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.util.Pair;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
+/**
+ * JE implementation of the ChangelogDB.
+ */
+public class JEChangelogDB implements ChangelogDB
+{
+
+ /** The tracer object for the debug logger. */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
+ * This map contains the List of updates received from each LDAP server.
+ */
+ private final Map<String, Map<Integer, DbHandler>> sourceDbHandlers =
+ new ConcurrentHashMap<String, Map<Integer, DbHandler>>();
+ private ReplicationDbEnv dbEnv;
+ private String dbDirName = null;
+ private File dbDirectory;
+
+ /** The local replication server. */
+ private final ReplicationServer replicationServer;
+
+ /**
+ * Builds an instance of this class.
+ *
+ * @param replicationServer
+ * the local replication server.
+ */
+ public JEChangelogDB(ReplicationServer replicationServer)
+ {
+ this.replicationServer = replicationServer;
+ }
+
+ private Map<Integer, DbHandler> getDomainMap(String baseDn)
+ {
+ final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
+ if (domainMap != null)
+ {
+ return domainMap;
+ }
+ return Collections.emptyMap();
+ }
+
+ private DbHandler getDbHandler(String baseDn, int serverId)
+ {
+ return getDomainMap(baseDn).get(serverId);
+ }
+
+ /**
+ * Provision resources for the specified serverId in the specified replication
+ * domain.
+ *
+ * @param baseDn
+ * the replication domain where to add the serverId
+ * @param serverId
+ * the server Id to add to the replication domain
+ * @throws ChangelogException
+ * If a database error happened.
+ */
+ private void commission(String baseDn, int serverId, ReplicationServer rs)
+ throws ChangelogException
+ {
+ getOrCreateDbHandler(baseDn, serverId, rs);
+ }
+
+ private Pair<DbHandler, Boolean> getOrCreateDbHandler(String baseDn,
+ int serverId, ReplicationServer rs) throws ChangelogException
+ {
+ synchronized (sourceDbHandlers)
+ {
+ Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
+ if (domainMap == null)
+ {
+ domainMap = new ConcurrentHashMap<Integer, DbHandler>();
+ sourceDbHandlers.put(baseDn, domainMap);
+ }
+
+ DbHandler dbHandler = domainMap.get(serverId);
+ if (dbHandler == null)
+ {
+ dbHandler =
+ new DbHandler(serverId, baseDn, rs, dbEnv, rs.getQueueSize());
+ domainMap.put(serverId, dbHandler);
+ return Pair.of(dbHandler, true);
+ }
+ return Pair.of(dbHandler, false);
+ }
+ }
+
+
+ /** {@inheritDoc} */
+ @Override
+ public void initializeDB()
+ {
+ try
+ {
+ dbEnv = new ReplicationDbEnv(getFileForPath(dbDirName).getAbsolutePath(),
+ replicationServer);
+ initializeChangelogState(dbEnv.readChangelogState());
+ }
+ catch (ChangelogException e)
+ {
+ Message message =
+ ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e
+ .getLocalizedMessage());
+ logError(message);
+ }
+ }
+
+ private void initializeChangelogState(final ChangelogState changelogState)
+ throws ChangelogException
+ {
+ for (Map.Entry<String, Long> entry :
+ changelogState.getDomainToGenerationId().entrySet())
+ {
+ replicationServer.getReplicationServerDomain(entry.getKey(), true)
+ .initGenerationID(entry.getValue());
+ }
+ for (Map.Entry<String, List<Integer>> entry : changelogState
+ .getDomainToServerIds().entrySet())
+ {
+ final String baseDn = entry.getKey();
+ for (int serverId : entry.getValue())
+ {
+ commission(baseDn, serverId, replicationServer);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdownDB()
+ {
+ if (dbEnv != null)
+ {
+ dbEnv.shutdown();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Set<Integer> getDomainServerIds(String baseDn)
+ {
+ return getDomainMap(baseDn).keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getCount(String baseDn, int serverId, CSN from, CSN to)
+ {
+ DbHandler dbHandler = getDbHandler(baseDn, serverId);
+ if (dbHandler != null)
+ {
+ return dbHandler.getCount(from, to);
+ }
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getDomainChangesCount(String baseDn)
+ {
+ long entryCount = 0;
+ for (DbHandler dbHandler : getDomainMap(baseDn).values())
+ {
+ entryCount += dbHandler.getChangesCount();
+ }
+ return entryCount;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdownDomain(String baseDn)
+ {
+ shutdownDbHandlers(getDomainMap(baseDn));
+ }
+
+ private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
+ {
+ synchronized (domainMap)
+ {
+ for (DbHandler dbHandler : domainMap.values())
+ {
+ dbHandler.shutdown();
+ }
+ domainMap.clear();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Map<Integer, CSN> getDomainFirstCSNs(String baseDn)
+ {
+ final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
+ final Map<Integer, CSN> results =
+ new HashMap<Integer, CSN>(domainMap.size());
+ for (DbHandler dbHandler : domainMap.values())
+ {
+ results.put(dbHandler.getServerId(), dbHandler.getFirstChange());
+ }
+ return results;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Map<Integer, CSN> getDomainLastCSNs(String baseDn)
+ {
+ final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
+ final Map<Integer, CSN> results =
+ new HashMap<Integer, CSN>(domainMap.size());
+ for (DbHandler dbHandler : domainMap.values())
+ {
+ results.put(dbHandler.getServerId(), dbHandler.getLastChange());
+ }
+ return results;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void clearDomain(String baseDn)
+ {
+ final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
+ synchronized (domainMap)
+ {
+ for (DbHandler dbHandler : domainMap.values())
+ {
+ try
+ {
+ dbHandler.clear();
+ }
+ catch (Exception e)
+ {
+ // TODO: i18n
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
+ .getMessage()
+ + " " + stackTraceToSingleLineString(e)));
+ logError(mb.toMessage());
+ }
+ }
+ shutdownDbHandlers(domainMap);
+ }
+
+ try
+ {
+ dbEnv.clearGenerationId(baseDn);
+ }
+ catch (Exception ignored)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setPurgeDelay(long delay)
+ {
+ for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values())
+ {
+ for (DbHandler dbHandler : domainMap.values())
+ {
+ dbHandler.setPurgeDelay(delay);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getDomainLatestTrimDate(String baseDn)
+ {
+ long latest = 0;
+ for (DbHandler dbHandler : getDomainMap(baseDn).values())
+ {
+ if (latest == 0 || latest < dbHandler.getLatestTrimDate())
+ {
+ latest = dbHandler.getLatestTrimDate();
+ }
+ }
+ return latest;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN)
+ {
+ final DbHandler dbHandler = getDbHandler(baseDn, serverId);
+
+ ReplicaDBCursor cursor = null;
+ try
+ {
+ cursor = dbHandler.generateCursorFrom(startAfterCSN);
+ if (cursor != null && cursor.getChange() != null)
+ {
+ return cursor.getChange().getCSN();
+ }
+ return null;
+ }
+ catch (ChangelogException e)
+ {
+ // there's no change older than startAfterCSN
+ return new CSN(0, 0, serverId);
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException
+ {
+ return new DraftCNDbHandler(replicationServer, this.dbEnv);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setReplicationDBDirectory(String dbDirName)
+ throws ConfigException
+ {
+ if (dbDirName == null)
+ {
+ dbDirName = "changelogDb";
+ }
+ this.dbDirName = dbDirName;
+
+ // Check that this path exists or create it.
+ dbDirectory = getFileForPath(this.dbDirName);
+ try
+ {
+ if (!dbDirectory.exists())
+ {
+ dbDirectory.mkdir();
+ }
+ }
+ catch (Exception e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(e.getLocalizedMessage());
+ mb.append(" ");
+ mb.append(String.valueOf(dbDirectory));
+ Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
+ throw new ConfigException(msg, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String getDBDirName()
+ {
+ return this.dbDirName;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReplicaDBCursor getCursorFrom(String baseDn, int serverId,
+ CSN startAfterCSN)
+ {
+ DbHandler dbHandler = getDbHandler(baseDn, serverId);
+ if (dbHandler == null)
+ {
+ return null;
+ }
+
+ ReplicaDBCursor it;
+ try
+ {
+ it = dbHandler.generateCursorFrom(startAfterCSN);
+ }
+ catch (Exception e)
+ {
+ return null;
+ }
+
+ if (!it.next())
+ {
+ close(it);
+ return null;
+ }
+
+ return it;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean publishUpdateMsg(String baseDn, int serverId,
+ UpdateMsg updateMsg) throws ChangelogException
+ {
+ final Pair<DbHandler, Boolean> pair =
+ getOrCreateDbHandler(baseDn, serverId, replicationServer);
+ final DbHandler dbHandler = pair.getFirst();
+ final boolean wasCreated = pair.getSecond();
+
+ dbHandler.add(updateMsg);
+ return wasCreated;
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index d4965c6..4f1f166 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -34,6 +34,7 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -166,12 +167,14 @@
}
/**
- * Read the list of known servers from the database and start dbHandler
- * for each of them.
+ * Read the list of known servers from the database and start dbHandler for
+ * each of them.
*
- * @throws ChangelogException in case of underlying Exception
+ * @return the {@link ChangelogState} read from the changelogState DB
+ * @throws ChangelogException
+ * if a database problem occurs
*/
- public void initializeFromChangelogStateDB() throws ChangelogException
+ public ChangelogState readChangelogState() throws ChangelogException
{
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
@@ -179,6 +182,8 @@
try
{
+ final ChangelogState result = new ChangelogState();
+
OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
while (status == OperationStatus.SUCCESS)
{
@@ -197,7 +202,7 @@
if (debugEnabled())
debug("has read baseDn=" + baseDn + " generationId=" +generationId);
- replicationServer.initDomainGenerationID(baseDn, generationId);
+ result.setDomainGenerationId(baseDn, generationId);
}
else
{
@@ -207,11 +212,13 @@
if (debugEnabled())
debug("has read: baseDn=" + baseDn + " serverId=" + serverId);
- replicationServer.addServerIdToDomain(serverId, baseDn);
+ result.addServerIdToDomain(serverId, baseDn);
}
status = cursor.getNext(key, data, LockMode.DEFAULT);
}
+
+ return result;
}
catch (RuntimeException e)
{
diff --git a/opends/src/server/org/opends/server/util/Pair.java b/opends/src/server/org/opends/server/util/Pair.java
new file mode 100644
index 0000000..188b6f4
--- /dev/null
+++ b/opends/src/server/org/opends/server/util/Pair.java
@@ -0,0 +1,162 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.util;
+
+
+/**
+ * Ordered pair of various objects.
+ *
+ * @param <F>
+ * type of the first pair element
+ * @param <S>
+ * type of the second pair element
+ */
+public class Pair<F, S>
+{
+
+ /** An empty Pair. */
+ public static final Pair<?, ?> EMPTY = Pair.of(null, null);
+
+ /** The first pair element. */
+ private final F first;
+ /** The second pair element. */
+ private final S second;
+
+ /**
+ * Default ctor.
+ *
+ * @param first
+ * the first element of the constructed pair
+ * @param second
+ * the second element of the constructed pair
+ */
+ private Pair(F first, S second)
+ {
+ this.first = first;
+ this.second = second;
+ }
+
+ /**
+ * Factory method to build a new Pair.
+ *
+ * @param first
+ * the first element of the constructed pair
+ * @param second
+ * the second element of the constructed pair
+ * @param <F>
+ * type of the first pair element
+ * @param <S>
+ * type of the second pair element
+ * @return A new Pair built with the provided elements
+ */
+ public static <F, S> Pair<F, S> of(F first, S second)
+ {
+ return new Pair<F, S>(first, second);
+ }
+
+ /**
+ * Returns an empty Pair matching the required types.
+ *
+ * @param <F>
+ * type of the first pair element
+ * @param <S>
+ * type of the second pair element
+ * @return An empty Pair matching the required types
+ */
+ @SuppressWarnings("unchecked")
+ public static <F, S> Pair<F, S> empty()
+ {
+ return (Pair<F, S>) EMPTY;
+ }
+
+ /**
+ * Returns the first element of this pair.
+ *
+ * @return the first element of this pair
+ */
+ public F getFirst()
+ {
+ return first;
+ }
+
+ /**
+ * Returns the second element of this pair.
+ *
+ * @return the second element of this pair
+ */
+ public S getSecond()
+ {
+ return second;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((first == null) ? 0 : first.hashCode());
+ result = prime * result + ((second == null) ? 0 : second.hashCode());
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Pair<?, ?> other = (Pair<?, ?>) obj;
+ if (first == null)
+ {
+ if (other.first != null)
+ return false;
+ }
+ else if (!first.equals(other.first))
+ return false;
+ if (second == null)
+ {
+ if (other.second != null)
+ return false;
+ }
+ else if (!second.equals(other.second))
+ return false;
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "Pair [" + first + ", " + second + "]";
+ }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
similarity index 98%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 50ce40a..fe08024 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -25,7 +25,7 @@
* Copyright 2006-2010 Sun Microsystems, Inc.
* Portions copyright 2011-2013 ForgeRock AS
*/
-package org.opends.server.replication;
+package org.opends.server.replication.server;
import java.io.*;
import java.net.Socket;
@@ -44,6 +44,7 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.*;
+import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -53,7 +54,6 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -828,14 +828,12 @@
publishDeleteMsgInOTest(s2test, csn9, tn, 9);
sleep(500);
- ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING);
- ServerState startState = rsd.getStartState();
+ ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING);
assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1);
assertTrue(startState.getCSN(s2test.getServerId()) != null);
assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7);
- rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2);
- startState = rsd.getStartState();
+ startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING2);
assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2);
assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6);
@@ -891,6 +889,11 @@
debugInfo(tn, "Ending test successfully");
}
+ private ServerState getReplicationDomainStartState(String baseDn)
+ {
+ return replicationServer.getReplicationServerDomain(baseDn).getStartState();
+ }
+
private String getCookie(List<SearchResultEntry> entries,
int expectedNbEntries, String tn, LDIFWriter ldifWriter, String cookie)
throws Exception
@@ -979,8 +982,6 @@
debugInfo(tn, "Starting test");
ReplicationBroker server01 = null;
- ReplicationServerDomain d1 = null;
- ReplicationServerDomain d2 = null;
try
{
@@ -1010,10 +1011,7 @@
// ---
// 2. Now set up a very short purge delay on the replication changelogs
// so that this test can play with a trimmed changelog.
- d1 = replicationServer.getReplicationServerDomain("o=test");
- d2 = replicationServer.getReplicationServerDomain("o=test2");
- d1.setPurgeDelay(1);
- d2.setPurgeDelay(1);
+ replicationServer.getChangelogDB().setPurgeDelay(1);
// Sleep longer than this delay - so that the changelog is trimmed
Thread.sleep(1000);
@@ -1047,8 +1045,8 @@
// returns the appropriate error.
publishDeleteMsgInOTest(server01, csns[3], tn, 1);
- debugInfo(tn, "d1 trimdate" + d1.getStartState());
- debugInfo(tn, "d2 trimdate" + d2.getStartState());
+ debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState("o=test"));
+ debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState("o=test2"));
searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM);
assertEquals(searchOp.getSearchEntries().size(), 0);
assertTrue(searchOp.getErrorMessage().toString().startsWith(
@@ -1059,15 +1057,7 @@
{
stop(server01);
// And reset changelog purge delay for the other tests.
- if (d1 != null)
- {
- d1.setPurgeDelay(15 * 1000);
- }
- if (d2 != null)
- {
- d2.setPurgeDelay(15 * 1000);
- }
-
+ replicationServer.getChangelogDB().setPurgeDelay(15 * 1000);
replicationServer.clearDb();
}
debugInfo(tn, "Ending test successfully");
--
Gitblit v1.10.0