From be4d4d9909a3461fa0211e259d0d08dcd49cb221 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 02 Sep 2013 08:57:43 +0000
Subject: [PATCH] Renamed: - ChangeNumber to CSN - ChangeNumberGenerator to CSNGenerator - ChangeNumberTest to CSNTest - ChangeNumberGeneratorTest to CSNGeneratorTest

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  215 +++++++++++++++++++++++++++--------------------------
 1 files changed, 109 insertions(+), 106 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6723f21..d638eee 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -136,7 +136,7 @@
    * The needed info for each received assured update message we are waiting
    * acks for.
    * <p>
-   * Key: a change number matching a received update message which requested
+   * Key: a CSN matching a received update message which requested
    * assured mode usage (either safe read or safe data mode)
    * <p>
    * Value: The object holding every info needed about the already received acks
@@ -145,8 +145,8 @@
    * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub
    *      classes javadoc.
    */
-  private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
-    new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
+  private final Map<CSN, ExpectedAcksInfo> waitingAcks =
+    new ConcurrentHashMap<CSN, ExpectedAcksInfo>();
 
   /**
    * The timer used to run the timeout code (timer tasks) for the assured update
@@ -193,8 +193,8 @@
   public void put(UpdateMsg update, ServerHandler sourceHandler)
     throws IOException
   {
-    ChangeNumber cn = update.getChangeNumber();
-    int serverId = cn.getServerId();
+    CSN csn = update.getCSN();
+    int serverId = csn.getServerId();
 
     sourceHandler.updateServerState(update);
     sourceHandler.incrementInCount();
@@ -271,11 +271,11 @@
         // The following timer will time out and send an timeout ack to the
         // requester if the acks are not received in time. The timer will also
         // remove the object from this map.
-        waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo);
+        waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
 
         // Arm timer for this assured update message (wait for acks until it
         // times out)
-        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
+        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
         assuredTimeoutTimer.schedule(assuredTimeoutTask,
             localReplicationServer.getAssuredTimeout());
         // Purge timer every 100 treated messages
@@ -319,7 +319,7 @@
         {
           if (debugEnabled())
           {
-            debug("update " + update.getChangeNumber()
+            debug("update " + update.getCSN()
                 + " will not be sent to replication server "
                 + rsHandler.getServerId() + " with generation id "
                 + rsHandler.getGenerationId() + " different from local "
@@ -362,7 +362,7 @@
         {
           if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
           {
-            debug("update " + update.getChangeNumber()
+            debug("update " + update.getCSN()
                 + " will not be sent to directory server "
                 + dsHandler.getServerId() + " with generation id "
                 + dsHandler.getGenerationId() + " different from local "
@@ -370,7 +370,7 @@
           }
           if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
           {
-            debug("update " + update.getChangeNumber()
+            debug("update " + update.getCSN()
                 + " will not be sent to directory server "
                 + dsHandler.getServerId() + " as it is in full update");
           }
@@ -499,7 +499,7 @@
   private PreparedAssuredInfo processSafeReadUpdateMsg(
     UpdateMsg update, ServerHandler sourceHandler) throws IOException
   {
-    ChangeNumber cn = update.getChangeNumber();
+    CSN csn = update.getCSN();
     byte groupId = localReplicationServer.getGroupId();
     byte sourceGroupId = sourceHandler.getGroupId();
     List<Integer> expectedServers = new ArrayList<Integer>();
@@ -550,7 +550,7 @@
     if (expectedServers.size() > 0)
     {
       // Some other acks to wait for
-      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn,
+      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn,
         sourceHandler, expectedServers, wrongStatusServers);
       preparedAssuredInfo.expectedServers = expectedServers;
     }
@@ -558,7 +558,7 @@
     if (preparedAssuredInfo.expectedServers == null)
     {
       // No eligible servers found, send the ack immediately
-      sourceHandler.send(new AckMsg(cn));
+      sourceHandler.send(new AckMsg(csn));
     }
 
     return preparedAssuredInfo;
@@ -582,7 +582,7 @@
   private PreparedAssuredInfo processSafeDataUpdateMsg(
     UpdateMsg update, ServerHandler sourceHandler) throws IOException
   {
-    ChangeNumber cn = update.getChangeNumber();
+    CSN csn = update.getCSN();
     boolean interestedInAcks = false;
     byte safeDataLevel = update.getSafeDataLevel();
     byte groupId = localReplicationServer.getGroupId();
@@ -608,7 +608,7 @@
              * mode with safe data level 1, coming from a DS. No need to wait
              * for more acks
              */
-            sourceHandler.send(new AckMsg(cn));
+            sourceHandler.send(new AckMsg(csn));
           } else
           {
             /**
@@ -628,7 +628,7 @@
            */
           if (safeDataLevel > (byte) 1)
           {
-            sourceHandler.send(new AckMsg(cn));
+            sourceHandler.send(new AckMsg(csn));
           }
         }
     }
@@ -656,14 +656,14 @@
         byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
           (byte)sdl : // Keep level as it was
           (byte)(nExpectedServers+1); // Change level to match what's available
-        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn,
           sourceHandler, finalSdl, expectedServers);
         preparedAssuredInfo.expectedServers = expectedServers;
       } else
       {
         // level > 1 and source is a DS but no eligible servers found, send the
         // ack immediately
-        sourceHandler.send(new AckMsg(cn));
+        sourceHandler.send(new AckMsg(csn));
       }
     }
 
@@ -706,8 +706,8 @@
   {
     // Retrieve the expected acks info for the update matching the original
     // sent update.
-    ChangeNumber cn = ack.getChangeNumber();
-    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+    CSN csn = ack.getCSN();
+    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
 
     if (expectedAcksInfo != null)
     {
@@ -728,7 +728,7 @@
         if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
         {
           // Remove the object from the map as no more needed
-          waitingAcks.remove(cn);
+          waitingAcks.remove(csn);
           AckMsg finalAck = expectedAcksInfo.createAck(false);
           ServerHandler origServer = expectedAcksInfo.getRequesterServer();
           try
@@ -744,7 +744,7 @@
             mb.append(ERR_RS_ERROR_SENDING_ACK.get(
               Integer.toString(localReplicationServer.getServerId()),
               Integer.toString(origServer.getServerId()),
-              cn.toString(), baseDn));
+              csn.toString(), baseDn));
             mb.append(stackTraceToSingleLineString(e));
             logError(mb.toMessage());
             stopServer(origServer, false);
@@ -755,7 +755,7 @@
         }
       }
     }
-    /* Else the timeout occurred for the update matching this change number
+    /* Else the timeout occurred for the update matching this CSN
      * and the ack with timeout error has probably already been sent.
      */
   }
@@ -767,15 +767,15 @@
    */
   private class AssuredTimeoutTask extends TimerTask
   {
-    private ChangeNumber cn = null;
+    private CSN csn = null;
 
     /**
      * Constructor for the timer task.
-     * @param cn The changeNumber of the assured update we are waiting acks for
+     * @param csn The CSN of the assured update we are waiting acks for
      */
-    public AssuredTimeoutTask(ChangeNumber cn)
+    public AssuredTimeoutTask(CSN csn)
     {
-      this.cn = cn;
+      this.csn = csn;
     }
 
     /**
@@ -785,7 +785,7 @@
     @Override
     public void run()
     {
-      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
 
       if (expectedAcksInfo != null)
       {
@@ -798,14 +798,14 @@
             return;
           }
           // Remove the object from the map as no more needed
-          waitingAcks.remove(cn);
+          waitingAcks.remove(csn);
           // Create the timeout ack and send him to the server the assured
           // update message came from
           AckMsg finalAck = expectedAcksInfo.createAck(true);
           ServerHandler origServer = expectedAcksInfo.getRequesterServer();
           if (debugEnabled())
           {
-            debug("sending timeout for assured update with change number " + cn
+            debug("sending timeout for assured update with CSN " + csn
                 + " to serverId=" + origServer.getServerId());
           }
           try
@@ -821,7 +821,7 @@
             mb.append(ERR_RS_ERROR_SENDING_ACK.get(
                 Integer.toString(localReplicationServer.getServerId()),
                 Integer.toString(origServer.getServerId()),
-                cn.toString(), baseDn));
+                csn.toString(), baseDn));
             mb.append(stackTraceToSingleLineString(e));
             logError(mb.toMessage());
             stopServer(origServer, false);
@@ -1268,13 +1268,12 @@
    *
    * @param serverId
    *          Identifier of the server for which the cursor is created.
-   * @param startAfterCN
+   * @param startAfterCSN
    *          Starting point for the cursor.
-   * @return the created {@link ReplicaDBCursor}. Null when no DB is
-   *         available or the DB is empty for the provided serverId .
+   * @return the created {@link ReplicaDBCursor}. Null when no DB is available
+   *         or the DB is empty for the provided serverId .
    */
-  public ReplicaDBCursor getCursorFrom(int serverId,
-      ChangeNumber startAfterCN)
+  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
   {
     DbHandler dbHandler = sourceDbHandlers.get(serverId);
     if (dbHandler == null)
@@ -1285,7 +1284,7 @@
     ReplicaDBCursor cursor;
     try
     {
-      cursor = dbHandler.generateCursorFrom(startAfterCN);
+      cursor = dbHandler.generateCursorFrom(startAfterCSN);
     }
     catch (Exception e)
     {
@@ -1303,13 +1302,13 @@
 
  /**
   * Count the number of changes in the replication changelog for the provided
-  * serverID, between 2 provided changenumbers.
+  * serverID, between 2 provided CSNs.
   * @param serverId Identifier of the server for which to compute the count.
-  * @param from lower limit changenumber.
-  * @param to   upper limit changenumber.
+  * @param from lower limit CSN.
+  * @param to   upper limit CSN.
   * @return the number of changes.
   */
-  public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
+  public long getCount(int serverId, CSN from, CSN to)
   {
     DbHandler dbHandler = sourceDbHandlers.get(serverId);
     if (dbHandler != null)
@@ -2646,57 +2645,59 @@
    * <pre>
    *     s1               s2          s3
    *     --               --          --
-   *                                 cn31
-   *     cn15
+   *                                 csn31
+   *     csn15
    *
-   *  ----------------------------------------- eligibleCN
-   *     cn14
-   *                     cn26
-   *     cn13
+   *  ----------------------------------------- eligibleCSN
+   *     csn14
+   *                     csn26
+   *     csn13
    * </pre>
    *
-   * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
+   * The eligibleState is : s1;csn14 / s2;csn26 / s3;csn31
    *
-   * @param eligibleCN              The provided eligibleCN.
+   * @param eligibleCSN
+   *          The provided eligible CSN.
    * @return The computed eligible server state.
    */
-  public ServerState getEligibleState(ChangeNumber eligibleCN)
+  public ServerState getEligibleState(CSN eligibleCSN)
   {
     ServerState dbState = getDbServerState();
 
     // The result is initialized from the dbState.
-    // From it, we don't want to keep the changes newer than eligibleCN.
+    // From it, we don't want to keep the changes newer than eligibleCSN.
     ServerState result = dbState.duplicate();
 
-    if (eligibleCN != null)
+    if (eligibleCSN != null)
     {
       for (int serverId : dbState)
       {
         DbHandler h = sourceDbHandlers.get(serverId);
-        ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId);
+        CSN mostRecentDbCSN = dbState.getCSN(serverId);
         try {
-          // Is the most recent change in the Db newer than eligible CN ?
-          // if yes (like cn15 in the example above, then we have to go back
-          // to the Db and look for the change older than  eligible CN (cn14)
-          if (eligibleCN.olderOrEqual(mostRecentDbCN)) {
-            // let's try to seek the first change <= eligibleCN
+          // Is the most recent change in the Db newer than eligible CSN ?
+          // if yes (like csn15 in the example above, then we have to go back
+          // to the Db and look for the change older than eligible CSN (csn14)
+          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
+          {
+            // let's try to seek the first change <= eligibleCSN
             ReplicaDBCursor cursor = null;
             try {
-              cursor = h.generateCursorFrom(eligibleCN);
+              cursor = h.generateCursorFrom(eligibleCSN);
               if (cursor != null && cursor.getChange() != null) {
-                ChangeNumber newCN = cursor.getChange().getChangeNumber();
-                result.update(newCN);
+                CSN newCSN = cursor.getChange().getCSN();
+                result.update(newCSN);
               }
             } catch (ChangelogException e) {
-              // there's no change older than eligibleCN (case of s3/cn31)
-              result.update(new ChangeNumber(0, 0, serverId));
+              // there's no change older than eligibleCSN (case of s3/csn31)
+              result.update(new CSN(0, 0, serverId));
             } finally {
               close(cursor);
             }
           } else {
             // for this serverId, all changes in the ChangelogDb are holder
-            // than eligibleCN , the most recent in the db is our guy.
-            result.update(mostRecentDbCN);
+            // than eligibleCSN, the most recent in the db is our guy.
+            result.update(mostRecentDbCSN);
           }
         } catch (Exception e) {
           logError(ERR_WRITER_UNEXPECTED_EXCEPTION
@@ -2733,15 +2734,17 @@
   }
 
   /**
-   * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
-   * state.
-   * For each DS, take the oldest CN from the changetime heartbeat state
-   * and from the changelog db last CN. Can be null.
-   * @return the eligible CN.
+   * Returns the eligible CSN for that domain - relies on the
+   * ChangeTimeHeartbeat state.
+   * <p>
+   * For each DS, take the oldest CSN from the changetime heartbeat state and
+   * from the changelog db last CSN. Can be null.
+   *
+   * @return the eligible CSN.
    */
-  public ChangeNumber getEligibleCN()
+  public CSN getEligibleCSN()
   {
-    ChangeNumber eligibleCN = null;
+    CSN eligibleCSN = null;
 
     for (DbHandler db : sourceDbHandlers.values())
     {
@@ -2749,8 +2752,8 @@
       int serverId = db.getServerId();
 
       // Should it be considered for eligibility ?
-      ChangeNumber heartbeatLastCN =
-        getChangeTimeHeartbeatState().getChangeNumber(serverId);
+      CSN heartbeatLastCSN =
+        getChangeTimeHeartbeatState().getCSN(serverId);
 
       // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
       // then the domain is considered down and not considered for eligibility
@@ -2776,24 +2779,24 @@
         continue;
       }
 
-      ChangeNumber changelogLastCN = db.getLastChange();
-      if (changelogLastCN != null
-          && (eligibleCN == null || changelogLastCN.newer(eligibleCN)))
+      CSN changelogLastCSN = db.getLastChange();
+      if (changelogLastCSN != null
+          && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
       {
-        eligibleCN = changelogLastCN;
+        eligibleCSN = changelogLastCSN;
       }
-      if (heartbeatLastCN != null
-          && (eligibleCN == null || heartbeatLastCN.newer(eligibleCN)))
+      if (heartbeatLastCSN != null
+          && (eligibleCSN == null || heartbeatLastCSN.newer(eligibleCSN)))
       {
-        eligibleCN = heartbeatLastCN;
+        eligibleCSN = heartbeatLastCSN;
       }
     }
 
     if (debugEnabled())
     {
-      debug("getEligibleCN() returns result =" + eligibleCN);
+      debug("getEligibleCSN() returns result =" + eligibleCSN);
     }
-    return eligibleCN;
+    return eligibleCSN;
   }
 
   private boolean isServerConnected(int serverId)
@@ -2816,7 +2819,7 @@
 
 
   /**
-   * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
+   * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
    * value received, and forwarding the message to the other RSes.
    * @param senderHandler The handler for the server that sent the heartbeat.
    * @param msg The message to process.
@@ -2840,7 +2843,7 @@
 
     try
     {
-      storeReceivedCTHeartbeat(msg.getChangeNumber());
+      storeReceivedCTHeartbeat(msg.getCSN());
       if (senderHandler.isDataServer())
       {
         // If we are the first replication server warned,
@@ -2874,34 +2877,34 @@
 
   /**
    * Store a change time value received from a data server.
-   * @param cn The provided change time.
+   * @param csn The provided change time.
    */
-  public void storeReceivedCTHeartbeat(ChangeNumber cn)
+  public void storeReceivedCTHeartbeat(CSN csn)
   {
-    // TODO:May be we can spare processing by only storing CN (timestamp)
+    // TODO:May be we can spare processing by only storing CSN (timestamp)
     // instead of a server state.
-    getChangeTimeHeartbeatState().update(cn);
+    getChangeTimeHeartbeatState().update(csn);
   }
 
   /**
    * This methods count the changes, server by server :
    * - from a serverState start point
-   * - to (inclusive) an end point (the provided endCN).
+   * - to (inclusive) an end point (the provided endCSN).
    * @param startState The provided start server state.
-   * @param endCN The provided end change number.
-   * @return The number of changes between startState and endCN.
+   * @param endCSN The provided end CSN.
+   * @return The number of changes between startState and endCSN.
    */
-  public long getEligibleCount(ServerState startState, ChangeNumber endCN)
+  public long getEligibleCount(ServerState startState, CSN endCSN)
   {
     long res = 0;
 
     for (int serverId : getDbServerState())
     {
-      ChangeNumber startCN = startState.getChangeNumber(serverId);
-      long serverIdRes = getCount(serverId, startCN, endCN);
+      CSN startCSN = startState.getCSN(serverId);
+      long serverIdRes = getCount(serverId, startCSN, endCSN);
 
       // The startPoint is excluded when counting the ECL eligible changes
-      if (startCN != null && serverIdRes > 0)
+      if (startCSN != null && serverIdRes > 0)
       {
         serverIdRes--;
       }
@@ -2912,20 +2915,20 @@
   }
 
   /**
-   * This methods count the changes, server by server :
-   * - from a start CN
-   * - to (inclusive) an end point (the provided endCN).
-   * @param startCN The provided start changeNumber.
-   * @param endCN The provided end change number.
-   * @return The number of changes between startTime and endCN.
+   * This methods count the changes, server by server:
+   * - from a start CSN
+   * - to (inclusive) an end point (the provided endCSN).
+   * @param startCSN The provided start CSN.
+   * @param endCSN The provided end CSN.
+   * @return The number of changes between startTime and endCSN.
    */
-  public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN)
+  public long getEligibleCount(CSN startCSN, CSN endCSN)
   {
     long res = 0;
     for (int serverId : getDbServerState()) {
-      ChangeNumber lStartCN =
-          new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId);
-      res += getCount(serverId, lStartCN, endCN);
+      CSN lStartCSN =
+          new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId);
+      res += getCount(serverId, lStartCSN, endCSN);
     }
     return res;
   }

--
Gitblit v1.10.0