From 50dca335649cdd5deae4ddbcd846b2214e4ef694 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 04 Oct 2013 12:34:29 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 14 +
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java | 206 -------------------------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 33 ++--
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 227 ++++++++++++++++++++++++++++
4 files changed, 259 insertions(+), 221 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index ebe4ada..9c9e6c5 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -46,9 +46,9 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -117,7 +117,7 @@
private final Queue<MessageHandler> otherHandlers =
new ConcurrentLinkedQueue<MessageHandler>();
- private final ChangelogDB changelogDB;
+ private final ReplicationDomainDB domainDB;
/** The ReplicationServer that created the current instance. */
private ReplicationServer localReplicationServer;
@@ -186,7 +186,8 @@
this.assuredTimeoutTimer = new Timer("Replication server RS("
+ localReplicationServer.getServerId()
+ ") assured timer for domain \"" + baseDN + "\"", true);
- this.changelogDB = localReplicationServer.getChangelogDB();
+ this.domainDB =
+ localReplicationServer.getChangelogDB().getReplicationDomainDB();
DirectoryServer.registerMonitorProvider(this);
}
@@ -405,7 +406,7 @@
{
try
{
- if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg))
+ if (this.domainDB.publishUpdateMsg(baseDN, serverId, updateMsg))
{
/*
* JNR: Matt and I had a hard time figuring out where to put this
@@ -1278,7 +1279,7 @@
*/
public Set<Integer> getServerIds()
{
- return changelogDB.getDomainServerIds(baseDN);
+ return domainDB.getDomainServerIds(baseDN);
}
/**
@@ -1296,11 +1297,11 @@
* @param startAfterCSN
* Starting point for the cursor. If null, start from the oldest CSN
* @return a non null {@link ReplicaDBCursor}
- * @see ChangelogDB#getCursorFrom(DN, int, CSN)
+ * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN)
*/
public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
{
- return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
}
/**
@@ -1313,7 +1314,7 @@
*/
public long getCount(int serverId, CSN from, CSN to)
{
- return changelogDB.getCount(baseDN, serverId, from, to);
+ return domainDB.getCount(baseDN, serverId, from, to);
}
/**
@@ -1323,7 +1324,7 @@
*/
public long getChangesCount()
{
- return changelogDB.getDomainChangesCount(baseDN);
+ return domainDB.getDomainChangesCount(baseDN);
}
/**
@@ -1708,7 +1709,7 @@
stopAllServers(true);
- changelogDB.shutdownDomain(baseDN);
+ domainDB.shutdownDomain(baseDN);
}
/**
@@ -1720,7 +1721,7 @@
*/
public ServerState getLatestServerState()
{
- return changelogDB.getDomainNewestCSNs(baseDN);
+ return domainDB.getDomainNewestCSNs(baseDN);
}
/**
@@ -2193,7 +2194,7 @@
{
try
{
- changelogDB.removeDomain(baseDN);
+ domainDB.removeDomain(baseDN);
}
catch (ChangelogException e)
{
@@ -2602,7 +2603,7 @@
if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
{
// let's try to seek the first change <= eligibleCSN
- CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN);
+ CSN newCSN = domainDB.getCSNAfter(baseDN, serverId, eligibleCSN);
result.update(newCSN);
} else {
// for this serverId, all changes in the ChangelogDb are holder
@@ -2635,7 +2636,7 @@
*/
public ServerState getStartState()
{
- return changelogDB.getDomainOldestCSNs(baseDN);
+ return domainDB.getDomainOldestCSNs(baseDN);
}
/**
@@ -2651,7 +2652,7 @@
{
CSN eligibleCSN = null;
- final ServerState newestCSNs = changelogDB.getDomainNewestCSNs(baseDN);
+ final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
for (final int serverId : newestCSNs)
{
// Consider this producer (DS/db).
@@ -2845,7 +2846,7 @@
*/
public long getLatestDomainTrimDate()
{
- return changelogDB.getDomainLatestTrimDate(baseDN);
+ return domainDB.getDomainLatestTrimDate(baseDN);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index 34f8644..445a05c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -26,30 +26,16 @@
*/
package org.opends.server.replication.server.changelog.api;
-import java.util.Set;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.types.DN;
/**
- * The changelogDB stores the replication data on persistent storage.
- * <p>
- * This interface allows to:
- * <ul>
- * <li>set the storage directory and the purge interval</li>
- * <li>get access to the {@link ChangeNumberIndexDB}</li>
- * <li>query or control the replication domain database(s) (composed of one or
- * more ReplicaDBs)</li>
- * <li>query/update each ReplicaDB</li>
- * </ul>
+ * This interface is the entry point for the changelog database which stores the
+ * replication data on persistent storage. It allows to control the overall
+ * database or access more specialized interfaces.
*/
public interface ChangelogDB
{
- // DB control methods
-
/**
* Initializes the replication database by reading its previous state and
* building the relevant ReplicaDBs according to the previous state. This
@@ -93,190 +79,10 @@
*/
ChangeNumberIndexDB getChangeNumberIndexDB();
- // Domain methods
-
/**
- * Returns the serverIds for the servers that are or have been part of the
- * provided replication domain.
+ * Returns the {@link ReplicationDomainDB} object.
*
- * @param baseDN
- * the replication domain baseDN
- * @return an unmodifiable set of integers holding the serverIds
+ * @return the {@link ReplicationDomainDB} object
*/
- Set<Integer> getDomainServerIds(DN baseDN);
-
- /**
- * Get the number of changes for the specified replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return the number of changes.
- */
- long getDomainChangesCount(DN baseDN);
-
- /**
- * Returns the oldest {@link CSN}s of each serverId for the specified
- * replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return a new ServerState object holding the {serverId => oldest CSN}
- * mapping. If a replica DB is empty or closed, the oldest CSN will be
- * null for that replica. The caller owns the generated ServerState.
- */
- ServerState getDomainOldestCSNs(DN baseDN);
-
- /**
- * Returns the newest {@link CSN}s of each serverId for the specified
- * replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return a new ServerState object holding the {serverId => newest CSN} Map.
- * If a replica DB is empty or closed, the newest CSN will be null for
- * that replica. The caller owns the generated ServerState.
- */
- ServerState getDomainNewestCSNs(DN baseDN);
-
- /**
- * Retrieves the latest trim date for the specified replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @return the domain latest trim date
- */
- long getDomainLatestTrimDate(DN baseDN);
-
- /**
- * Shutdown all the replica databases for the specified replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- */
- void shutdownDomain(DN baseDN);
-
- /**
- * Removes all the data relating to the specified replication domain and
- * shutdown all its replica databases. In particular, it will:
- * <ol>
- * <li>remove all the changes from the replica databases</li>
- * <li>remove all knowledge of the serverIds in this domain</li>
- * <li>remove any knowledge of the current generationId for this domain</li>
- * </ol>
- *
- * @param baseDN
- * the replication domain baseDN
- * @throws ChangelogException
- * If a database problem happened
- */
- void removeDomain(DN baseDN) throws ChangelogException;
-
- // serverId methods
-
- /**
- * Return the number of changes inclusive between 2 provided {@link CSN}s for
- * the specified serverId and replication domain. i.e. the <code>from</code>
- * and <code>to</code> CSNs are included in the count.
- * <p>
- * Note that:
- * <ol>
- * <li>If <code>from</code> is null, the count starts at the oldest CSN in the
- * database.</li>
- * <li>If <code>to</code> is null, the count is 0.</li>
- * <li>If both from and to are present, then the count includes them both
- * <code>to</code> is null, the count ends at the newest CSN in the database.
- * </li>
- * <li>incidentally, if both <code>from</code> and <code>to</code> are null,
- * the total count of entries in the replica database is returned.</li>
- * </ol>
- * <h6>Example</h6>
- * <p>
- * Given the following replica database for baseDN "dc=example,dc=com" and
- * serverId 1:
- *
- * <pre>
- * CSN1 <= Oldest
- * CSN2
- * CSN3
- * CSN4
- * CSN5 <= Newest
- * </pre>
- *
- * Then:
- *
- * <pre>
- * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1);
- * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2);
- * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5);
- * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5);
- * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0);
- * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5);
- * </pre>
- *
- * @param baseDN
- * the replication domain baseDN
- * @param serverId
- * the serverId on which to act
- * @param from
- * The older CSN where to start the count
- * @param to
- * The newer CSN where to end the count
- * @return The computed number of changes
- */
- long getCount(DN baseDN, int serverId, CSN from, CSN to);
-
- /**
- * Returns the {@link CSN} situated immediately after the specified
- * {@link CSN} for the specified serverId and replication domain according to
- * the order specified by {@link CSN#compareTo(CSN)}. If an Exception occurs
- * in this method, then it returns the oldest possible CSN for the provided
- * serverId.
- *
- * @param baseDN
- * the replication domain baseDN
- * @param serverId
- * the serverId for which we want the information
- * @param startAfterCSN
- * The position where the iterator must start
- * @return the CSN immediately after startAfterCSN, or null if no CSN exist
- * after startAfterCSN
- */
- CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN);
-
- /**
- * Generates a {@link ReplicaDBCursor} for the specified serverId and
- * replication domain starting after the provided CSN.
- * <p>
- * The cursor is already advanced to the record after startAfterCSN.
- * <p>
- * When the cursor is not used anymore, client code MUST call the
- * {@link ReplicaDBCursor#close()} method to free the resources and locks used
- * by the cursor.
- *
- * @param baseDN
- * the replication domain baseDN
- * @param serverId
- * Identifier of the server for which the cursor is created
- * @param startAfterCSN
- * Starting point for the cursor. If null, start from the oldest CSN
- * @return a non null {@link ReplicaDBCursor}
- */
- ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
-
- /**
- * for the specified serverId and replication domain.
- *
- * @param baseDN
- * the replication domain baseDN
- * @param serverId
- * the serverId on which to act
- * @param updateMsg
- * the update message to publish to the replicaDB
- * @return true if a db had to be created to publish this message
- * @throws ChangelogException
- * If a database problem happened
- */
- boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg)
- throws ChangelogException;
-
+ ReplicationDomainDB getReplicationDomainDB();
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
new file mode 100644
index 0000000..d64a8a5
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -0,0 +1,227 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import java.util.Set;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.types.DN;
+
+/**
+ * This interface allows to query or control the replication domain database(s)
+ * (composed of one or more ReplicaDBs) and query/update each ReplicaDB.
+ */
+public interface ReplicationDomainDB
+{
+
+ /**
+ * Returns the serverIds for the servers that are or have been part of the
+ * provided replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return an unmodifiable set of integers holding the serverIds
+ */
+ Set<Integer> getDomainServerIds(DN baseDN);
+
+ /**
+ * Get the number of changes for the specified replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return the number of changes.
+ */
+ long getDomainChangesCount(DN baseDN);
+
+ /**
+ * Returns the oldest {@link CSN}s of each serverId for the specified
+ * replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return a new ServerState object holding the {serverId => oldest CSN}
+ * mapping. If a replica DB is empty or closed, the oldest CSN will be
+ * null for that replica. The caller owns the generated ServerState.
+ */
+ ServerState getDomainOldestCSNs(DN baseDN);
+
+ /**
+ * Returns the newest {@link CSN}s of each serverId for the specified
+ * replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return a new ServerState object holding the {serverId => newest CSN} Map.
+ * If a replica DB is empty or closed, the newest CSN will be null for
+ * that replica. The caller owns the generated ServerState.
+ */
+ ServerState getDomainNewestCSNs(DN baseDN);
+
+ /**
+ * Retrieves the latest trim date for the specified replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return the domain latest trim date
+ */
+ long getDomainLatestTrimDate(DN baseDN);
+
+ /**
+ * Shutdown all the replica databases for the specified replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ */
+ void shutdownDomain(DN baseDN);
+
+ /**
+ * Removes all the data relating to the specified replication domain and
+ * shutdown all its replica databases. In particular, it will:
+ * <ol>
+ * <li>remove all the changes from the replica databases</li>
+ * <li>remove all knowledge of the serverIds in this domain</li>
+ * <li>remove any knowledge of the current generationId for this domain</li>
+ * </ol>
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ void removeDomain(DN baseDN) throws ChangelogException;
+
+ // serverId methods
+
+ /**
+ * Return the number of changes inclusive between 2 provided {@link CSN}s for
+ * the specified serverId and replication domain. i.e. the <code>from</code>
+ * and <code>to</code> CSNs are included in the count.
+ * <p>
+ * Note that:
+ * <ol>
+ * <li>If <code>from</code> is null, the count starts at the oldest CSN in the
+ * database.</li>
+ * <li>If <code>to</code> is null, the count is 0.</li>
+ * <li>If both from and to are present, then the count includes them both
+ * <code>to</code> is null, the count ends at the newest CSN in the database.
+ * </li>
+ * <li>incidentally, if both <code>from</code> and <code>to</code> are null,
+ * the total count of entries in the replica database is returned.</li>
+ * </ol>
+ * <h6>Example</h6>
+ * <p>
+ * Given the following replica database for baseDN "dc=example,dc=com" and
+ * serverId 1:
+ *
+ * <pre>
+ * CSN1 <= Oldest
+ * CSN2
+ * CSN3
+ * CSN4
+ * CSN5 <= Newest
+ * </pre>
+ *
+ * Then:
+ *
+ * <pre>
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1);
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2);
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5);
+ * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5);
+ * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0);
+ * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5);
+ * </pre>
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @param serverId
+ * the serverId on which to act
+ * @param from
+ * The older CSN where to start the count
+ * @param to
+ * The newer CSN where to end the count
+ * @return The computed number of changes
+ */
+ long getCount(DN baseDN, int serverId, CSN from, CSN to);
+
+ /**
+ * Returns the {@link CSN} situated immediately after the specified
+ * {@link CSN} for the specified serverId and replication domain according to
+ * the order specified by {@link CSN#compareTo(CSN)}. If an Exception occurs
+ * in this method, then it returns the oldest possible CSN for the provided
+ * serverId.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @param serverId
+ * the serverId for which we want the information
+ * @param startAfterCSN
+ * The position where the iterator must start
+ * @return the CSN immediately after startAfterCSN, or null if no CSN exist
+ * after startAfterCSN
+ */
+ CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN);
+
+ /**
+ * Generates a {@link ReplicaDBCursor} for the specified serverId and
+ * replication domain starting after the provided CSN.
+ * <p>
+ * The cursor is already advanced to the record after startAfterCSN.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link ReplicaDBCursor#close()} method to free the resources and locks used
+ * by the cursor.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @param serverId
+ * Identifier of the server for which the cursor is created
+ * @param startAfterCSN
+ * Starting point for the cursor. If null, start from the oldest CSN
+ * @return a non null {@link ReplicaDBCursor}
+ */
+ ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
+
+ /**
+ * for the specified serverId and replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @param serverId
+ * the serverId on which to act
+ * @param updateMsg
+ * the update message to publish to the replicaDB
+ * @return true if a db had to be created to publish this message
+ * @throws ChangelogException
+ * If a database problem happened
+ */
+ boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg)
+ throws ChangelogException;
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 0ec74e6..4f42777 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -41,10 +41,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
-import org.opends.server.replication.server.changelog.api.ChangelogDB;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
+import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.DN;
import org.opends.server.util.Pair;
import org.opends.server.util.StaticUtils;
@@ -56,7 +53,7 @@
/**
* JE implementation of the ChangelogDB.
*/
-public class JEChangelogDB implements ChangelogDB
+public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
{
/**
@@ -588,6 +585,13 @@
/** {@inheritDoc} */
@Override
+ public ReplicationDomainDB getReplicationDomainDB()
+ {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
CSN startAfterCSN)
{
--
Gitblit v1.10.0