mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
21.33.2013 10b76e3a58346e0b6e2e07e58617b75dc383c249
Increased encapsulation in ReplicationServerDomain by:
- Making private the start*() methods
- Moved some code to here from some other classes
- MAking private the buildAndSendTopoInfoTo*() methods.

ReplicationServerDomain.java:
Extracted method sendTopoInfoToAllExcept().
Renamed buildAndSendTopoInfoToDSs() to sendTopoInfoToAllDSsExcept() + made it private.
Renamed buildAndSendTopoInfoToRSs() to sendTopoInfoToAllRSs() + made it private.
Renamed changeStatusFromStatusAnalyzer() to changeStatus().
Made startStatusAnalyzer() and startMonitoringPublisher() private.
Directly used the fields connectedDSs and connectedRSs instead of through their getters.
Moved 2 register() methods here from DataServerHandler and ReplicationServerHandler.
Renamed all variables and parameters named "handler" to disambiguated names.

DataServerHandler.java:
Renamed changeStatusFromStatusAnalyzer() to changeStatus().
Moved registerIntoDomain() + some other code to ReplicationServerDomain.

ReplicationServerHandler.java:
Moved registerIntoDomain() + some other code to ReplicationServerDomain.

ReplicationServer.java
Renamed all variables and parameters named "handler" to disambiguated names.
Consequence of the change to ReplicationServerDomain.sendTopoInfoToAllExcept().

StatusAnalyzer.java:
Consequence of the change to ReplicationServerDomain.changeStatusFromStatusAnalyzer().
5 files modified
565 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 37 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 40 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 465 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -183,14 +183,15 @@
  }
  /**
   * Change the status according to the event generated from the status
   * analyzer.
   * @param event The event to be used for new status computation
   * Change the status according to the event.
   *
   * @param event
   *          The event to be used for new status computation
   * @return The new status of the DS
   * @throws IOException When raised by the underlying session
   * @throws IOException
   *           When raised by the underlying session
   */
  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
  throws IOException
  public ServerStatus changeStatus(StatusMachineEvent event) throws IOException
  {
    return changeStatus(event, "from status analyzer");
  }
@@ -377,25 +378,6 @@
    return serverStartMsg.getSSLEncryption();
  }
  /**
   * Registers this handler into its related domain and notifies the domain
   * about the new DS.
   */
  public void registerIntoDomain()
  {
    // All-right, connected with new DS: store handler.
    Map<Integer, DataServerHandler> connectedDSs =
      replicationServerDomain.getConnectedDSs();
    connectedDSs.put(serverId, this);
    // Tell peer DSs a new DS just connected to us
    // No need to re-send TopologyMsg to this just new DS so not null
    // argument
    replicationServerDomain.buildAndSendTopoInfoToDSs(this);
    // Tell peer RSs a new DS just connected to us
    replicationServerDomain.buildAndSendTopoInfoToRSs();
  }
  /** Send our own TopologyMsg to DS. */
  private TopologyMsg sendTopoToRemoteDS() throws IOException
  {
@@ -498,10 +480,7 @@
        throw new DirectoryException(ResultCode.OTHER, null, null);
      }
      replicationServerDomain.startStatusAnalyzer();
      replicationServerDomain.startMonitoringPublisher();
      registerIntoDomain();
      replicationServerDomain.register(this);
      Message message = INFO_REPLICATION_SERVER_CONNECTION_FROM_DS
          .get(getReplicationServerId(), getServerId(),
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -336,21 +336,21 @@
        if (msg instanceof ServerStartMsg)
        {
          DataServerHandler handler = new DataServerHandler(
          DataServerHandler dsHandler = new DataServerHandler(
              session, queueSize, this, rcvWindow);
          handler.startFromRemoteDS((ServerStartMsg)msg);
          dsHandler.startFromRemoteDS((ServerStartMsg) msg);
        }
        else if (msg instanceof ReplServerStartMsg)
        {
          ReplicationServerHandler handler = new ReplicationServerHandler(
          ReplicationServerHandler rsHandler = new ReplicationServerHandler(
              session, queueSize, this, rcvWindow);
          handler.startFromRemoteRS((ReplServerStartMsg)msg);
          rsHandler.startFromRemoteRS((ReplServerStartMsg) msg);
        }
        else if (msg instanceof ServerStartECLMsg)
        {
          ECLServerHandler handler = new ECLServerHandler(
          ECLServerHandler eclHandler = new ECLServerHandler(
              session, queueSize, this, rcvWindow);
          handler.startFromRemoteServer((ServerStartECLMsg)msg);
          eclHandler.startFromRemoteServer((ServerStartECLMsg) msg);
        }
        else
        {
@@ -471,9 +471,9 @@
  private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
  {
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : domain.getConnectedRSs().values())
    for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values())
    {
      results.add(normalizeServerURL(handler.getServerAddressURL()));
      results.add(normalizeServerURL(rsHandler.getServerAddressURL()));
    }
    return results;
  }
@@ -507,9 +507,9 @@
      socket.connect(ServerAddr, timeoutMS);
      session = replSessionSecurity.createClientSession(socket, timeoutMS);
      ReplicationServerHandler handler = new ReplicationServerHandler(
      ReplicationServerHandler rsHandler = new ReplicationServerHandler(
          session, queueSize, this, rcvWindow);
      handler.connect(baseDn, sslEncryption);
      rsHandler.connect(baseDn, sslEncryption);
    }
    catch (Exception e)
    {
@@ -867,19 +867,22 @@
  /**
   * Creates a new DB handler for this ReplicationServer and the serverId and
   * DN given in parameter.
   * Creates a new DB handler for this ReplicationServer and the serverId and DN
   * given in parameter.
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler must be created.
   * @param serverId
   *          The serverId for which the dbHandler must be created.
   * @param baseDn
   *          The DN for which the dbHandler must be created.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws ChangelogException in case of underlying database problem.
   * @throws ChangelogException
   *           in case of underlying database problem.
   */
  public DbHandler newDbHandler(int id, String baseDn)
  public DbHandler newDbHandler(int serverId, String baseDn)
      throws ChangelogException
  {
    return new DbHandler(id, baseDn, this, dbEnv, queueSize);
    return new DbHandler(serverId, baseDn, this, dbEnv, queueSize);
  }
@@ -1139,8 +1142,7 @@
  {
    for (ReplicationServerDomain domain : getReplicationServerDomains())
    {
      domain.buildAndSendTopoInfoToDSs(null);
      domain.buildAndSendTopoInfoToRSs();
      domain.sendTopoInfoToAll();
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -367,13 +367,13 @@
    // Push the message to the replication servers
    if (sourceHandler.isDataServer())
    {
      for (ReplicationServerHandler handler : connectedRSs.values())
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        /**
         * Ignore updates to RS with bad gen id
         * (no system managed status for a RS)
         */
        if (isDifferentGenerationId(handler.getGenerationId()))
        if (isDifferentGenerationId(rsHandler.getGenerationId()))
        {
          if (debugEnabled())
          {
@@ -383,24 +383,24 @@
                + localReplicationServer.getServerId() + " for dn " + baseDn
                + ", update " + update.getChangeNumber()
                + " will not be sent to replication server "
                + handler.getServerId() + " with generation id "
                + handler.getGenerationId() + " different from local "
                + rsHandler.getServerId() + " with generation id "
                + rsHandler.getGenerationId() + " different from local "
                + "generation id " + generationId);
          }
          continue;
        }
        notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
        notAssuredUpdate = addUpdate(rsHandler, update, notAssuredUpdate,
            assuredMessage, expectedServers);
      }
    }
    // Push the message to the LDAP servers
    for (DataServerHandler handler : connectedDSs.values())
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
      if (dsHandler == sourceHandler)
      {
        continue;
      }
@@ -416,7 +416,7 @@
       * stop sending updates is interesting anyway. Not taking the RSD lock
       * allows to have better performances in normal mode (most of the time).
       */
      ServerStatus dsStatus = handler.getStatus();
      ServerStatus dsStatus = dsHandler.getStatus();
      if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS
          || dsStatus == ServerStatus.FULL_UPDATE_STATUS)
      {
@@ -427,8 +427,8 @@
            TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
                + update.getChangeNumber()
                + " will not be sent to directory server "
                + handler.getServerId() + " with generation id "
                + handler.getGenerationId() + " different from local "
                + dsHandler.getServerId() + " with generation id "
                + dsHandler.getGenerationId() + " different from local "
                + "generation id " + generationId);
          }
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
@@ -436,20 +436,20 @@
            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                + " for dn " + baseDn + ", update " + update.getChangeNumber()
                + " will not be sent to directory server "
                + handler.getServerId() + " as it is in full update");
                + dsHandler.getServerId() + " as it is in full update");
          }
        }
        continue;
      }
      notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate,
      notAssuredUpdate = addUpdate(dsHandler, update, notAssuredUpdate,
          assuredMessage, expectedServers);
    }
    // Push the message to the other subscribing handlers
    for (MessageHandler handler : otherHandlers) {
      handler.add(update);
    for (MessageHandler mHandler : otherHandlers) {
      mHandler.add(update);
    }
  }
@@ -491,7 +491,7 @@
    return true;
  }
  private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
  private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler,
      UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
      boolean assuredMessage, List<Integer> expectedServers)
      throws UnsupportedEncodingException
@@ -500,9 +500,9 @@
    {
      // Assured mode: post an assured or not assured matching update
      // message according to what has been computed for the destination server
      if (expectedServers.contains(handler.getServerId()))
      if (expectedServers.contains(sHandler.getServerId()))
      {
        handler.add(update);
        sHandler.add(update);
      }
      else
      {
@@ -510,12 +510,12 @@
        {
          notAssuredUpdate = new NotAssuredUpdateMsg(update);
        }
        handler.add(notAssuredUpdate);
        sHandler.add(notAssuredUpdate);
      }
    }
    else
    {
      handler.add(update);
      sHandler.add(update);
    }
    return notAssuredUpdate;
  }
@@ -578,23 +578,23 @@
      }
      // Look for DS eligible for assured
      for (DataServerHandler handler : connectedDSs.values())
      for (DataServerHandler dsHandler : connectedDSs.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
        if (dsHandler == sourceHandler)
        {
          continue;
        }
        if (handler.getGroupId() == groupId)
        if (dsHandler.getGroupId() == groupId)
          // No ack expected from a DS with different group id
        {
          ServerStatus serverStatus = handler.getStatus();
          ServerStatus serverStatus = dsHandler.getStatus();
          if (serverStatus == ServerStatus.NORMAL_STATUS)
          {
            expectedServers.add(handler.getServerId());
            expectedServers.add(dsHandler.getServerId());
          } else if (serverStatus == ServerStatus.DEGRADED_STATUS) {
            // No ack expected from a DS with wrong status
            wrongStatusServers.add(handler.getServerId());
            wrongStatusServers.add(dsHandler.getServerId());
          }
          /*
           * else
@@ -737,15 +737,15 @@
  private void collectRSsEligibleForAssuredReplication(byte groupId,
      List<Integer> expectedServers)
  {
    for (ReplicationServerHandler handler : connectedRSs.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      if (handler.getGroupId() == groupId
      if (rsHandler.getGroupId() == groupId
      // No ack expected from a RS with different group id
            && isSameGenerationId(handler.getGenerationId())
            && isSameGenerationId(rsHandler.getGenerationId())
        // No ack expected from a RS with bad gen id
        )
      {
        expectedServers.add(handler.getServerId());
        expectedServers.add(rsHandler.getServerId());
      }
    }
  }
@@ -951,11 +951,11 @@
   */
  public void stopReplicationServers(Collection<String> replServerURLs)
  {
    for (ReplicationServerHandler handler : connectedRSs.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      if (replServerURLs.contains(handler.getServerAddressURL()))
      if (replServerURLs.contains(rsHandler.getServerAddressURL()))
      {
        stopServer(handler, false);
        stopServer(rsHandler, false);
      }
    }
  }
@@ -968,35 +968,33 @@
   */
  public void stopAllServers(boolean shutdown)
  {
    // Close session with other replication servers
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      stopServer(serverHandler, shutdown);
      stopServer(rsHandler, shutdown);
    }
    // Close session with other LDAP servers
    for (DataServerHandler serverHandler : connectedDSs.values())
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      stopServer(serverHandler, shutdown);
      stopServer(dsHandler, shutdown);
    }
  }
  /**
   * Checks whether it is already connected to a DS with same id.
   *
   * @param handler
   * @param dsHandler
   *          the DS we want to check
   * @return true if this DS is already connected to the current server
   */
  public boolean isAlreadyConnectedToDS(DataServerHandler handler)
  public boolean isAlreadyConnectedToDS(DataServerHandler dsHandler)
  {
    if (connectedDSs.containsKey(handler.getServerId()))
    if (connectedDSs.containsKey(dsHandler.getServerId()))
    {
      // looks like two connected LDAP servers have the same serverId
      Message message = ERR_DUPLICATE_SERVER_ID.get(
          localReplicationServer.getMonitorInstanceName(),
          connectedDSs.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
          connectedDSs.get(dsHandler.getServerId()).toString(),
          dsHandler.toString(), dsHandler.getServerId());
      logError(message);
      return true;
    }
@@ -1006,11 +1004,11 @@
  /**
   * Stop operations with a given server.
   *
   * @param handler the server for which we want to stop operations.
   * @param sHandler the server for which we want to stop operations.
   * @param shutdown A boolean indicating if the stop is due to a
   *                 shutdown condition.
   */
  public void stopServer(ServerHandler handler, boolean shutdown)
  public void stopServer(ServerHandler sHandler, boolean shutdown)
  {
    // TODO JNR merge with stopServer(MessageHandler)
    if (debugEnabled())
@@ -1018,7 +1016,7 @@
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the server handler "
          + handler.getMonitorInstanceName());
          + sHandler.getMonitorInstanceName());
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
@@ -1027,7 +1025,7 @@
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    if (!handler.engageShutdown())
    if (!sHandler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      if (!shutdown)
@@ -1056,17 +1054,17 @@
          {
            TRACER.debugInfo("In "
                + localReplicationServer.getMonitorInstanceName()
                + " remote server " + handler.getMonitorInstanceName()
                + " remote server " + sHandler.getMonitorInstanceName()
                + " is the last RS/DS to be stopped:"
                + " stopping monitoring publisher");
          }
          stopMonitoringPublisher();
        }
        if (connectedRSs.containsKey(handler.getServerId()))
        if (connectedRSs.containsKey(sHandler.getServerId()))
        {
          unregisterServerHandler(handler, shutdown, false);
        } else if (connectedDSs.containsKey(handler.getServerId()))
          unregisterServerHandler(sHandler, shutdown, false);
        } else if (connectedDSs.containsKey(sHandler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
@@ -1076,15 +1074,15 @@
            {
              TRACER.debugInfo("In "
                  + localReplicationServer.getMonitorInstanceName()
                  + " remote server " + handler.getMonitorInstanceName()
                  + " remote server " + sHandler.getMonitorInstanceName()
                  + " is the last DS to be stopped: stopping status analyzer");
            }
            stopStatusAnalyzer();
          }
          unregisterServerHandler(handler, shutdown, true);
        } else if (otherHandlers.contains(handler))
          unregisterServerHandler(sHandler, shutdown, true);
        } else if (otherHandlers.contains(sHandler))
        {
          unregisterOtherHandler(handler);
          unregisterOtherHandler(sHandler);
        }
      }
      catch(Exception e)
@@ -1102,17 +1100,17 @@
    }
  }
  private void unregisterOtherHandler(MessageHandler handler)
  private void unregisterOtherHandler(MessageHandler mHandler)
  {
    unRegisterHandler(handler);
    handler.shutdown();
    unRegisterHandler(mHandler);
    mHandler.shutdown();
  }
  private void unregisterServerHandler(ServerHandler handler, boolean shutdown,
  private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
      boolean isDirectoryServer)
  {
    unregisterServerHandler(handler);
    handler.shutdown();
    unregisterServerHandler(sHandler);
    sHandler.shutdown();
    // Check if generation id has to be reset
    mayResetGenerationId();
@@ -1122,19 +1120,19 @@
      {
        // Update the remote replication servers with our list
        // of connected LDAP servers
        buildAndSendTopoInfoToRSs();
        sendTopoInfoToAllRSs();
      }
      // Warn our DSs that a RS or DS has quit (does not use this
      // handler as already removed from list)
      buildAndSendTopoInfoToDSs(null);
      sendTopoInfoToAllDSsExcept(null);
    }
  }
  /**
   * Stop the handler.
   * @param handler The handler to stop.
   * @param mHandler The handler to stop.
   */
  public void stopServer(MessageHandler handler)
  public void stopServer(MessageHandler mHandler)
  {
    // TODO JNR merge with stopServer(ServerHandler, boolean)
    if (debugEnabled())
@@ -1142,7 +1140,7 @@
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the message handler "
          + handler.getMonitorInstanceName());
          + mHandler.getMonitorInstanceName());
    }
    /*
     * We must prevent deadlock on replication server domain lock, when for
@@ -1151,7 +1149,7 @@
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    if (!handler.engageShutdown())
    if (!mHandler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
@@ -1170,9 +1168,9 @@
      try
      {
        if (otherHandlers.contains(handler))
        if (otherHandlers.contains(mHandler))
        {
          unregisterOtherHandler(handler);
          unregisterOtherHandler(mHandler);
        }
      }
      catch(Exception e)
@@ -1190,17 +1188,17 @@
  /**
   * Unregister this handler from the list of handlers registered to this
   * domain.
   * @param handler the provided handler to unregister.
   * @param sHandler the provided handler to unregister.
   */
  private void unregisterServerHandler(ServerHandler handler)
  private void unregisterServerHandler(ServerHandler sHandler)
  {
    if (handler.isReplicationServer())
    if (sHandler.isReplicationServer())
    {
      connectedRSs.remove(handler.getServerId());
      connectedRSs.remove(sHandler.getServerId());
    }
    else
    {
      connectedDSs.remove(handler.getServerId());
      connectedDSs.remove(sHandler.getServerId());
    }
  }
@@ -1215,61 +1213,61 @@
   */
  private void mayResetGenerationId()
  {
    String prefix =
        "In RS " + this.localReplicationServer.getMonitorInstanceName()
            + " for " + baseDn + " ";
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS "
          + this.localReplicationServer.getMonitorInstanceName()
          + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus="
      TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus="
          + generationIdSavedStatus);
    }
    // If there is no more any LDAP server connected to this domain in the
    // topology and the generationId has never been saved, then we can reset
    // it and the next LDAP server to connect will become the new reference.
    boolean lDAPServersConnectedInTheTopology = false;
    boolean ldapServersConnectedInTheTopology = false;
    if (connectedDSs.isEmpty())
    {
      for (ReplicationServerHandler rsh : connectedRSs.values())
      for (ReplicationServerHandler rsHandler : connectedRSs.values())
      {
        if (generationId != rsh.getGenerationId())
        if (generationId != rsHandler.getGenerationId())
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS "
                + this.localReplicationServer.getMonitorInstanceName() + " for "
                + baseDn + " " + " mayResetGenerationId skip RS"
                + rsh.getMonitorInstanceName() + " that has different genId");
            TRACER.debugInfo(prefix + "mayResetGenerationId skip RS "
                + rsHandler.getMonitorInstanceName()
                + " that has different genId");
          }
        } else if (rsh.hasRemoteLDAPServers())
        }
        else if (rsHandler.hasRemoteLDAPServers())
        {
            lDAPServersConnectedInTheTopology = true;
          ldapServersConnectedInTheTopology = true;
            if (debugEnabled())
            {
              TRACER.debugInfo("In RS "
                  + this.localReplicationServer.getMonitorInstanceName()
                  + " for "+ baseDn + " mayResetGenerationId RS"
                  + rsh.getMonitorInstanceName()
                  + " has servers connected to it"
                  + " - will not reset generationId");
            }
            break;
          if (debugEnabled())
          {
            TRACER.debugInfo(prefix + "mayResetGenerationId RS "
                + rsHandler.getMonitorInstanceName()
                + " has ldap servers connected to it"
                + " - will not reset generationId");
          }
          break;
        }
      }
    } else
    }
    else
    {
      lDAPServersConnectedInTheTopology = true;
      ldapServersConnectedInTheTopology = true;
      if (debugEnabled())
      {
        TRACER.debugInfo("In RS "
            + this.localReplicationServer.getMonitorInstanceName() + " for "
            + baseDn + " "
            + " has servers connected to it - will not reset generationId");
        TRACER.debugInfo(prefix + "has ldap servers connected to it"
            + " - will not reset generationId");
      }
    }
    if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus
    if (!ldapServersConnectedInTheTopology
        && !this.generationIdSavedStatus
        && generationId != -1)
    {
      changeGenerationId(-1, false);
@@ -1279,23 +1277,24 @@
  /**
   * Checks whether a remote RS is already connected to this hosting RS.
   *
   * @param handler
   * @param rsHandler
   *          The handler for the remote RS.
   * @return flag specifying whether the remote RS is already connected.
   * @throws DirectoryException
   *           when a problem occurs.
   */
  public boolean isAlreadyConnectedToRS(ReplicationServerHandler handler)
  public boolean isAlreadyConnectedToRS(ReplicationServerHandler rsHandler)
      throws DirectoryException
  {
    ReplicationServerHandler oldHandler =
        connectedRSs.get(handler.getServerId());
    if (oldHandler == null)
    ReplicationServerHandler oldRsHandler =
        connectedRSs.get(rsHandler.getServerId());
    if (oldRsHandler == null)
    {
      return false;
    }
    if (oldHandler.getServerAddressURL().equals(handler.getServerAddressURL()))
    if (oldRsHandler.getServerAddressURL().equals(
        rsHandler.getServerAddressURL()))
    {
      // this is the same server, this means that our ServerStart messages
      // have been sent at about the same time and 2 connections
@@ -1308,8 +1307,8 @@
    // log an error message and drop this connection.
    Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
        localReplicationServer.getMonitorInstanceName(),
        oldHandler.getServerAddressURL(), handler.getServerAddressURL(),
        handler.getServerId());
        oldRsHandler.getServerAddressURL(), rsHandler.getServerAddressURL(),
        rsHandler.getServerId());
    throw new DirectoryException(ResultCode.OTHER, message);
  }
@@ -1318,11 +1317,11 @@
   * This call is blocking when no update is available or when dependencies
   * do not allow to send the next available change
   *
   * @param  handler  The server handler for the target directory server.
   * @param sHandler The server handler for the target directory server.
   *
   * @return the update that must be forwarded
   */
  public UpdateMsg take(ServerHandler handler)
  public UpdateMsg take(ServerHandler sHandler)
  {
    /*
     * Get the balanced tree that we use to sort the changes to be
@@ -1332,7 +1331,7 @@
     * So this methods simply need to check that dependencies are OK
     * and update this replicaId RUV
     */
    return handler.take();
    return sHandler.take();
  }
  /**
@@ -1360,8 +1359,8 @@
  public ReplicationIterator getChangelogIterator(int serverId,
      ChangeNumber startAfterCN)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler == null)
    {
      return null;
    }
@@ -1369,7 +1368,7 @@
    ReplicationIterator it;
    try
    {
      it = handler.generateIterator(startAfterCN);
      it = dbHandler.generateIterator(startAfterCN);
    }
    catch (Exception e)
    {
@@ -1395,10 +1394,10 @@
  */
  public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler != null)
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler != null)
    {
      return handler.getCount(from, to);
      return dbHandler.getCount(from, to);
    }
    return 0;
  }
@@ -1781,17 +1780,17 @@
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      for (DataServerHandler lsh : this.connectedDSs.values())
      for (DataServerHandler dsHandler : this.connectedDSs.values())
      {
        monitorMsg.setServerState(lsh.getServerId(),
            lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
        monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
            .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.connectedRSs.values())
      for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsh.getServerId(),
            rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
        monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
            .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
      }
      // Populate the RS state in the msg from the DbState
@@ -1857,27 +1856,29 @@
  }
  /**
   * Send a TopologyMsg to all the connected directory servers in order to
   * let them know the topology (every known DSs and RSs).
   * @param notThisOne If not null, the topology message will not be sent to
   * this passed server.
   * Send a TopologyMsg to all the connected directory servers in order to let
   * them know the topology (every known DSs and RSs).
   *
   * @param notThisOne
   *          If not null, the topology message will not be sent to this DS.
   */
  public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
  private void sendTopoInfoToAllDSsExcept(DataServerHandler notThisOne)
  {
    for (DataServerHandler handler : connectedDSs.values())
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      if (notThisOne == null || handler != notThisOne)
        // All except passed one
      if (dsHandler != notThisOne)
      // All except the supplied one
      {
        for (int i=1; i<=2; i++)
        {
          if (!handler.shuttingDown()
              && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          if (!dsHandler.shuttingDown()
              && dsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            TopologyMsg topoMsg = createTopologyMsgForDS(handler.getServerId());
            TopologyMsg topoMsg =
                createTopologyMsgForDS(dsHandler.getServerId());
            try
            {
              handler.sendTopoInfo(topoMsg);
              dsHandler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
@@ -1886,7 +1887,7 @@
              {
                Message message =
                    ERR_EXCEPTION_SENDING_TOPO_INFO
                        .get(baseDn, "directory", Integer.toString(handler
                        .get(baseDn, "directory", Integer.toString(dsHandler
                            .getServerId()), e.getMessage());
                logError(message);
              }
@@ -1902,28 +1903,28 @@
   * Send a TopologyMsg to all the connected replication servers
   * in order to let them know our connected LDAP servers.
   */
  public void buildAndSendTopoInfoToRSs()
  private void sendTopoInfoToAllRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler handler : connectedRSs.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
        if (!handler.shuttingDown()
            && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        if (!rsHandler.shuttingDown()
            && rsHandler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        {
          try
          {
            handler.sendTopoInfo(topoMsg);
            rsHandler.sendTopoInfo(topoMsg);
            break;
          }
          catch (IOException e)
          {
            if (i == 2)
            {
              Message message =
                  ERR_EXCEPTION_SENDING_TOPO_INFO.get(baseDn, "replication",
                      Integer.toString(handler.getServerId()), e.getMessage());
              Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                  baseDn, "replication",
                  Integer.toString(rsHandler.getServerId()), e.getMessage());
              logError(message);
            }
          }
@@ -1947,16 +1948,15 @@
  public TopologyMsg createTopologyMsgForRS()
  {
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    // Go through every DSs
    for (DataServerHandler serverHandler : connectedDSs.values())
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      dsInfos.add(serverHandler.toDSInfo());
      dsInfos.add(dsHandler.toDSInfo());
    }
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    rsInfos.add(toRSInfo(localReplicationServer, generationId));
    return new TopologyMsg(dsInfos, rsInfos);
  }
@@ -1974,13 +1974,13 @@
  {
    // Go through every DSs (except recipient of msg)
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    for (DataServerHandler serverHandler : connectedDSs.values())
    for (DataServerHandler dsHandler : connectedDSs.values())
    {
      if (serverHandler.getServerId() == destDsId)
      if (dsHandler.getServerId() == destDsId)
      {
        continue;
      }
      dsInfos.add(serverHandler.toDSInfo());
      dsInfos.add(dsHandler.toDSInfo());
    }
@@ -1990,11 +1990,11 @@
    // Go through every peer RSs (and get their connected DSs), also add info
    // for RSs
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      rsInfos.add(serverHandler.toRSInfo());
      rsInfos.add(rsHandler.toRSInfo());
      serverHandler.addDSInfos(dsInfos);
      rsHandler.addDSInfos(dsInfos);
    }
    return new TopologyMsg(dsInfos, rsInfos);
@@ -2161,11 +2161,9 @@
      // (consecutive to reset gen id message), we prefer advertising once for
      // all after changes (less packet sent), here at the end of the reset msg
      // treatment.
      buildAndSendTopoInfoToDSs(null);
      buildAndSendTopoInfoToRSs();
      sendTopoInfoToAll();
      Message message = NOTE_RESET_GENERATION_ID.get(baseDn, newGenId);
      logError(message);
      logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId));
    }
    catch(Exception e)
    {
@@ -2219,9 +2217,7 @@
        return;
      }
      // Update every peers (RS/DS) with topology changes
      buildAndSendTopoInfoToDSs(senderHandler);
      buildAndSendTopoInfoToRSs();
      sendTopoInfoToAllExcept(senderHandler);
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          senderHandler.getServerId(), baseDn, newStatus.toString());
@@ -2241,12 +2237,12 @@
  /**
   * Change the status of a directory server according to the event generated
   * from the status analyzer.
   * @param serverHandler The handler of the directory server to update
   * @param dsHandler The handler of the directory server to update
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatusFromStatusAnalyzer(
      DataServerHandler serverHandler, StatusMachineEvent event)
  public boolean changeStatus(DataServerHandler dsHandler,
      StatusMachineEvent event)
  {
    try
    {
@@ -2272,7 +2268,7 @@
        TRACER.debugInfo("Status analyzer for domain " + baseDn
            + " has been interrupted when"
            + " trying to acquire domain lock for changing the status of DS "
            + serverHandler.getServerId());
            + dsHandler.getServerId());
      }
      return true;
    }
@@ -2280,16 +2276,16 @@
    try
    {
      ServerStatus newStatus = ServerStatus.INVALID_STATUS;
      ServerStatus oldStatus = serverHandler.getStatus();
      ServerStatus oldStatus = dsHandler.getStatus();
      try
      {
        newStatus = serverHandler.changeStatusFromStatusAnalyzer(event);
        newStatus = dsHandler.changeStatus(event);
      }
      catch (IOException e)
      {
        logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
            .get(baseDn,
                Integer.toString(serverHandler.getServerId()),
                Integer.toString(dsHandler.getServerId()),
                e.getMessage()));
      }
@@ -2300,9 +2296,7 @@
        return false;
      }
      // Update every peers (RS/DS) with topology changes
      buildAndSendTopoInfoToDSs(serverHandler);
      buildAndSendTopoInfoToRSs();
      sendTopoInfoToAllExcept(dsHandler);
    }
    catch (Exception e)
    {
@@ -2318,6 +2312,26 @@
  }
  /**
   * Update every peers (RS/DS) with topology changes.
   */
  public void sendTopoInfoToAll()
  {
    sendTopoInfoToAllExcept(null);
  }
  /**
   * Update every peers (RS/DS) with topology changes but one DS.
   *
   * @param dsHandler
   *          if not null, the topology message will not be sent to this DS
   */
  private void sendTopoInfoToAllExcept(DataServerHandler dsHandler)
  {
    sendTopoInfoToAllDSsExcept(dsHandler);
    sendTopoInfoToAllRSs();
  }
  /**
   * Clears the Db associated with that domain.
   */
  public void clearDbs()
@@ -2372,11 +2386,11 @@
          + " given local generation Id=" + this.generationId);
    }
    ServerHandler handler = connectedRSs.get(serverId);
    if (handler == null)
    ServerHandler sHandler = connectedRSs.get(serverId);
    if (sHandler == null)
    {
      handler = connectedDSs.get(serverId);
      if (handler == null)
      sHandler = connectedDSs.get(serverId);
      if (sHandler == null)
      {
        return false;
      }
@@ -2386,30 +2400,29 @@
    {
      TRACER.debugInfo("In "
          + this.localReplicationServer.getMonitorInstanceName()
          + " baseDN=" + baseDn + " Compute degradation of serverId="
          + serverId + " LS server generation Id=" + handler.getGenerationId());
          + " baseDN=" + baseDn + " Compute degradation of serverId=" + serverId
          + " LS server generation Id=" + sHandler.getGenerationId());
    }
    return handler.getGenerationId() != this.generationId;
    return sHandler.getGenerationId() != this.generationId;
  }
  /**
   * Process topology information received from a peer RS.
   * @param topoMsg The just received topo message from remote RS
   * @param handler The handler that received the message.
   * @param rsHandler The handler that received the message.
   * @param allowResetGenId True for allowing to reset the generation id (
   * when called after initial handshake)
   * @throws IOException If an error occurred.
   * @throws DirectoryException If an error occurred.
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
      ReplicationServerHandler handler,
    boolean allowResetGenId)
    throws IOException, DirectoryException
      ReplicationServerHandler rsHandler, boolean allowResetGenId)
      throws IOException, DirectoryException
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + getLocalRSServerId()
          + " Receiving TopologyMsg from " + handler.getServerId()
          + " Receiving TopologyMsg from " + rsHandler.getServerId()
          + " for baseDn " + baseDn + ":\n" + topoMsg);
    }
@@ -2430,37 +2443,38 @@
    try
    {
      // Store DS connected to remote RS & update information about the peer RS
      handler.processTopoInfoFromRS(topoMsg);
      rsHandler.processTopoInfoFromRS(topoMsg);
      // Handle generation id
      if (allowResetGenId)
      {
        // Check if generation id has to be reseted
        // Check if generation id has to be reset
        mayResetGenerationId();
        if (generationId < 0)
        {
          generationId = handler.getGenerationId();
          generationId = rsHandler.getGenerationId();
        }
      }
      if (isDifferentGenerationId(handler.getGenerationId()))
      if (isDifferentGenerationId(rsHandler.getGenerationId()))
      {
        Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
            handler.getServerId(), handler.session.getReadableRemoteAddress(),
            handler.getGenerationId(),
            rsHandler.getServerId(),
            rsHandler.session.getReadableRemoteAddress(),
            rsHandler.getGenerationId(),
            baseDn, getLocalRSServerId(), generationId);
        logError(message);
        ErrorMsg errorMsg =
            new ErrorMsg(getLocalRSServerId(), handler.getServerId(), message);
        handler.send(errorMsg);
        ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
            rsHandler.getServerId(), message);
        rsHandler.send(errorMsg);
      }
      /*
       * Sends the currently known topology information to every connected
       * DS we have.
       */
      buildAndSendTopoInfoToDSs(null);
      sendTopoInfoToAllDSsExcept(null);
    }
    catch(Exception e)
    {
@@ -2765,9 +2779,9 @@
   */
  public void setPurgeDelay(long delay)
  {
    for (DbHandler handler : sourceDbHandlers.values())
    for (DbHandler dbHandler : sourceDbHandlers.values())
    {
      handler.setPurgeDelay(delay);
      dbHandler.setPurgeDelay(delay);
    }
  }
@@ -2857,7 +2871,7 @@
  /**
   * Starts the status analyzer for the domain if not already started.
   */
  public void startStatusAnalyzer()
  private void startStatusAnalyzer()
  {
    int degradedStatusThreshold =
        localReplicationServer.getDegradedStatusThreshold();
@@ -2888,7 +2902,7 @@
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  public void startMonitoringPublisher()
  private void startMonitoringPublisher()
  {
    long period = localReplicationServer.getMonitoringPublisherPeriod();
    if (period > 0) // 0 means no monitoring publisher
@@ -2967,21 +2981,21 @@
  /**
   * Register in the domain an handler that subscribes to changes.
   * @param handler the provided subscribing handler.
   * @param mHandler the provided subscribing handler.
   */
  public void registerHandler(MessageHandler handler)
  public void registerHandler(MessageHandler mHandler)
  {
    this.otherHandlers.add(handler);
    this.otherHandlers.add(mHandler);
  }
  /**
   * Unregister from the domain an handler.
   * @param handler the provided unsubscribing handler.
   * @param mHandler the provided unsubscribing handler.
   * @return Whether this handler has been unregistered with success.
   */
  public boolean unRegisterHandler(MessageHandler handler)
  public boolean unRegisterHandler(MessageHandler mHandler)
  {
    return this.otherHandlers.remove(handler);
    return this.otherHandlers.remove(mHandler);
  }
  /**
@@ -3357,7 +3371,7 @@
    {
      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
    }
    else if (getConnectedDSs().size() > 0)
    else if (connectedDSs.size() > 0)
    {
      // Requested to start analyzers with provided threshold value
      startStatusAnalyzer();
@@ -3384,10 +3398,45 @@
    {
      mpThread.setPeriod(period);
    }
    else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
    else if (connectedDSs.size() > 0 || connectedRSs.size() > 0)
    {
      // Requested to start monitoring publishers with provided period value
      startMonitoringPublisher();
    }
  }
  /**
   * Registers a DS handler into this domain and notifies the domain about the
   * new DS.
   *
   * @param dsHandler
   *          The Directory Server Handler to register
   */
  public void register(DataServerHandler dsHandler)
  {
    startStatusAnalyzer();
    startMonitoringPublisher();
    // connected with new DS: store handler.
    connectedDSs.put(dsHandler.getServerId(), dsHandler);
    // Tell peer RSs and DSs a new DS just connected to us
    // No need to re-send TopologyMsg to this just new DS
    sendTopoInfoToAllExcept(dsHandler);
  }
  /**
   * Registers the RS handler into this domain and notifies the domain.
   *
   * @param rsHandler
   *          The Replication Server Handler to register
   */
  public void register(ReplicationServerHandler rsHandler)
  {
    startMonitoringPublisher();
    // connected with new RS (either outgoing or incoming
    // connection): store handler.
    connectedRSs.put(rsHandler.getServerId(), rsHandler);
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -230,13 +230,11 @@
        logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
        replicationServerDomain.startMonitoringPublisher();
        /*
        FIXME: i think this should be done for all protocol version !!
        not only those > V1
        */
        registerIntoDomain();
        replicationServerDomain.register(this);
        /*
        Process TopologyMsg sent by remote RS: store matching new info
@@ -376,9 +374,7 @@
        */
      }
      replicationServerDomain.startMonitoringPublisher();
      registerIntoDomain();
      replicationServerDomain.register(this);
      // Process TopologyMsg sent by remote RS: store matching new info
      // (this will also warn our connected DSs of the new received info)
@@ -429,18 +425,6 @@
  }
  /**
   * Registers this handler into its related domain and notifies the domain.
   */
  private void registerIntoDomain()
  {
    // Alright, connected with new RS (either outgoing or incoming
    // connection): store handler.
    Map<Integer, ReplicationServerHandler> connectedRSs =
      replicationServerDomain.getConnectedRSs();
    connectedRSs.put(serverId, this);
  }
  /**
   * Wait receiving the TopologyMsg from the remote RS and process it.
   * @return the topologyMsg received or {@code null} if stop was received.
   * @throws DirectoryException
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -180,8 +180,7 @@
  private boolean isInterrupted(DataServerHandler serverHandler,
      StatusMachineEvent event)
  {
    if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler,
        event))
    if (replicationServerDomain.changeStatus(serverHandler, event))
    {
      // Finish job and let thread die
      TRACER.debugInfo(