mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
07.19.2009 ae408b6c09759f61754f3e7b39d5e5d6595c1fc4
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -120,8 +120,8 @@
   * to this replication server.
   *
   */
  private final Map<Short, DataServerHandler> directoryServers =
    new ConcurrentHashMap<Short, DataServerHandler>();
  private final Map<Integer, DataServerHandler> directoryServers =
    new ConcurrentHashMap<Integer, DataServerHandler>();
  /*
   * This map contains one ServerHandler for each replication servers
@@ -132,8 +132,8 @@
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private final Map<Short, ReplicationServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ReplicationServerHandler>();
  private final Map<Integer, ReplicationServerHandler> replicationServers =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final ConcurrentLinkedQueue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
@@ -142,8 +142,8 @@
   * This map contains the List of updates received from each
   * LDAP server
   */
  private final Map<Short, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Short, DbHandler>();
  private final Map<Integer, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Integer, DbHandler>();
  private ReplicationServer replicationServer;
  // GenerationId management
@@ -218,7 +218,7 @@
  {
    ChangeNumber cn = update.getChangeNumber();
    short id = cn.getServerId();
    int id = cn.getServerId();
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
@@ -268,7 +268,7 @@
        {
          // Unknown assured mode: should never happen
          Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
            Short.toString(replicationServer.getServerId()),
            Integer.toString(replicationServer.getServerId()),
            assuredMode.toString(), baseDn, update.toString());
          logError(errorMsg);
          assuredMessage = false;
@@ -313,7 +313,7 @@
    // Publish the messages to the source handler
    dbHandler.add(update);
    List<Short> expectedServers = null;
    List<Integer> expectedServers = null;
    if (assuredMessage)
    {
      expectedServers = preparedAssuredInfo.expectedServers;
@@ -374,7 +374,7 @@
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to replication server " +
              Short.toString(handler.getServerId()) + " with generation id " +
              Integer.toString(handler.getServerId()) + " with generation id " +
              Long.toString(handler.getGenerationId()) +
              " different from local " +
              "generation id " + Long.toString(generationId));
@@ -439,7 +439,7 @@
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
              Short.toString(handler.getServerId()) + " with generation id " +
              Integer.toString(handler.getServerId()) + " with generation id " +
              Long.toString(handler.getGenerationId()) +
              " different from local " +
              "generation id " + Long.toString(generationId));
@@ -449,7 +449,7 @@
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
              Short.toString(handler.getServerId()) +
              Integer.toString(handler.getServerId()) +
              " as it is in full update");
        }
@@ -505,7 +505,7 @@
       * request.
       *
       */
      public List<Short> expectedServers = null;
      public List<Integer> expectedServers = null;
      /**
       * The constructed ExpectedAcksInfo object to be used when acks will be
@@ -535,8 +535,8 @@
    ChangeNumber cn = update.getChangeNumber();
    byte groupId = replicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    List<Short> expectedServers = new ArrayList<Short>();
    List<Short> wrongStatusServers = new ArrayList<Short>();
    List<Integer> expectedServers = new ArrayList<Integer>();
    List<Integer> wrongStatusServers = new ArrayList<Integer>();
    if (sourceGroupId == groupId)
      // Assured feature does not cross different group ids
@@ -643,7 +643,7 @@
    {
      // Should never happen
      Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
        Short.toString(replicationServer.getServerId()),
        Integer.toString(replicationServer.getServerId()),
        Byte.toString(safeDataLevel), baseDn, update.toString());
      logError(errorMsg);
    } else if (sourceGroupId != groupId)
@@ -705,7 +705,7 @@
      }
    }
    List<Short> expectedServers = new ArrayList<Short>();
    List<Integer> expectedServers = new ArrayList<Integer>();
    if (interestedInAcks)
    {
      if (sourceHandler.isDataServer())
@@ -805,8 +805,9 @@
             */
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Short.toString(replicationServer.getServerId()),
              Short.toString(origServer.getServerId()), cn.toString(), baseDn));
              Integer.toString(replicationServer.getServerId()),
              Integer.toString(origServer.getServerId()),
              cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer);
@@ -867,11 +868,11 @@
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
            TRACER.debugInfo(
              "In RS " + Short.toString(replicationServer.getServerId()) +
              "In RS " + Integer.toString(replicationServer.getServerId()) +
              " for " + baseDn +
              ", sending timeout for assured update with change " + " number " +
              cn.toString() + " to server id " +
              Short.toString(origServer.getServerId()));
              Integer.toString(origServer.getServerId()));
          try
          {
            origServer.sendAck(finalAck);
@@ -883,8 +884,9 @@
             */
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Short.toString(replicationServer.getServerId()),
              Short.toString(origServer.getServerId()), cn.toString(), baseDn));
                Integer.toString(replicationServer.getServerId()),
                Integer.toString(origServer.getServerId()),
                cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer);
@@ -904,8 +906,8 @@
            }
          }
          //   retrieve expected servers in timeout to increment their counter
          List<Short> serversInTimeout = expectedAcksInfo.getTimeoutServers();
          for (Short serverId : serversInTimeout)
          List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
          for (Integer serverId : serversInTimeout)
          {
            ServerHandler expectedServerInTimeout =
              directoryServers.get(serverId);
@@ -954,7 +956,7 @@
   *
   * @param serverId the serverId to be checked.
   */
  public void waitDisconnection(short serverId)
  public void waitDisconnection(int serverId)
  {
    if (directoryServers.containsKey(serverId))
    {
@@ -1327,7 +1329,7 @@
   * whatever directly connected of connected to another RS.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getServers()
  public Set<Integer> getServers()
  {
    return sourceDbHandlers.keySet();
  }
@@ -1360,7 +1362,7 @@
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getChangelogIterator(short serverId,
  public ReplicationIterator getChangelogIterator(int serverId,
    ChangeNumber changeNumber)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
@@ -1393,7 +1395,7 @@
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getIterator(short serverId,
  public ReplicationIterator getIterator(int serverId,
    ChangeNumber changeNumber)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
@@ -1442,7 +1444,7 @@
   *
   * @throws DatabaseException If a database error happened.
   */
  public void setDbHandler(short serverId, DbHandler dbHandler)
  public void setDbHandler(int serverId, DbHandler dbHandler)
    throws DatabaseException
  {
    synchronized (sourceDbHandlers)
@@ -1576,10 +1578,10 @@
            // Add the informations about the Replicas currently in
            // the topology.
            Iterator<Short> it = md.ldapIterator();
            Iterator<Integer> it = md.ldapIterator();
            while (it.hasNext())
            {
              short replicaId = it.next();
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getLDAPServerState(replicaId),
                  md.getApproxFirstMissingDate(replicaId), true);
@@ -1590,7 +1592,7 @@
            it = md.rsIterator();
            while (it.hasNext())
            {
              short replicaId = it.next();
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getRSStates(replicaId),
                  md.getRSApproxFirstMissingDate(replicaId), false);
@@ -1661,7 +1663,7 @@
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
            Short.toString((msg.getDestination()))));
              Integer.toString((msg.getDestination()))));
        }
      } else if (msg instanceof MonitorMsg)
      {
@@ -1849,7 +1851,8 @@
          {
            Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
            baseDn.toString(),
            "directory", Short.toString(handler.getServerId()), e.getMessage());
            "directory", Integer.toString(handler.getServerId()),
            e.getMessage());
            logError(message);
          }
        }
@@ -1875,7 +1878,7 @@
        {
          Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
            baseDn.toString(),
            "replication", Short.toString(handler.getServerId()),
            "replication", Integer.toString(handler.getServerId()),
            e.getMessage());
          logError(message);
        }
@@ -1918,7 +1921,7 @@
   * that we must not include in the list DS list.
   * @return A suitable TopologyMsg PDU to be sent to a peer DS
   */
  public TopologyMsg createTopologyMsgForDS(short destDsId)
  public TopologyMsg createTopologyMsgForDS(int destDsId)
  {
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
@@ -2083,7 +2086,7 @@
      {
        logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn.
          toString(),
          Short.toString(dsHandler.getServerId()),
          Integer.toString(dsHandler.getServerId()),
          e.getMessage()));
      }
    }
@@ -2144,7 +2147,7 @@
    buildAndSendTopoInfoToRSs();
    Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
      Short.toString(senderHandler.getServerId()),
        Integer.toString(senderHandler.getServerId()),
      baseDn.toString(),
      newStatus.toString());
    logError(message);
@@ -2198,7 +2201,7 @@
    {
      logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER.get(baseDn.
        toString(),
        Short.toString(serverHandler.getServerId()),
        Integer.toString(serverHandler.getServerId()),
        e.getMessage()));
    }
@@ -2264,7 +2267,7 @@
   *                 the state.
   * @return Whether it is degraded or not.
   */
  public boolean isDegradedDueToGenerationId(short serverId)
  public boolean isDegradedDueToGenerationId(int serverId)
  {
    if (debugEnabled())
      TRACER.debugInfo(
@@ -2354,7 +2357,7 @@
    {
      Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
        baseDn,
        Short.toString(handler.getServerId()),
        Integer.toString(handler.getServerId()),
        Long.toString(handler.getGenerationId()),
        Long.toString(generationId));
      logError(message);
@@ -2423,7 +2426,7 @@
      for (ServerHandler directlsh : directoryServers.values())
      {
        short serverID = directlsh.getServerId();
        int serverID = directlsh.getServerId();
        // the state comes from the state stored in the SH
        ServerState directlshState = directlsh.getServerState().duplicate();
@@ -2446,10 +2449,10 @@
      // - whatever they are directly or undirectly connected
      ServerState dbServerState = getDbServerState();
      wrkMonitorData.setRSState(replicationServer.getServerId(), dbServerState);
      Iterator<Short> it = dbServerState.iterator();
      Iterator<Integer> it = dbServerState.iterator();
      while (it.hasNext())
      {
        short sid = it.next();
        int sid = it.next();
        ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
        wrkMonitorData.setMaxCN(sid, storedCN);
      }
@@ -2535,7 +2538,7 @@
          // This is a response for an earlier request whose computing is
          // already complete.
          logError(NOTE_IGNORING_REMOTE_MONITOR_DATA.get(
                      Short.toString(msg.getsenderID())));
              Integer.toString(msg.getsenderID())));
          return;
        }
        // Here is the RS state : list <serverID, lastChangeNumber>
@@ -2547,10 +2550,10 @@
        wrkMonitorData.setRSState(msg.getsenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Short> lsidIterator = msg.ldapIterator();
        Iterator<Integer> lsidIterator = msg.ldapIterator();
        while (lsidIterator.hasNext())
        {
          short sid = lsidIterator.next();
          int sid = lsidIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(sid);
          wrkMonitorData.setMaxCNs(dsServerState);
          wrkMonitorData.setLDAPServerState(sid, dsServerState);
@@ -2560,17 +2563,17 @@
        // Process the latency reported by the remote RSi on its connections
        // to the other RSes
        Iterator<Short> rsidIterator = msg.rsIterator();
        Iterator<Integer> rsidIterator = msg.rsIterator();
        while (rsidIterator.hasNext())
        {
          short rsid = rsidIterator.next();
          int rsid = rsidIterator.next();
          if (rsid == replicationServer.getServerId())
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the fmd of my connected LS
            for (ServerHandler connectedlsh : directoryServers.values())
            {
              short connectedlsid = connectedlsh.getServerId();
              int connectedlsid = connectedlsh.getServerId();
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
            }
@@ -2581,7 +2584,7 @@
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            if (rsjHdr != null)
            {
              for (short remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
@@ -2633,7 +2636,7 @@
   * Get the map of connected DSs.
   * @return The map of connected DSs
   */
  public Map<Short, DataServerHandler> getConnectedDSs()
  public Map<Integer, DataServerHandler> getConnectedDSs()
  {
    return directoryServers;
  }
@@ -2642,7 +2645,7 @@
   * Get the map of connected RSs.
   * @return The map of connected RSs
   */
  public Map<Short, ReplicationServerHandler> getConnectedRSs()
  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
  {
    return replicationServers;
  }
@@ -2894,10 +2897,10 @@
    // compute eligible CN
    ServerState hbState = heartbeatState.duplicate();
    Iterator<Short> it = hbState.iterator();
    Iterator<Integer> it = hbState.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      int sid = it.next();
      ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
@@ -2940,10 +2943,10 @@
    if (eligibleCN != null)
    {
      Iterator<Short> it = dbState.iterator();
      Iterator<Integer> it = dbState.iterator();
      while (it.hasNext())
      {
        Short sid = it.next();
        int sid = it.next();
        DbHandler h = sourceDbHandlers.get(sid);
        ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
        try
@@ -3017,7 +3020,7 @@
    for (DbHandler db : sourceDbHandlers.values())
    {
      // Consider this producer (DS/db).
      short sid = db.getServerId();
      int sid = db.getServerId();
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null)
@@ -3135,11 +3138,11 @@
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    Iterator<Short> it = dbState.iterator();
    Iterator<Integer> it = dbState.iterator();
    while (it.hasNext())
    {
      // for each server
      Short sid = it.next();
      int sid = it.next();
      DbHandler h = sourceDbHandlers.get(sid);
      try