mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.34.2013 50dca335649cdd5deae4ddbcd846b2214e4ef694
OPENDJ-1116 Introduce abstraction for the changelog DB


Changes after review from Matthew Swift.


ReplicationServerDomain.java
Renamed changelogDB field to domainDB + changed type.

ChangelogDB.java, JEChangelogDB.java:
Extracted interface ReplicationDomainDB from this interface.
Added getReplicationDomainDB().

ReplicationDomainDB.java: ADDED
1 files added
3 files modified
480 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 33 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 206 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 227 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -46,9 +46,9 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -117,7 +117,7 @@
  private final Queue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  private final ChangelogDB changelogDB;
  private final ReplicationDomainDB domainDB;
  /** The ReplicationServer that created the current instance. */
  private ReplicationServer localReplicationServer;
@@ -186,7 +186,8 @@
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + localReplicationServer.getServerId()
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.changelogDB = localReplicationServer.getChangelogDB();
    this.domainDB =
        localReplicationServer.getChangelogDB().getReplicationDomainDB();
    DirectoryServer.registerMonitorProvider(this);
  }
@@ -405,7 +406,7 @@
  {
    try
    {
      if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg))
      if (this.domainDB.publishUpdateMsg(baseDN, serverId, updateMsg))
      {
        /*
         * JNR: Matt and I had a hard time figuring out where to put this
@@ -1278,7 +1279,7 @@
   */
  public Set<Integer> getServerIds()
  {
    return changelogDB.getDomainServerIds(baseDN);
    return domainDB.getDomainServerIds(baseDN);
  }
  /**
@@ -1296,11 +1297,11 @@
   * @param startAfterCSN
   *          Starting point for the cursor. If null, start from the oldest CSN
   * @return a non null {@link ReplicaDBCursor}
   * @see ChangelogDB#getCursorFrom(DN, int, CSN)
   * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN)
   */
  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
  {
    return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN);
    return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
  }
 /**
@@ -1313,7 +1314,7 @@
  */
  public long getCount(int serverId, CSN from, CSN to)
  {
    return changelogDB.getCount(baseDN, serverId, from, to);
    return domainDB.getCount(baseDN, serverId, from, to);
  }
  /**
@@ -1323,7 +1324,7 @@
   */
  public long getChangesCount()
  {
    return changelogDB.getDomainChangesCount(baseDN);
    return domainDB.getDomainChangesCount(baseDN);
  }
  /**
@@ -1708,7 +1709,7 @@
    stopAllServers(true);
    changelogDB.shutdownDomain(baseDN);
    domainDB.shutdownDomain(baseDN);
  }
  /**
@@ -1720,7 +1721,7 @@
   */
  public ServerState getLatestServerState()
  {
    return changelogDB.getDomainNewestCSNs(baseDN);
    return domainDB.getDomainNewestCSNs(baseDN);
  }
  /**
@@ -2193,7 +2194,7 @@
  {
    try
    {
      changelogDB.removeDomain(baseDN);
      domainDB.removeDomain(baseDN);
    }
    catch (ChangelogException e)
    {
@@ -2602,7 +2603,7 @@
          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
          {
            // let's try to seek the first change <= eligibleCSN
            CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN);
            CSN newCSN = domainDB.getCSNAfter(baseDN, serverId, eligibleCSN);
            result.update(newCSN);
          } else {
            // for this serverId, all changes in the ChangelogDb are holder
@@ -2635,7 +2636,7 @@
   */
  public ServerState getStartState()
  {
    return changelogDB.getDomainOldestCSNs(baseDN);
    return domainDB.getDomainOldestCSNs(baseDN);
  }
  /**
@@ -2651,7 +2652,7 @@
  {
    CSN eligibleCSN = null;
    final ServerState newestCSNs = changelogDB.getDomainNewestCSNs(baseDN);
    final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
    for (final int serverId : newestCSNs)
    {
      // Consider this producer (DS/db).
@@ -2845,7 +2846,7 @@
   */
  public long getLatestDomainTrimDate()
  {
    return changelogDB.getDomainLatestTrimDate(baseDN);
    return domainDB.getDomainLatestTrimDate(baseDN);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -26,30 +26,16 @@
 */
package org.opends.server.replication.server.changelog.api;
import java.util.Set;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.DN;
/**
 * The changelogDB stores the replication data on persistent storage.
 * <p>
 * This interface allows to:
 * <ul>
 * <li>set the storage directory and the purge interval</li>
 * <li>get access to the {@link ChangeNumberIndexDB}</li>
 * <li>query or control the replication domain database(s) (composed of one or
 * more ReplicaDBs)</li>
 * <li>query/update each ReplicaDB</li>
 * </ul>
 * This interface is the entry point for the changelog database which stores the
 * replication data on persistent storage. It allows to control the overall
 * database or access more specialized interfaces.
 */
public interface ChangelogDB
{
  // DB control methods
  /**
   * Initializes the replication database by reading its previous state and
   * building the relevant ReplicaDBs according to the previous state. This
@@ -93,190 +79,10 @@
   */
  ChangeNumberIndexDB getChangeNumberIndexDB();
  // Domain methods
  /**
   * Returns the serverIds for the servers that are or have been part of the
   * provided replication domain.
   * Returns the {@link ReplicationDomainDB} object.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return an unmodifiable set of integers holding the serverIds
   * @return the {@link ReplicationDomainDB} object
   */
  Set<Integer> getDomainServerIds(DN baseDN);
  /**
   * Get the number of changes for the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return the number of changes.
   */
  long getDomainChangesCount(DN baseDN);
  /**
   * Returns the oldest {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => oldest CSN}
   *         mapping. If a replica DB is empty or closed, the oldest CSN will be
   *         null for that replica. The caller owns the generated ServerState.
   */
  ServerState getDomainOldestCSNs(DN baseDN);
  /**
   * Returns the newest {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => newest CSN} Map.
   *         If a replica DB is empty or closed, the newest CSN will be null for
   *         that replica. The caller owns the generated ServerState.
   */
  ServerState getDomainNewestCSNs(DN baseDN);
  /**
   * Retrieves the latest trim date for the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return the domain latest trim date
   */
  long getDomainLatestTrimDate(DN baseDN);
  /**
   * Shutdown all the replica databases for the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   */
  void shutdownDomain(DN baseDN);
  /**
   * Removes all the data relating to the specified replication domain and
   * shutdown all its replica databases. In particular, it will:
   * <ol>
   * <li>remove all the changes from the replica databases</li>
   * <li>remove all knowledge of the serverIds in this domain</li>
   * <li>remove any knowledge of the current generationId for this domain</li>
   * </ol>
   *
   * @param baseDN
   *          the replication domain baseDN
   * @throws ChangelogException
   *           If a database problem happened
   */
  void removeDomain(DN baseDN) throws ChangelogException;
  // serverId methods
  /**
   * Return the number of changes inclusive between 2 provided {@link CSN}s for
   * the specified serverId and replication domain. i.e. the <code>from</code>
   * and <code>to</code> CSNs are included in the count.
   * <p>
   * Note that:
   * <ol>
   * <li>If <code>from</code> is null, the count starts at the oldest CSN in the
   * database.</li>
   * <li>If <code>to</code> is null, the count is 0.</li>
   * <li>If both from and to are present, then the count includes them both
   * <code>to</code> is null, the count ends at the newest CSN in the database.
   * </li>
   * <li>incidentally, if both <code>from</code> and <code>to</code> are null,
   * the total count of entries in the replica database is returned.</li>
   * </ol>
   * <h6>Example</h6>
   * <p>
   * Given the following replica database for baseDN "dc=example,dc=com" and
   * serverId 1:
   *
   * <pre>
   * CSN1  <=  Oldest
   * CSN2
   * CSN3
   * CSN4
   * CSN5  <=  Newest
   * </pre>
   *
   * Then:
   *
   * <pre>
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN1), 1);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN2), 2);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN5), 5);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, null, CSN5), 5);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, null), 0);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, null, null), 5);
   * </pre>
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId on which to act
   * @param from
   *          The older CSN where to start the count
   * @param to
   *          The newer CSN where to end the count
   * @return The computed number of changes
   */
  long getCount(DN baseDN, int serverId, CSN from, CSN to);
  /**
   * Returns the {@link CSN} situated immediately after the specified
   * {@link CSN} for the specified serverId and replication domain according to
   * the order specified by {@link CSN#compareTo(CSN)}. If an Exception occurs
   * in this method, then it returns the oldest possible CSN for the provided
   * serverId.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId for which we want the information
   * @param startAfterCSN
   *          The position where the iterator must start
   * @return the CSN immediately after startAfterCSN, or null if no CSN exist
   *         after startAfterCSN
   */
  CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN);
  /**
   * Generates a {@link ReplicaDBCursor} for the specified serverId and
   * replication domain starting after the provided CSN.
   * <p>
   * The cursor is already advanced to the record after startAfterCSN.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          Identifier of the server for which the cursor is created
   * @param startAfterCSN
   *          Starting point for the cursor. If null, start from the oldest CSN
   * @return a non null {@link ReplicaDBCursor}
   */
  ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
  /**
   * for the specified serverId and replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId on which to act
   * @param updateMsg
   *          the update message to publish to the replicaDB
   * @return true if a db had to be created to publish this message
   * @throws ChangelogException
   *           If a database problem happened
   */
  boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg)
      throws ChangelogException;
  ReplicationDomainDB getReplicationDomainDB();
}
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
New file
@@ -0,0 +1,227 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
import java.util.Set;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.DN;
/**
 * This interface allows to query or control the replication domain database(s)
 * (composed of one or more ReplicaDBs) and query/update each ReplicaDB.
 */
public interface ReplicationDomainDB
{
  /**
   * Returns the serverIds for the servers that are or have been part of the
   * provided replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return an unmodifiable set of integers holding the serverIds
   */
  Set<Integer> getDomainServerIds(DN baseDN);
  /**
   * Get the number of changes for the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return the number of changes.
   */
  long getDomainChangesCount(DN baseDN);
  /**
   * Returns the oldest {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => oldest CSN}
   *         mapping. If a replica DB is empty or closed, the oldest CSN will be
   *         null for that replica. The caller owns the generated ServerState.
   */
  ServerState getDomainOldestCSNs(DN baseDN);
  /**
   * Returns the newest {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => newest CSN} Map.
   *         If a replica DB is empty or closed, the newest CSN will be null for
   *         that replica. The caller owns the generated ServerState.
   */
  ServerState getDomainNewestCSNs(DN baseDN);
  /**
   * Retrieves the latest trim date for the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return the domain latest trim date
   */
  long getDomainLatestTrimDate(DN baseDN);
  /**
   * Shutdown all the replica databases for the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   */
  void shutdownDomain(DN baseDN);
  /**
   * Removes all the data relating to the specified replication domain and
   * shutdown all its replica databases. In particular, it will:
   * <ol>
   * <li>remove all the changes from the replica databases</li>
   * <li>remove all knowledge of the serverIds in this domain</li>
   * <li>remove any knowledge of the current generationId for this domain</li>
   * </ol>
   *
   * @param baseDN
   *          the replication domain baseDN
   * @throws ChangelogException
   *           If a database problem happened
   */
  void removeDomain(DN baseDN) throws ChangelogException;
  // serverId methods
  /**
   * Return the number of changes inclusive between 2 provided {@link CSN}s for
   * the specified serverId and replication domain. i.e. the <code>from</code>
   * and <code>to</code> CSNs are included in the count.
   * <p>
   * Note that:
   * <ol>
   * <li>If <code>from</code> is null, the count starts at the oldest CSN in the
   * database.</li>
   * <li>If <code>to</code> is null, the count is 0.</li>
   * <li>If both from and to are present, then the count includes them both
   * <code>to</code> is null, the count ends at the newest CSN in the database.
   * </li>
   * <li>incidentally, if both <code>from</code> and <code>to</code> are null,
   * the total count of entries in the replica database is returned.</li>
   * </ol>
   * <h6>Example</h6>
   * <p>
   * Given the following replica database for baseDN "dc=example,dc=com" and
   * serverId 1:
   *
   * <pre>
   * CSN1  <=  Oldest
   * CSN2
   * CSN3
   * CSN4
   * CSN5  <=  Newest
   * </pre>
   *
   * Then:
   *
   * <pre>
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN1), 1);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN2), 2);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, CSN5), 5);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, null, CSN5), 5);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, CSN1, null), 0);
   * assertEquals(getCount(&quot;dc=example,dc=com&quot;, 1, null, null), 5);
   * </pre>
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId on which to act
   * @param from
   *          The older CSN where to start the count
   * @param to
   *          The newer CSN where to end the count
   * @return The computed number of changes
   */
  long getCount(DN baseDN, int serverId, CSN from, CSN to);
  /**
   * Returns the {@link CSN} situated immediately after the specified
   * {@link CSN} for the specified serverId and replication domain according to
   * the order specified by {@link CSN#compareTo(CSN)}. If an Exception occurs
   * in this method, then it returns the oldest possible CSN for the provided
   * serverId.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId for which we want the information
   * @param startAfterCSN
   *          The position where the iterator must start
   * @return the CSN immediately after startAfterCSN, or null if no CSN exist
   *         after startAfterCSN
   */
  CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN);
  /**
   * Generates a {@link ReplicaDBCursor} for the specified serverId and
   * replication domain starting after the provided CSN.
   * <p>
   * The cursor is already advanced to the record after startAfterCSN.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          Identifier of the server for which the cursor is created
   * @param startAfterCSN
   *          Starting point for the cursor. If null, start from the oldest CSN
   * @return a non null {@link ReplicaDBCursor}
   */
  ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN);
  /**
   * for the specified serverId and replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param serverId
   *          the serverId on which to act
   * @param updateMsg
   *          the update message to publish to the replicaDB
   * @return true if a db had to be created to publish this message
   * @throws ChangelogException
   *           If a database problem happened
   */
  boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg)
      throws ChangelogException;
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -41,10 +41,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.DN;
import org.opends.server.util.Pair;
import org.opends.server.util.StaticUtils;
@@ -56,7 +53,7 @@
/**
 * JE implementation of the ChangelogDB.
 */
public class JEChangelogDB implements ChangelogDB
public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  /**
@@ -588,6 +585,13 @@
  /** {@inheritDoc} */
  @Override
  public ReplicationDomainDB getReplicationDomainDB()
  {
    return this;
  }
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
      CSN startAfterCSN)
  {