mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
07.19.2009 ae408b6c09759f61754f3e7b39d5e5d6595c1fc4
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -141,8 +141,8 @@
 *   Full Initialization of a replica can be triggered by LDAP clients
 *   by creating InitializeTasks or InitializeTargetTask.
 *   Full initialization can also by triggered from the ReplicationDomain
 *   implementation using methods {@link #initializeRemote(short)}
 *   or {@link #initializeFromRemote(short)}.
 *   implementation using methods {@link #initializeRemote(int)}
 *   or {@link #initializeFromRemote(int)}.
 * <p>
 *   At shutdown time, the {@link #stopDomain()} method should be called to
 *   cleanly stop the replication service.
@@ -171,7 +171,7 @@
   * Replication Service.
   * Each Domain must use a unique ServerID.
   */
  private final short serverID;
  private final int serverID;
  /**
   * The ReplicationBroker that is used by this ReplicationDomain to
@@ -262,8 +262,8 @@
  // that have not been successfully acknowledged (either because of timeout,
  // wrong status or error at replay) for a particular server (DS or RS). String
  // format: <server id>:<number of failed updates>
  private Map<Short,Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Short,Integer>();
  private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
    new HashMap<Integer,Integer>();
  // Number of updates received in Assured Mode, Safe Read request
  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
  // Number of updates received in Assured Mode, Safe Read request that we have
@@ -283,8 +283,8 @@
  // Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
  // that have not been successfully acknowledged because of timeout for a
  // particular RS. String format: <server id>:<number of failed updates>
  private Map<Short,Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Short,Integer>();
  private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
    new HashMap<Integer,Integer>();
  /* Status related monitoring fields */
@@ -316,8 +316,8 @@
   * A Map containing of the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled.
   */
  private Map<Short, ServerState> replicaStates =
    new HashMap<Short, ServerState>();
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  Set<String> cfgEclIncludes = new HashSet<String>();
  Set<String>    eClIncludes = new HashSet<String>();
@@ -344,7 +344,7 @@
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   */
  public ReplicationDomain(String serviceID, short serverID)
  public ReplicationDomain(String serviceID, int serverID)
  {
    this.serviceID = serviceID;
    this.serverID = serverID;
@@ -366,7 +366,7 @@
   *                   is participating to a given Replication Domain.
   * @param serverState The serverState to use
   */
  public ReplicationDomain(String serviceID, short serverID,
  public ReplicationDomain(String serviceID, int serverID,
    ServerState serverState)
  {
    this.serviceID = serviceID;
@@ -405,7 +405,7 @@
    if (!isValidInitialStatus(initStatus))
    {
      Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
        serviceID, Short.toString(serverID));
        serviceID, Integer.toString(serverID));
      logError(msg);
    } else
    {
@@ -434,7 +434,7 @@
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
        serviceID, Short.toString(serverID));
        serviceID, Integer.toString(serverID));
      logError(msg);
      return;
    }
@@ -503,7 +503,7 @@
   * Get the server ID.
   * @return The server ID.
   */
  public short getServerId()
  public int getServerId()
  {
    return serverID;
  }
@@ -575,7 +575,7 @@
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication to which this domain is currently connected
   * to the Replication Server to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
@@ -583,7 +583,7 @@
   *
   * @return The States of all Replicas in the topology (except us)
   */
  public Map<Short, ServerState> getReplicaStates()
  public Map<Integer, ServerState> getReplicaStates()
  {
    // publish Monitor Request Message to the Replication Server
    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
@@ -619,7 +619,7 @@
   * @return The server ID of the Replication Server to which the domain
   *         is currently connected.
   */
  public short getRsServerId()
  public int getRsServerId()
  {
    return broker.getRsServerId();
  }
@@ -832,12 +832,12 @@
        {
          // This is the response to a MonitorRequest that was sent earlier
          // build the replicaStates Map.
          replicaStates = new HashMap<Short, ServerState>();
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Short> it = monitorMsg.ldapIterator();
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            short serverId = it.next();
            int serverId = it.next();
            replicaStates.put(
                serverId, monitorMsg.getLDAPServerState(serverId));
          }
@@ -886,26 +886,26 @@
   * passed server, or creates an initial value of 1 error for it if the server
   * is not yet present in the map.
   * @param errorList
   * @param serverId
   * @param sid
   */
  private void updateAssuredErrorsByServer(Map<Short,Integer> errorsByServer,
    Short serverId)
  private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
    Integer sid)
  {
    synchronized (errorsByServer)
    {
      Integer serverErrCount = errorsByServer.get(serverId);
      Integer serverErrCount = errorsByServer.get(sid);
      if (serverErrCount == null)
      {
        // Server not present in list, create an entry with an
        // initial number of errors set to 1
        errorsByServer.put(serverId, 1);
        errorsByServer.put(sid, 1);
      } else
      {
        // Server already present in list, just increment number of
        // errors for the server
        int val = serverErrCount.intValue();
        val++;
        errorsByServer.put(serverId, val);
        errorsByServer.put(sid, val);
      }
    }
  }
@@ -946,11 +946,12 @@
      {
        // Some problems detected: message not correclty reached every requested
        // servers. Log problem
        Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(serviceID,
          Short.toString(serverID), update.toString(), ack.errorsToString());
        Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
            serviceID, Integer.toString(serverID),
            update.toString(), ack.errorsToString());
        logError(errorMsg);
        List<Short> failedServers = ack.getFailedServers();
        List<Integer> failedServers = ack.getFailedServers();
        // Increment assured replication monitoring counters
        switch (updateAssuredMode)
@@ -965,7 +966,7 @@
              assuredSrWrongStatusUpdates.incrementAndGet();
            if (failedServers != null) // This should always be the case !
            {
              for(Short sid : failedServers)
              for(Integer sid : failedServers)
              {
                updateAssuredErrorsByServer(
                  assuredSrServerNotAcknowledgedUpdates, sid);
@@ -978,7 +979,7 @@
              assuredSdTimeoutUpdates.incrementAndGet();
            if (failedServers != null) // This should always be the case !
            {
              for(Short sid : failedServers)
              for(Integer sid : failedServers)
              {
                updateAssuredErrorsByServer(
                  assuredSdServerTimeoutUpdates, sid);
@@ -1040,17 +1041,17 @@
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will receive updates
    private short target;
    private int target;
    /**
     * Constructor for the ExportThread.
     *
     * @param target Id of server that will receive updates
     * @param i Id of server that will receive updates
     */
    public ExportThread(short target)
    public ExportThread(int i)
    {
      super("Export thread " + serverID);
      this.target = target;
      this.target = i;
    }
    /**
@@ -1085,12 +1086,12 @@
   */
  protected class IEContext
  {
    // Theprivate task that initiated the operation.
    // The private task that initiated the operation.
    Task initializeTask;
    // The destination in the case of an export
    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
    int exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMsg.UNKNOWN_SERVER;
    int importSource = RoutableMsg.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
@@ -1207,7 +1208,7 @@
     * Gets the server id of the exporting server.
     * @return the server id of the exporting server.
     */
    public short getExportTarget()
    public int getExportTarget()
    {
      return exportTarget;
    }
@@ -1216,7 +1217,7 @@
     * Gets the server id of the importing server.
     * @return the server id of the importing server.
     */
    public short getImportSource()
    public int getImportSource()
    {
      return importSource;
    }
@@ -1247,10 +1248,10 @@
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeTarget(String targetString)
  public int decodeTarget(String targetString)
  throws DirectoryException
  {
    short  target = 0;
    int  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
@@ -1260,7 +1261,7 @@
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString).shortValue();
      target = Integer.decode(targetString);
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
@@ -1302,7 +1303,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeRemote(short target, Task initTask)
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, serverID, initTask);
@@ -1341,19 +1342,19 @@
   * server that requests the initialization.
   *
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param target2 The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  protected void initializeRemote(short target, short requestorID,
  protected void initializeRemote(int target, int target2,
    Task initTask) throws DirectoryException
  {
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
      Short.toString(serverID),
        Integer.toString(serverID),
      serviceID,
      Short.toString(requestorID));
      Integer.toString(target2));
    logError(msg);
    boolean contextAcquired=false;
@@ -1376,7 +1377,7 @@
    // Send start message to the peer
    InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
        serviceID, serverID, target, requestorID, entryCount);
        serviceID, serverID, target, target2, entryCount);
    broker.publish(initializeMessage);
@@ -1406,9 +1407,9 @@
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
      Short.toString(serverID),
        Integer.toString(serverID),
      serviceID,
      Short.toString(requestorID));
      Integer.toString(target2));
    logError(msg);
  }
@@ -1630,7 +1631,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeFromRemote(short source)
  public void initializeFromRemote(int source)
  throws DirectoryException
  {
    initializeFromRemote(source, null);
@@ -1653,7 +1654,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeRemote(short target) throws DirectoryException
  public void initializeRemote(int target) throws DirectoryException
  {
    initializeRemote(target, null);
  }
@@ -1680,7 +1681,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeFromRemote(short source, Task initTask)
  public void initializeFromRemote(int source, Task initTask)
  throws DirectoryException
  {
    if (debugEnabled())
@@ -1722,7 +1723,7 @@
    DirectoryException de = null;
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
      Short.toString(serverID),
        Integer.toString(serverID),
      serviceID,
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
@@ -1776,7 +1777,7 @@
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
      Short.toString(serverID),
        Integer.toString(serverID),
      serviceID,
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
@@ -1795,7 +1796,7 @@
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(serviceID,
        Short.toString(serverID), status.toString(), event.toString());
          Integer.toString(serverID), status.toString(), event.toString());
      logError(msg);
      return;
    }
@@ -1840,20 +1841,17 @@
   * @throws DirectoryException When the generation ID of the Replication
   *                            Servers is not the expected value.
   */
  private void checkGenerationID(long generationID) throws DirectoryException
  private void checkGenerationID(long generationID)
  throws DirectoryException
  {
    boolean flag = false;
    boolean allset = true;
    for (int i = 0; i< 10; i++)
    {
      allset = true;
      for (RSInfo rsInfo : getRsList())
      {
        if (rsInfo.getGenerationId() == generationID)
        {
          flag = true;
          break;
        }
        else
        if (rsInfo.getGenerationId() != generationID)
        {
          try
          {
@@ -1861,15 +1859,17 @@
          } catch (InterruptedException e)
          {
          }
          allset = false;
          break;
        }
      }
      if (flag)
      if (allset)
      {
        break;
      }
    }
    if (!flag)
    if (!allset)
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
@@ -2169,15 +2169,15 @@
   * @return The number of updates sent in assured safe read mode that have not
   * been acknowledged per server.
   */
  public Map<Short, Integer> getAssuredSrServerNotAcknowledgedUpdates()
  public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
  {
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
    Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
    synchronized(assuredSrServerNotAcknowledgedUpdates)
    {
      Set<Short> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
      for (Short serverId : keySet)
      Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
      for (Integer serverId : keySet)
      {
        Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
        snapshot.put(serverId, i);
@@ -2254,15 +2254,15 @@
   * @return The number of updates sent in assured safe data mode that have not
   * been acknowledged due to timeout error per server.
   */
  public Map<Short, Integer> getAssuredSdServerTimeoutUpdates()
  public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
  {
    // Clone a snapshot with synchronized section to have a consistent view in
    // monitoring
    Map<Short, Integer> snapshot = new HashMap<Short, Integer>();
    Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
    synchronized(assuredSdServerTimeoutUpdates)
    {
      Set<Short> keySet = assuredSdServerTimeoutUpdates.keySet();
      for (Short serverId : keySet)
      Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet();
      for (Integer serverId : keySet)
      {
        Integer i = assuredSdServerTimeoutUpdates.get(serverId);
        snapshot.put(serverId, i);
@@ -2295,14 +2295,14 @@
    assuredSrTimeoutUpdates = new AtomicInteger(0);
    assuredSrWrongStatusUpdates = new AtomicInteger(0);
    assuredSrReplayErrorUpdates = new AtomicInteger(0);
    assuredSrServerNotAcknowledgedUpdates = new HashMap<Short,Integer>();
    assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>();
    assuredSrReceivedUpdates = new AtomicInteger(0);
    assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
    assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
    assuredSdSentUpdates = new AtomicInteger(0);
    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
    assuredSdTimeoutUpdates = new AtomicInteger(0);
    assuredSdServerTimeoutUpdates = new HashMap<Short,Integer>();
    assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>();
  }
  /*
@@ -2605,7 +2605,7 @@
              //   -> replay error occured
              ackMsg.setHasReplayError(true);
              //   -> replay error occured in our server
              List<Short> idList = new ArrayList<Short>();
              List<Integer> idList = new ArrayList<Integer>();
              idList.add(serverID);
              ackMsg.setFailedServers(idList);
            }
@@ -2621,7 +2621,7 @@
        } else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        {
          Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
            Short.toString(serverID), msgAssuredMode.toString(), serviceID,
              Integer.toString(serverID), msgAssuredMode.toString(), serviceID,
            msg.toString());
          logError(errorMsg);
        } else