mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
26.31.2007 71ebb3724c79a7d1218c36f080acd6ee162b9cd2
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
File was renamed from opends/src/server/org/opends/server/replication/server/Changelog.java
@@ -62,16 +62,16 @@
import com.sleepycat.je.DatabaseException;
/**
 * Changelog Listener.
 * ReplicationServer Listener.
 *
 * This singleton is the main object of the changelog server
 * This singleton is the main object of the replication server
 * It waits for the incoming connections and create listener
 * and publisher objects for
 * connection with LDAP servers and with changelog servers
 * connection with LDAP servers and with replication servers
 *
 * It is responsible for creating the changelog cache and managing it
 * It is responsible for creating the replication server cache and managing it
 */
public class Changelog
public class ReplicationServer
  implements Runnable, ConfigurableComponent,
             ConfigurationChangeListener<ChangelogServerCfg>
{
@@ -84,14 +84,14 @@
  private boolean runListen = true;
  /* The list of changelog servers configured by the administrator */
  private Collection<String> changelogServers;
  /* The list of replication servers configured by the administrator */
  private Collection<String> replicationServers;
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private HashMap<DN, ChangelogCache> baseDNs =
          new HashMap<DN, ChangelogCache>();
  private HashMap<DN, ReplicationCache> baseDNs =
          new HashMap<DN, ReplicationCache>();
  private String localURL = "null";
  private boolean shutdown = false;
@@ -99,7 +99,7 @@
  private DN configDn;
  private List<ConfigAttribute> configAttributes =
          new ArrayList<ConfigAttribute>();
  private ChangelogDbEnv dbEnv;
  private ReplicationDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
  private String dbDirname = null;
@@ -107,20 +107,21 @@
                        // de deleted from the persistent storage.
  /**
   * Creates a new Changelog using the provided configuration entry.
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this changelog.
   * @param configuration The configuration of this replication server.
   * @throws ConfigException When Configuration is invalid.
   */
  public Changelog(ChangelogServerCfg configuration) throws ConfigException
  public ReplicationServer(ChangelogServerCfg configuration)
         throws ConfigException
  {
    shutdown = false;
    runListen = true;
    int changelogPort = configuration.getChangelogPort();
    changelogServerId = (short) configuration.getChangelogServerId();
    changelogServers = configuration.getChangelogServer();
    if (changelogServers == null)
      changelogServers = new ArrayList<String>();
    replicationServers = configuration.getChangelogServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    trimAge = configuration.getChangelogPurgeDelay();
    dbDirname = configuration.getChangelogDbDirectory();
@@ -201,8 +202,8 @@
  /**
   * The run method for the Listen thread.
   * This thread accept incoming connections on the changelog server
   * ports from other changelog servers or from LDAP servers
   * This thread accept incoming connections on the replication server
   * ports from other replication servers or from LDAP servers
   * and spawn further thread responsible for handling those connections
   */
@@ -211,9 +212,9 @@
    Socket newSocket = null;
    while (shutdown == false)
    {
      // Wait on the changelog port.
      // Read incoming messages and create LDAP or Changelog listener and
      // Publisher.
      // Wait on the replicationServer port.
      // Read incoming messages and create LDAP or ReplicationServer listener
      // and Publisher.
      try
      {
@@ -232,9 +233,9 @@
  }
  /**
   * This method manages the connection with the other changelog servers.
   * It periodically checks that this changelog server is indeed connected
   * to all the other changelog servers and if not attempts to
   * This method manages the connection with the other replication servers.
   * It periodically checks that this replication server is indeed connected
   * to all the other replication servers and if not attempts to
   * make the connection.
   */
  private void runConnect()
@@ -243,21 +244,21 @@
    {
      /*
       * periodically check that we are connected to all other
       * changelog servers and if not establish the connection
       * replication servers and if not establish the connection
       */
      for (ChangelogCache changelogCache: baseDNs.values())
      for (ReplicationCache replicationCache: baseDNs.values())
      {
        Set<String> connectedChangelogs = changelogCache.getChangelogs();
        Set<String> connectedChangelogs = replicationCache.getChangelogs();
        /*
         * check that all changelog in the config are in the connected Set
         * if not create the connection
         * check that all replication server in the config are in the connected
         * Set. If not create the connection
         */
        for (String serverURL : changelogServers)
        for (String serverURL : replicationServers)
        {
          if ((serverURL.compareTo(this.serverURL) != 0) &&
              (!connectedChangelogs.contains(serverURL)))
          {
            this.connect(serverURL, changelogCache.getBaseDn());
            this.connect(serverURL, replicationCache.getBaseDn());
          }
        }
      }
@@ -309,10 +310,11 @@
  }
  /**
   * initialization function for the changelog.
   * initialization function for the replicationServer.
   *
   * @param  changelogId       The unique identifier for this changelog.
   * @param  changelogPort     The port on which the changelog should listen.
   * @param  changelogId       The unique identifier for this replicationServer.
   * @param  changelogPort     The port on which the replicationServer should
   *                           listen.
   *
   */
  private void initialize(short changelogId, int changelogPort)
@@ -320,18 +322,18 @@
    try
    {
      /*
       * Initialize the changelog database.
       * Initialize the replicationServer database.
       */
      dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
          this);
      /*
       * create changelog cache
       * create replicationServer cache
       */
      serverId = changelogId;
      /*
       * Open changelog socket
       * Open replicationServer socket
       */
      String localhostname = InetAddress.getLocalHost().getHostName();
      String localAdddress = InetAddress.getLocalHost().getHostAddress();
@@ -344,9 +346,9 @@
      /*
       * create working threads
       */
      myListenThread = new DirectoryThread(this, "Changelog Listener");
      myListenThread = new DirectoryThread(this, "Replication Server Listener");
      myListenThread.start();
      myConnectThread = new DirectoryThread(this, "Changelog Connect");
      myConnectThread = new DirectoryThread(this, "Replication Server Connect");
      myConnectThread.start();
    } catch (DatabaseException e)
@@ -355,7 +357,7 @@
      String message = getMessage(msgID, dbDirname);
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    } catch (ChangelogDBException e)
    } catch (ReplicationDBException e)
    {
      int msgID = MSGID_COULD_NOT_READ_DB;
      String message = getMessage(msgID, dbDirname);
@@ -378,28 +380,28 @@
  }
  /**
   * Get the ChangelogCache associated to the base DN given in parameter.
   * Get the ReplicationCache associated to the base DN given in parameter.
   *
   * @param baseDn The base Dn for which the ChangelogCache must be returned.
   * @return The ChangelogCache associated to the base DN given in parameter.
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @return The ReplicationCache associated to the base DN given in parameter.
   */
  public ChangelogCache getChangelogCache(DN baseDn)
  public ReplicationCache getReplicationCache(DN baseDn)
  {
    ChangelogCache changelogCache;
    ReplicationCache replicationCache;
    synchronized (baseDNs)
    {
      changelogCache = baseDNs.get(baseDn);
      if (changelogCache == null)
        changelogCache = new ChangelogCache(baseDn, this);
      baseDNs.put(baseDn, changelogCache);
      replicationCache = baseDNs.get(baseDn);
      if (replicationCache == null)
        replicationCache = new ReplicationCache(baseDn, this);
      baseDNs.put(baseDn, replicationCache);
    }
    return changelogCache;
    return replicationCache;
  }
  /**
   * Shutdown the Changelog service and all its connections.
   * Shutdown the Replication Server service and all its connections.
   */
  public void shutdown()
  {
@@ -421,13 +423,13 @@
      listenSocket.close();
    } catch (IOException e)
    {
      // changelog service is closing anyway.
      // replication Server service is closing anyway.
    }
    // shutdown all the ChangelogCaches
    for (ChangelogCache changelogCache : baseDNs.values())
    for (ReplicationCache replicationCache : baseDNs.values())
    {
      changelogCache.shutdown();
      replicationCache.shutdown();
    }
    dbEnv.shutdown();
@@ -435,12 +437,12 @@
  /**
   * Creates a new DB handler for this Changelog and the serverId and
   * Creates a new DB handler for this ReplicationServer and the serverId and
   * DN given in parameter.
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @return The new DB handler for this Changelog and the serverId and
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
   */