mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
26.31.2007 98a5896dca14ea8ac850d94ebd46713da552601d
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/server/ChangelogCache.java
@@ -55,7 +55,7 @@
/**
 * This class define an in-memory cache that will be used to store
 * the messages that have been received from an LDAP server or
 * from another changelog server and that should be forwarded to
 * from another replication server and that should be forwarded to
 * other servers.
 *
 * The size of the cache is set by configuration.
@@ -68,7 +68,7 @@
 * received to the disk and for trimming them
 * Decision to trim can be based on disk space or age of the message
 */
public class ChangelogCache
public class ReplicationCache
{
  private Object flowControlLock = new Object();
  private DN baseDn = null;
@@ -80,22 +80,22 @@
   * must push to this particular server
   *
   * We add new TreeSet in the HashMap when a new server register
   * to this changelog server.
   * to this replication server.
   *
   */
  private Map<Short, ServerHandler> connectedServers =
    new ConcurrentHashMap<Short, ServerHandler>();
  /*
   * This map contains one ServerHandler for each changelog servers
   * with which we are connected (so normally all the changelogs)
   * This map contains one ServerHandler for each replication servers
   * with which we are connected (so normally all the replication servers)
   * the first update in the balanced tree is the next change that we
   * must push to this particular server
   *
   * We add new TreeSet in the HashMap when a new changelog server register
   * to this changelog server.
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private Map<Short, ServerHandler> changelogServers =
  private Map<Short, ServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ServerHandler>();
  /*
@@ -104,18 +104,19 @@
   */
  private Map<Short, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Short, DbHandler>();
  private Changelog changelog;
  private ReplicationServer replicationServer;
  /**
   * Creates a new ChangelogCache associated to the DN baseDn.
   * Creates a new ReplicationCache associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ChangelogCache.
   * @param changelog the Changelog that created this changelog cache.
   * @param baseDn The baseDn associated to the ReplicationCache.
   * @param replicationServer the ReplicationServer that created this
   *                          replicationServer cache.
   */
  public ChangelogCache(DN baseDn, Changelog changelog)
  public ReplicationCache(DN baseDn, ReplicationServer replicationServer)
  {
    this.baseDn = baseDn;
    this.changelog = changelog;
    this.replicationServer = replicationServer;
  }
  /**
@@ -134,7 +135,7 @@
    /*
     * TODO : In case that the source server is a LDAP server this method
     * should check that change did get pushed to at least one
     * other changelog server before pushing it to the LDAP servers
     * other replication server before pushing it to the LDAP servers
     */
    sourceHandler.updateServerState(update);
@@ -145,7 +146,7 @@
      int count = this.NumServers();
      if (count > 1)
      {
        if (sourceHandler.isChangelogServer())
        if (sourceHandler.isReplicationServer())
          ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
                                      this, count - 1);
        else
@@ -168,13 +169,13 @@
      {
        try
        {
          dbHandler = changelog.newDbHandler(id, baseDn);
          dbHandler = replicationServer.newDbHandler(id, baseDn);
        } catch (DatabaseException e)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This changelog therefore can't do it's job properly anymore
           * This replicationServer therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           */
          int    msgID   = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
@@ -182,7 +183,7 @@
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          changelog.shutdown();
          replicationServer.shutdown();
          return;
        }
        sourceDbHandlers.put(id, dbHandler);
@@ -194,11 +195,11 @@
    /*
     * Push the message to the changelog servers
     * Push the message to the replication servers
     */
    if (!sourceHandler.isChangelogServer())
    if (!sourceHandler.isReplicationServer())
    {
      for (ServerHandler handler : changelogServers.values())
      for (ServerHandler handler : replicationServers.values())
      {
        handler.add(update, sourceHandler);
      }
@@ -224,7 +225,7 @@
  /**
   * Create initialize context necessary for finding the changes
   * that must be sent to a given LDAP or changelog server.
   * that must be sent to a given LDAP or replication server.
   *
   * @param handler handler for the server that must be started
   * @throws Exception when method has failed
@@ -254,32 +255,32 @@
  {
    handler.stopHandler();
    if (handler.isChangelogServer())
      changelogServers.remove(handler.getServerId());
    if (handler.isReplicationServer())
      replicationServers.remove(handler.getServerId());
    else
      connectedServers.remove(handler.getServerId());
  }
  /**
   * Create initialize context necessary for finding the changes
   * that must be sent to a given changelog server.
   * that must be sent to a given replication server.
   *
   * @param handler the server ID to which we want to forward changes
   * @throws Exception in case of errors
   */
  public void startChangelog(ServerHandler handler) throws Exception
  public void startReplicationServer(ServerHandler handler) throws Exception
  {
    /*
     * create the balanced tree that will be used to forward changes
     * TODO throw proper exception
     */
    synchronized (changelogServers)
    synchronized (replicationServers)
    {
      if (changelogServers.containsKey(handler.getServerId()))
      if (replicationServers.containsKey(handler.getServerId()))
      {
        throw new Exception("changelog Id already registered");
        throw new Exception("Replication Server Id already registered");
      }
      changelogServers.put(handler.getServerId(), handler);
      replicationServers.put(handler.getServerId(), handler);
    }
  }
@@ -317,7 +318,7 @@
  }
  /**
   * Return a Set of String containing the lists of Changelog servers
   * Return a Set of String containing the lists of Replication servers
   * connected to this server.
   * @return the set of connected servers
   */
@@ -325,7 +326,7 @@
  {
    LinkedHashSet<String> mySet = new LinkedHashSet<String>();
    for (ServerHandler handler : changelogServers.values())
    for (ServerHandler handler : replicationServers.values())
    {
      mySet.add(handler.getServerAddressURL());
    }
@@ -335,8 +336,8 @@
  /**
   * Return a Set containing the servers known by this changelog.
   * @return a set containing the servers known by this changelog.
   * Return a Set containing the servers known by this replicationServer.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getServers()
  {
@@ -349,9 +350,9 @@
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @return the created ChangelogIterator.
   * @return the created ReplicationIterator.
   */
  public ChangelogIterator getChangelogIterator(short serverId,
  public ReplicationIterator getChangelogIterator(short serverId,
                    ChangeNumber changeNumber)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
@@ -377,8 +378,8 @@
  }
  /**
   * creates a new ChangelogDB with specified identifier.
   * @param id the identifier of the new ChangelogDB.
   * creates a new ReplicationDB with specified identifier.
   * @param id the identifier of the new ReplicationDB.
   * @param db the new db.
   *
   * @throws DatabaseException If a database error happened.
@@ -398,7 +399,7 @@
   */
  private int NumServers()
  {
    return changelogServers.size() + connectedServers.size();
    return replicationServers.size() + connectedServers.size();
  }
@@ -417,7 +418,7 @@
     *    In this case, we can find the handler from the connectedServers map
     *  - the message that was acked comes from a server to which we are not
     *    connected.
     *    In this case we need to find the changelog server that forwarded
     *    In this case we need to find the replication server that forwarded
     *    the change and send back the ack to this server.
     */
    ServerHandler handler = connectedServers.get(
@@ -455,10 +456,10 @@
    }
    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
    {
      if (!senderHandler.isChangelogServer())
      if (!senderHandler.isReplicationServer())
      {
        // Send to all changelogServers
        for (ServerHandler destinationHandler : changelogServers.values())
        // Send to all replicationServers
        for (ServerHandler destinationHandler : replicationServers.values())
        {
          servers.add(destinationHandler);
        }
@@ -488,7 +489,7 @@
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(changelogServers.values());
          servers.addAll(replicationServers.values());
        }
      }
    }
@@ -551,7 +552,7 @@
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     *                     change was an LDAP server or a ReplicationServer.
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
    {
@@ -565,7 +566,7 @@
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     *                     change was an LDAP server or a ReplicationServer.
     * @param serverId     The identifier of the server from which we
     *                     received the change..
     */
@@ -576,7 +577,7 @@
      if (isLDAPserver)
        handler = connectedServers.get(serverId);
      else
        handler = changelogServers.get(serverId);
        handler = replicationServers.get(serverId);
      // TODO : check for null handler and log error
      try
@@ -599,12 +600,12 @@
    }
    /**
     * Shutdown this ChangelogCache.
     * Shutdown this ReplicationCache.
     */
    public void shutdown()
    {
      // Close session with other changelogs
      for (ServerHandler serverHandler : changelogServers.values())
      for (ServerHandler serverHandler : replicationServers.values())
      {
        serverHandler.shutdown();
      }
@@ -647,7 +648,7 @@
    @Override
    public String toString()
    {
      return "ChangelogCache " + baseDn;
      return "ReplicationCache " + baseDn;
    }
    /**
@@ -656,7 +657,7 @@
     */
    public void checkAllSaturation() throws IOException
    {
      for (ServerHandler handler : changelogServers.values())
      for (ServerHandler handler : replicationServers.values())
      {
        handler.checkWindow();
      }
@@ -676,7 +677,7 @@
     */
    public boolean restartAfterSaturation(ServerHandler sourceHandler)
    {
      for (ServerHandler handler : changelogServers.values())
      for (ServerHandler handler : replicationServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;