mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -136,7 +136,7 @@
   * The needed info for each received assured update message we are waiting
   * acks for.
   * <p>
   * Key: a change number matching a received update message which requested
   * Key: a CSN matching a received update message which requested
   * assured mode usage (either safe read or safe data mode)
   * <p>
   * Value: The object holding every info needed about the already received acks
@@ -145,8 +145,8 @@
   * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub
   *      classes javadoc.
   */
  private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
    new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
  private final Map<CSN, ExpectedAcksInfo> waitingAcks =
    new ConcurrentHashMap<CSN, ExpectedAcksInfo>();
  /**
   * The timer used to run the timeout code (timer tasks) for the assured update
@@ -193,8 +193,8 @@
  public void put(UpdateMsg update, ServerHandler sourceHandler)
    throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    int serverId = cn.getServerId();
    CSN csn = update.getCSN();
    int serverId = csn.getServerId();
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
@@ -271,11 +271,11 @@
        // The following timer will time out and send an timeout ack to the
        // requester if the acks are not received in time. The timer will also
        // remove the object from this map.
        waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo);
        waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
        // Arm timer for this assured update message (wait for acks until it
        // times out)
        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
        assuredTimeoutTimer.schedule(assuredTimeoutTask,
            localReplicationServer.getAssuredTimeout());
        // Purge timer every 100 treated messages
@@ -319,7 +319,7 @@
        {
          if (debugEnabled())
          {
            debug("update " + update.getChangeNumber()
            debug("update " + update.getCSN()
                + " will not be sent to replication server "
                + rsHandler.getServerId() + " with generation id "
                + rsHandler.getGenerationId() + " different from local "
@@ -362,7 +362,7 @@
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
          {
            debug("update " + update.getChangeNumber()
            debug("update " + update.getCSN()
                + " will not be sent to directory server "
                + dsHandler.getServerId() + " with generation id "
                + dsHandler.getGenerationId() + " different from local "
@@ -370,7 +370,7 @@
          }
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
          {
            debug("update " + update.getChangeNumber()
            debug("update " + update.getCSN()
                + " will not be sent to directory server "
                + dsHandler.getServerId() + " as it is in full update");
          }
@@ -499,7 +499,7 @@
  private PreparedAssuredInfo processSafeReadUpdateMsg(
    UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    CSN csn = update.getCSN();
    byte groupId = localReplicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    List<Integer> expectedServers = new ArrayList<Integer>();
@@ -550,7 +550,7 @@
    if (expectedServers.size() > 0)
    {
      // Some other acks to wait for
      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn,
      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn,
        sourceHandler, expectedServers, wrongStatusServers);
      preparedAssuredInfo.expectedServers = expectedServers;
    }
@@ -558,7 +558,7 @@
    if (preparedAssuredInfo.expectedServers == null)
    {
      // No eligible servers found, send the ack immediately
      sourceHandler.send(new AckMsg(cn));
      sourceHandler.send(new AckMsg(csn));
    }
    return preparedAssuredInfo;
@@ -582,7 +582,7 @@
  private PreparedAssuredInfo processSafeDataUpdateMsg(
    UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    CSN csn = update.getCSN();
    boolean interestedInAcks = false;
    byte safeDataLevel = update.getSafeDataLevel();
    byte groupId = localReplicationServer.getGroupId();
@@ -608,7 +608,7 @@
             * mode with safe data level 1, coming from a DS. No need to wait
             * for more acks
             */
            sourceHandler.send(new AckMsg(cn));
            sourceHandler.send(new AckMsg(csn));
          } else
          {
            /**
@@ -628,7 +628,7 @@
           */
          if (safeDataLevel > (byte) 1)
          {
            sourceHandler.send(new AckMsg(cn));
            sourceHandler.send(new AckMsg(csn));
          }
        }
    }
@@ -656,14 +656,14 @@
        byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
          (byte)sdl : // Keep level as it was
          (byte)(nExpectedServers+1); // Change level to match what's available
        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn,
          sourceHandler, finalSdl, expectedServers);
        preparedAssuredInfo.expectedServers = expectedServers;
      } else
      {
        // level > 1 and source is a DS but no eligible servers found, send the
        // ack immediately
        sourceHandler.send(new AckMsg(cn));
        sourceHandler.send(new AckMsg(csn));
      }
    }
@@ -706,8 +706,8 @@
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.
    ChangeNumber cn = ack.getChangeNumber();
    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
    CSN csn = ack.getCSN();
    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
    if (expectedAcksInfo != null)
    {
@@ -728,7 +728,7 @@
        if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
        {
          // Remove the object from the map as no more needed
          waitingAcks.remove(cn);
          waitingAcks.remove(csn);
          AckMsg finalAck = expectedAcksInfo.createAck(false);
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          try
@@ -744,7 +744,7 @@
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Integer.toString(localReplicationServer.getServerId()),
              Integer.toString(origServer.getServerId()),
              cn.toString(), baseDn));
              csn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer, false);
@@ -755,7 +755,7 @@
        }
      }
    }
    /* Else the timeout occurred for the update matching this change number
    /* Else the timeout occurred for the update matching this CSN
     * and the ack with timeout error has probably already been sent.
     */
  }
@@ -767,15 +767,15 @@
   */
  private class AssuredTimeoutTask extends TimerTask
  {
    private ChangeNumber cn = null;
    private CSN csn = null;
    /**
     * Constructor for the timer task.
     * @param cn The changeNumber of the assured update we are waiting acks for
     * @param csn The CSN of the assured update we are waiting acks for
     */
    public AssuredTimeoutTask(ChangeNumber cn)
    public AssuredTimeoutTask(CSN csn)
    {
      this.cn = cn;
      this.csn = csn;
    }
    /**
@@ -785,7 +785,7 @@
    @Override
    public void run()
    {
      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
      if (expectedAcksInfo != null)
      {
@@ -798,14 +798,14 @@
            return;
          }
          // Remove the object from the map as no more needed
          waitingAcks.remove(cn);
          waitingAcks.remove(csn);
          // Create the timeout ack and send him to the server the assured
          // update message came from
          AckMsg finalAck = expectedAcksInfo.createAck(true);
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
          {
            debug("sending timeout for assured update with change number " + cn
            debug("sending timeout for assured update with CSN " + csn
                + " to serverId=" + origServer.getServerId());
          }
          try
@@ -821,7 +821,7 @@
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
                Integer.toString(localReplicationServer.getServerId()),
                Integer.toString(origServer.getServerId()),
                cn.toString(), baseDn));
                csn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer, false);
@@ -1268,13 +1268,12 @@
   *
   * @param serverId
   *          Identifier of the server for which the cursor is created.
   * @param startAfterCN
   * @param startAfterCSN
   *          Starting point for the cursor.
   * @return the created {@link ReplicaDBCursor}. Null when no DB is
   *         available or the DB is empty for the provided serverId .
   * @return the created {@link ReplicaDBCursor}. Null when no DB is available
   *         or the DB is empty for the provided serverId .
   */
  public ReplicaDBCursor getCursorFrom(int serverId,
      ChangeNumber startAfterCN)
  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
  {
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler == null)
@@ -1285,7 +1284,7 @@
    ReplicaDBCursor cursor;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCN);
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
    }
    catch (Exception e)
    {
@@ -1303,13 +1302,13 @@
 /**
  * Count the number of changes in the replication changelog for the provided
  * serverID, between 2 provided changenumbers.
  * serverID, between 2 provided CSNs.
  * @param serverId Identifier of the server for which to compute the count.
  * @param from lower limit changenumber.
  * @param to   upper limit changenumber.
  * @param from lower limit CSN.
  * @param to   upper limit CSN.
  * @return the number of changes.
  */
  public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
  public long getCount(int serverId, CSN from, CSN to)
  {
    DbHandler dbHandler = sourceDbHandlers.get(serverId);
    if (dbHandler != null)
@@ -2646,57 +2645,59 @@
   * <pre>
   *     s1               s2          s3
   *     --               --          --
   *                                 cn31
   *     cn15
   *                                 csn31
   *     csn15
   *
   *  ----------------------------------------- eligibleCN
   *     cn14
   *                     cn26
   *     cn13
   *  ----------------------------------------- eligibleCSN
   *     csn14
   *                     csn26
   *     csn13
   * </pre>
   *
   * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
   * The eligibleState is : s1;csn14 / s2;csn26 / s3;csn31
   *
   * @param eligibleCN              The provided eligibleCN.
   * @param eligibleCSN
   *          The provided eligible CSN.
   * @return The computed eligible server state.
   */
  public ServerState getEligibleState(ChangeNumber eligibleCN)
  public ServerState getEligibleState(CSN eligibleCSN)
  {
    ServerState dbState = getDbServerState();
    // The result is initialized from the dbState.
    // From it, we don't want to keep the changes newer than eligibleCN.
    // From it, we don't want to keep the changes newer than eligibleCSN.
    ServerState result = dbState.duplicate();
    if (eligibleCN != null)
    if (eligibleCSN != null)
    {
      for (int serverId : dbState)
      {
        DbHandler h = sourceDbHandlers.get(serverId);
        ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId);
        CSN mostRecentDbCSN = dbState.getCSN(serverId);
        try {
          // Is the most recent change in the Db newer than eligible CN ?
          // if yes (like cn15 in the example above, then we have to go back
          // to the Db and look for the change older than  eligible CN (cn14)
          if (eligibleCN.olderOrEqual(mostRecentDbCN)) {
            // let's try to seek the first change <= eligibleCN
          // Is the most recent change in the Db newer than eligible CSN ?
          // if yes (like csn15 in the example above, then we have to go back
          // to the Db and look for the change older than eligible CSN (csn14)
          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
          {
            // let's try to seek the first change <= eligibleCSN
            ReplicaDBCursor cursor = null;
            try {
              cursor = h.generateCursorFrom(eligibleCN);
              cursor = h.generateCursorFrom(eligibleCSN);
              if (cursor != null && cursor.getChange() != null) {
                ChangeNumber newCN = cursor.getChange().getChangeNumber();
                result.update(newCN);
                CSN newCSN = cursor.getChange().getCSN();
                result.update(newCSN);
              }
            } catch (ChangelogException e) {
              // there's no change older than eligibleCN (case of s3/cn31)
              result.update(new ChangeNumber(0, 0, serverId));
              // there's no change older than eligibleCSN (case of s3/csn31)
              result.update(new CSN(0, 0, serverId));
            } finally {
              close(cursor);
            }
          } else {
            // for this serverId, all changes in the ChangelogDb are holder
            // than eligibleCN , the most recent in the db is our guy.
            result.update(mostRecentDbCN);
            // than eligibleCSN, the most recent in the db is our guy.
            result.update(mostRecentDbCSN);
          }
        } catch (Exception e) {
          logError(ERR_WRITER_UNEXPECTED_EXCEPTION
@@ -2733,15 +2734,17 @@
  }
  /**
   * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
   * state.
   * For each DS, take the oldest CN from the changetime heartbeat state
   * and from the changelog db last CN. Can be null.
   * @return the eligible CN.
   * Returns the eligible CSN for that domain - relies on the
   * ChangeTimeHeartbeat state.
   * <p>
   * For each DS, take the oldest CSN from the changetime heartbeat state and
   * from the changelog db last CSN. Can be null.
   *
   * @return the eligible CSN.
   */
  public ChangeNumber getEligibleCN()
  public CSN getEligibleCSN()
  {
    ChangeNumber eligibleCN = null;
    CSN eligibleCSN = null;
    for (DbHandler db : sourceDbHandlers.values())
    {
@@ -2749,8 +2752,8 @@
      int serverId = db.getServerId();
      // Should it be considered for eligibility ?
      ChangeNumber heartbeatLastCN =
        getChangeTimeHeartbeatState().getChangeNumber(serverId);
      CSN heartbeatLastCSN =
        getChangeTimeHeartbeatState().getCSN(serverId);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
@@ -2776,24 +2779,24 @@
        continue;
      }
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null
          && (eligibleCN == null || changelogLastCN.newer(eligibleCN)))
      CSN changelogLastCSN = db.getLastChange();
      if (changelogLastCSN != null
          && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
      {
        eligibleCN = changelogLastCN;
        eligibleCSN = changelogLastCSN;
      }
      if (heartbeatLastCN != null
          && (eligibleCN == null || heartbeatLastCN.newer(eligibleCN)))
      if (heartbeatLastCSN != null
          && (eligibleCSN == null || heartbeatLastCSN.newer(eligibleCSN)))
      {
        eligibleCN = heartbeatLastCN;
        eligibleCSN = heartbeatLastCSN;
      }
    }
    if (debugEnabled())
    {
      debug("getEligibleCN() returns result =" + eligibleCN);
      debug("getEligibleCSN() returns result =" + eligibleCSN);
    }
    return eligibleCN;
    return eligibleCSN;
  }
  private boolean isServerConnected(int serverId)
@@ -2816,7 +2819,7 @@
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
@@ -2840,7 +2843,7 @@
    try
    {
      storeReceivedCTHeartbeat(msg.getChangeNumber());
      storeReceivedCTHeartbeat(msg.getCSN());
      if (senderHandler.isDataServer())
      {
        // If we are the first replication server warned,
@@ -2874,34 +2877,34 @@
  /**
   * Store a change time value received from a data server.
   * @param cn The provided change time.
   * @param csn The provided change time.
   */
  public void storeReceivedCTHeartbeat(ChangeNumber cn)
  public void storeReceivedCTHeartbeat(CSN csn)
  {
    // TODO:May be we can spare processing by only storing CN (timestamp)
    // TODO:May be we can spare processing by only storing CSN (timestamp)
    // instead of a server state.
    getChangeTimeHeartbeatState().update(cn);
    getChangeTimeHeartbeatState().update(csn);
  }
  /**
   * This methods count the changes, server by server :
   * - from a serverState start point
   * - to (inclusive) an end point (the provided endCN).
   * - to (inclusive) an end point (the provided endCSN).
   * @param startState The provided start server state.
   * @param endCN The provided end change number.
   * @return The number of changes between startState and endCN.
   * @param endCSN The provided end CSN.
   * @return The number of changes between startState and endCSN.
   */
  public long getEligibleCount(ServerState startState, ChangeNumber endCN)
  public long getEligibleCount(ServerState startState, CSN endCSN)
  {
    long res = 0;
    for (int serverId : getDbServerState())
    {
      ChangeNumber startCN = startState.getChangeNumber(serverId);
      long serverIdRes = getCount(serverId, startCN, endCN);
      CSN startCSN = startState.getCSN(serverId);
      long serverIdRes = getCount(serverId, startCSN, endCSN);
      // The startPoint is excluded when counting the ECL eligible changes
      if (startCN != null && serverIdRes > 0)
      if (startCSN != null && serverIdRes > 0)
      {
        serverIdRes--;
      }
@@ -2912,20 +2915,20 @@
  }
  /**
   * This methods count the changes, server by server :
   * - from a start CN
   * - to (inclusive) an end point (the provided endCN).
   * @param startCN The provided start changeNumber.
   * @param endCN The provided end change number.
   * @return The number of changes between startTime and endCN.
   * This methods count the changes, server by server:
   * - from a start CSN
   * - to (inclusive) an end point (the provided endCSN).
   * @param startCSN The provided start CSN.
   * @param endCSN The provided end CSN.
   * @return The number of changes between startTime and endCSN.
   */
  public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN)
  public long getEligibleCount(CSN startCSN, CSN endCSN)
  {
    long res = 0;
    for (int serverId : getDbServerState()) {
      ChangeNumber lStartCN =
          new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId);
      res += getCount(serverId, lStartCN, endCN);
      CSN lStartCSN =
          new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId);
      res += getCount(serverId, lStartCSN, endCSN);
    }
    return res;
  }