opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -46,9 +46,9 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; @@ -117,7 +117,7 @@ private final Queue<MessageHandler> otherHandlers = new ConcurrentLinkedQueue<MessageHandler>(); private final ChangelogDB changelogDB; private final ReplicationDomainDB domainDB; /** The ReplicationServer that created the current instance. */ private ReplicationServer localReplicationServer; @@ -186,7 +186,8 @@ this.assuredTimeoutTimer = new Timer("Replication server RS(" + localReplicationServer.getServerId() + ") assured timer for domain \"" + baseDN + "\"", true); this.changelogDB = localReplicationServer.getChangelogDB(); this.domainDB = localReplicationServer.getChangelogDB().getReplicationDomainDB(); DirectoryServer.registerMonitorProvider(this); } @@ -405,7 +406,7 @@ { try { if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg)) if (this.domainDB.publishUpdateMsg(baseDN, serverId, updateMsg)) { /* * JNR: Matt and I had a hard time figuring out where to put this @@ -1278,7 +1279,7 @@ */ public Set<Integer> getServerIds() { return changelogDB.getDomainServerIds(baseDN); return domainDB.getDomainServerIds(baseDN); } /** @@ -1296,11 +1297,11 @@ * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * @return a non null {@link ReplicaDBCursor} * @see ChangelogDB#getCursorFrom(DN, int, CSN) * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN) */ public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) { return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN); return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); } /** @@ -1313,7 +1314,7 @@ */ public long getCount(int serverId, CSN from, CSN to) { return changelogDB.getCount(baseDN, serverId, from, to); return domainDB.getCount(baseDN, serverId, from, to); } /** @@ -1323,7 +1324,7 @@ */ public long getChangesCount() { return changelogDB.getDomainChangesCount(baseDN); return domainDB.getDomainChangesCount(baseDN); } /** @@ -1708,7 +1709,7 @@ stopAllServers(true); changelogDB.shutdownDomain(baseDN); domainDB.shutdownDomain(baseDN); } /** @@ -1720,7 +1721,7 @@ */ public ServerState getLatestServerState() { return changelogDB.getDomainNewestCSNs(baseDN); return domainDB.getDomainNewestCSNs(baseDN); } /** @@ -2193,7 +2194,7 @@ { try { changelogDB.removeDomain(baseDN); domainDB.removeDomain(baseDN); } catch (ChangelogException e) { @@ -2602,7 +2603,7 @@ if (eligibleCSN.olderOrEqual(mostRecentDbCSN)) { // let's try to seek the first change <= eligibleCSN CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN); CSN newCSN = domainDB.getCSNAfter(baseDN, serverId, eligibleCSN); result.update(newCSN); } else { // for this serverId, all changes in the ChangelogDb are holder @@ -2635,7 +2636,7 @@ */ public ServerState getStartState() { return changelogDB.getDomainOldestCSNs(baseDN); return domainDB.getDomainOldestCSNs(baseDN); } /** @@ -2651,7 +2652,7 @@ { CSN eligibleCSN = null; final ServerState newestCSNs = changelogDB.getDomainNewestCSNs(baseDN); final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN); for (final int serverId : newestCSNs) { // Consider this producer (DS/db). @@ -2845,7 +2846,7 @@ */ public long getLatestDomainTrimDate() { return changelogDB.getDomainLatestTrimDate(baseDN); return domainDB.getDomainLatestTrimDate(baseDN); } /** opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -26,30 +26,16 @@ */ package org.opends.server.replication.server.changelog.api; import java.util.Set; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.DN; /** * The changelogDB stores the replication data on persistent storage. * <p> * This interface allows to: * <ul> * <li>set the storage directory and the purge interval</li> * <li>get access to the {@link ChangeNumberIndexDB}</li> * <li>query or control the replication domain database(s) (composed of one or * more ReplicaDBs)</li> * <li>query/update each ReplicaDB</li> * </ul> * This interface is the entry point for the changelog database which stores the * replication data on persistent storage. It allows to control the overall * database or access more specialized interfaces. */ public interface ChangelogDB { // DB control methods /** * Initializes the replication database by reading its previous state and * building the relevant ReplicaDBs according to the previous state. This @@ -93,190 +79,10 @@ */ ChangeNumberIndexDB getChangeNumberIndexDB(); // Domain methods /** * Returns the serverIds for the servers that are or have been part of the * provided replication domain. * Returns the {@link ReplicationDomainDB} object. * * @param baseDN * the replication domain baseDN * @return an unmodifiable set of integers holding the serverIds * @return the {@link ReplicationDomainDB} object */ Set<Integer> getDomainServerIds(DN baseDN); /** * Get the number of changes for the specified replication domain. * * @param baseDN * the replication domain baseDN * @return the number of changes. */ long getDomainChangesCount(DN baseDN); /** * Returns the oldest {@link CSN}s of each serverId for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => oldest CSN} * mapping. If a replica DB is empty or closed, the oldest CSN will be * null for that replica. The caller owns the generated ServerState. */ ServerState getDomainOldestCSNs(DN baseDN); /** * Returns the newest {@link CSN}s of each serverId for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => newest CSN} Map. * If a replica DB is empty or closed, the newest CSN will be null for * that replica. The caller owns the generated ServerState. */ ServerState getDomainNewestCSNs(DN baseDN); /** * Retrieves the latest trim date for the specified replication domain. * * @param baseDN * the replication domain baseDN * @return the domain latest trim date */ long getDomainLatestTrimDate(DN baseDN); /** * Shutdown all the replica databases for the specified replication domain. * * @param baseDN * the replication domain baseDN */ void shutdownDomain(DN baseDN); /** * Removes all the data relating to the specified replication domain and * shutdown all its replica databases. In particular, it will: * <ol> * <li>remove all the changes from the replica databases</li> * <li>remove all knowledge of the serverIds in this domain</li> * <li>remove any knowledge of the current generationId for this domain</li> * </ol> * * @param baseDN * the replication domain baseDN * @throws ChangelogException * If a database problem happened */ void removeDomain(DN baseDN) throws ChangelogException; // serverId methods /** * Return the number of changes inclusive between 2 provided {@link CSN}s for * the specified serverId and replication domain. i.e. the <code>from</code> * and <code>to</code> CSNs are included in the count. * <p> * Note that: * <ol> * <li>If <code>from</code> is null, the count starts at the oldest CSN in the * database.</li> * <li>If <code>to</code> is null, the count is 0.</li> * <li>If both from and to are present, then the count includes them both * <code>to</code> is null, the count ends at the newest CSN in the database. * </li> * <li>incidentally, if both <code>from</code> and <code>to</code> are null, * the total count of entries in the replica database is returned.</li> * </ol> * <h6>Example</h6> * <p> * Given the following replica database for baseDN "dc=example,dc=com" and * serverId 1: * * <pre> * CSN1 <= Oldest * CSN2 * CSN3 * CSN4 * CSN5 <= Newest * </pre> * * Then: * * <pre> * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1); * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2); * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5); * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5); * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0); * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5); * </pre> * * @param baseDN * the replication domain baseDN * @param serverId * the serverId on which to act * @param from * The older CSN where to start the count * @param to * The newer CSN where to end the count * @return The computed number of changes */ long getCount(DN baseDN, int serverId, CSN from, CSN to); /** * Returns the {@link CSN} situated immediately after the specified * {@link CSN} for the specified serverId and replication domain according to * the order specified by {@link CSN#compareTo(CSN)}. If an Exception occurs * in this method, then it returns the oldest possible CSN for the provided * serverId. * * @param baseDN * the replication domain baseDN * @param serverId * the serverId for which we want the information * @param startAfterCSN * The position where the iterator must start * @return the CSN immediately after startAfterCSN, or null if no CSN exist * after startAfterCSN */ CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN); /** * Generates a {@link ReplicaDBCursor} for the specified serverId and * replication domain starting after the provided CSN. * <p> * The cursor is already advanced to the record after startAfterCSN. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * * @param baseDN * the replication domain baseDN * @param serverId * Identifier of the server for which the cursor is created * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * @return a non null {@link ReplicaDBCursor} */ ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN); /** * for the specified serverId and replication domain. * * @param baseDN * the replication domain baseDN * @param serverId * the serverId on which to act * @param updateMsg * the update message to publish to the replicaDB * @return true if a db had to be created to publish this message * @throws ChangelogException * If a database problem happened */ boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg) throws ChangelogException; ReplicationDomainDB getReplicationDomainDB(); } opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
New file @@ -0,0 +1,227 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.api; import java.util.Set; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.DN; /** * This interface allows to query or control the replication domain database(s) * (composed of one or more ReplicaDBs) and query/update each ReplicaDB. */ public interface ReplicationDomainDB { /** * Returns the serverIds for the servers that are or have been part of the * provided replication domain. * * @param baseDN * the replication domain baseDN * @return an unmodifiable set of integers holding the serverIds */ Set<Integer> getDomainServerIds(DN baseDN); /** * Get the number of changes for the specified replication domain. * * @param baseDN * the replication domain baseDN * @return the number of changes. */ long getDomainChangesCount(DN baseDN); /** * Returns the oldest {@link CSN}s of each serverId for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => oldest CSN} * mapping. If a replica DB is empty or closed, the oldest CSN will be * null for that replica. The caller owns the generated ServerState. */ ServerState getDomainOldestCSNs(DN baseDN); /** * Returns the newest {@link CSN}s of each serverId for the specified * replication domain. * * @param baseDN * the replication domain baseDN * @return a new ServerState object holding the {serverId => newest CSN} Map. * If a replica DB is empty or closed, the newest CSN will be null for * that replica. The caller owns the generated ServerState. */ ServerState getDomainNewestCSNs(DN baseDN); /** * Retrieves the latest trim date for the specified replication domain. * * @param baseDN * the replication domain baseDN * @return the domain latest trim date */ long getDomainLatestTrimDate(DN baseDN); /** * Shutdown all the replica databases for the specified replication domain. * * @param baseDN * the replication domain baseDN */ void shutdownDomain(DN baseDN); /** * Removes all the data relating to the specified replication domain and * shutdown all its replica databases. In particular, it will: * <ol> * <li>remove all the changes from the replica databases</li> * <li>remove all knowledge of the serverIds in this domain</li> * <li>remove any knowledge of the current generationId for this domain</li> * </ol> * * @param baseDN * the replication domain baseDN * @throws ChangelogException * If a database problem happened */ void removeDomain(DN baseDN) throws ChangelogException; // serverId methods /** * Return the number of changes inclusive between 2 provided {@link CSN}s for * the specified serverId and replication domain. i.e. the <code>from</code> * and <code>to</code> CSNs are included in the count. * <p> * Note that: * <ol> * <li>If <code>from</code> is null, the count starts at the oldest CSN in the * database.</li> * <li>If <code>to</code> is null, the count is 0.</li> * <li>If both from and to are present, then the count includes them both * <code>to</code> is null, the count ends at the newest CSN in the database. * </li> * <li>incidentally, if both <code>from</code> and <code>to</code> are null, * the total count of entries in the replica database is returned.</li> * </ol> * <h6>Example</h6> * <p> * Given the following replica database for baseDN "dc=example,dc=com" and * serverId 1: * * <pre> * CSN1 <= Oldest * CSN2 * CSN3 * CSN4 * CSN5 <= Newest * </pre> * * Then: * * <pre> * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN1), 1); * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN2), 2); * assertEquals(getCount("dc=example,dc=com", 1, CSN1, CSN5), 5); * assertEquals(getCount("dc=example,dc=com", 1, null, CSN5), 5); * assertEquals(getCount("dc=example,dc=com", 1, CSN1, null), 0); * assertEquals(getCount("dc=example,dc=com", 1, null, null), 5); * </pre> * * @param baseDN * the replication domain baseDN * @param serverId * the serverId on which to act * @param from * The older CSN where to start the count * @param to * The newer CSN where to end the count * @return The computed number of changes */ long getCount(DN baseDN, int serverId, CSN from, CSN to); /** * Returns the {@link CSN} situated immediately after the specified * {@link CSN} for the specified serverId and replication domain according to * the order specified by {@link CSN#compareTo(CSN)}. If an Exception occurs * in this method, then it returns the oldest possible CSN for the provided * serverId. * * @param baseDN * the replication domain baseDN * @param serverId * the serverId for which we want the information * @param startAfterCSN * The position where the iterator must start * @return the CSN immediately after startAfterCSN, or null if no CSN exist * after startAfterCSN */ CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN); /** * Generates a {@link ReplicaDBCursor} for the specified serverId and * replication domain starting after the provided CSN. * <p> * The cursor is already advanced to the record after startAfterCSN. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * * @param baseDN * the replication domain baseDN * @param serverId * Identifier of the server for which the cursor is created * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * @return a non null {@link ReplicaDBCursor} */ ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN); /** * for the specified serverId and replication domain. * * @param baseDN * the replication domain baseDN * @param serverId * the serverId on which to act * @param updateMsg * the update message to publish to the replicaDB * @return true if a db had to be created to publish this message * @throws ChangelogException * If a database problem happened */ boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg) throws ChangelogException; } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -41,10 +41,7 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.types.DN; import org.opends.server.util.Pair; import org.opends.server.util.StaticUtils; @@ -56,7 +53,7 @@ /** * JE implementation of the ChangelogDB. */ public class JEChangelogDB implements ChangelogDB public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB { /** @@ -588,6 +585,13 @@ /** {@inheritDoc} */ @Override public ReplicationDomainDB getReplicationDomainDB() { return this; } /** {@inheritDoc} */ @Override public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) {