mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
06.52.2009 b891a41e31f548f0ef888cf8de090f4785715169
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.service;
@@ -170,7 +170,7 @@
   * The ReplicationBroker that is used by this ReplicationDomain to
   * connect to the ReplicationService.
   */
  private ReplicationBroker broker;
  private ReplicationBroker broker = null;
  /**
   * This Map is used to store all outgoing assured messages in order
@@ -991,8 +991,6 @@
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    ReplInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
@@ -1553,7 +1551,6 @@
    ieContext.setCounters(
        initializeMessage.getEntryCount(),
        initializeMessage.getEntryCount());
    ieContext.ldifImportInputStream = new ReplInputStream(this);
    try
    {
@@ -1682,6 +1679,52 @@
  }
  /**
   * Check the value of the Replication Servers generation ID.
   *
   * @param generationID        The expected value of the generation ID.
   *
   * @throws DirectoryException When the generation ID of the Replication
   *                            Servers is not the expected value.
   */
  private void checkGenerationID(long generationID) throws DirectoryException
  {
    boolean flag = false;
    for (int i = 0; i< 10; i++)
    {
      for (RSInfo rsInfo : getRsList())
      {
        if (rsInfo.getGenerationId() == generationID)
        {
          flag = true;
          break;
        }
        else
        {
          try
          {
            Thread.sleep(i*100);
          } catch (InterruptedException e)
          {
          }
        }
      }
      if (flag)
      {
        break;
      }
    }
    if (!flag)
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
      throw new DirectoryException(
          resultCode, message);
    }
  }
  /**
   * Reset the Replication Log.
   * Calling this method will remove all the Replication information that
   * was kept on all the Replication Servers currently connected in the
@@ -1693,7 +1736,21 @@
   */
  public void resetReplicationLog() throws DirectoryException
  {
    // Reset the Generation ID to -1 to clean the ReplicationServers.
    resetGenerationId((long)-1);
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(-1);
    // Reconnect to the Replication Server so that it adopt our
    // GenerationID.
    disableService();
    enableService();
    resetGenerationId(getGenerationID());
    // check that at least one ReplicationServer did change its generation-id
    checkGenerationID(getGenerationID());
  }
  /**
@@ -1715,8 +1772,7 @@
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
          serviceID);
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
      throw new DirectoryException(
         resultCode, message);
    }
@@ -2088,26 +2144,29 @@
      Collection<String> replicationServers, int window,
      long heartbeatInterval) throws ConfigException
  {
    /*
     * create the broker object used to publish and receive changes
     */
    broker = new ReplicationBroker(
        this, state, serviceID,
        serverID, window,
        getGenerationID(),
        heartbeatInterval,
        new ReplSessionSecurity(),
        getGroupId());
    if (broker == null)
    {
      /*
       * create the broker object used to publish and receive changes
       */
      broker = new ReplicationBroker(
          this, state, serviceID,
          serverID, window,
          getGenerationID(),
          heartbeatInterval,
          new ReplSessionSecurity(),
          getGroupId());
    broker.start(replicationServers);
      broker.start(replicationServers);
   /*
    * Create a replication monitor object responsible for publishing
    * monitoring information below cn=monitor.
    */
   monitor = new ReplicationMonitor(this);
      /*
       * Create a replication monitor object responsible for publishing
       * monitoring information below cn=monitor.
       */
      monitor = new ReplicationMonitor(this);
   DirectoryServer.registerMonitorProvider(monitor);
      DirectoryServer.registerMonitorProvider(monitor);
    }
  }
  /**
@@ -2115,9 +2174,14 @@
   * <p>
   * After this method has been called, the Replication Service will start
   * calling the {@link #processUpdate(UpdateMsg)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(Collection, int, long)}.
   *
   */
  public void startListenService()
  {
    //
    // Create the listener thread
    listenerThread = new ListenerThread(this);
    listenerThread.start();