From be4d4d9909a3461fa0211e259d0d08dcd49cb221 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 02 Sep 2013 08:57:43 +0000
Subject: [PATCH] Renamed: - ChangeNumber to CSN - ChangeNumberGenerator to CSNGenerator - ChangeNumberTest to CSNTest - ChangeNumberGeneratorTest to CSNGeneratorTest
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 215 +++++++++++++++++++++++++++--------------------------
1 files changed, 109 insertions(+), 106 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6723f21..d638eee 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -136,7 +136,7 @@
* The needed info for each received assured update message we are waiting
* acks for.
* <p>
- * Key: a change number matching a received update message which requested
+ * Key: a CSN matching a received update message which requested
* assured mode usage (either safe read or safe data mode)
* <p>
* Value: The object holding every info needed about the already received acks
@@ -145,8 +145,8 @@
* @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its sub
* classes javadoc.
*/
- private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
- new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
+ private final Map<CSN, ExpectedAcksInfo> waitingAcks =
+ new ConcurrentHashMap<CSN, ExpectedAcksInfo>();
/**
* The timer used to run the timeout code (timer tasks) for the assured update
@@ -193,8 +193,8 @@
public void put(UpdateMsg update, ServerHandler sourceHandler)
throws IOException
{
- ChangeNumber cn = update.getChangeNumber();
- int serverId = cn.getServerId();
+ CSN csn = update.getCSN();
+ int serverId = csn.getServerId();
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
@@ -271,11 +271,11 @@
// The following timer will time out and send an timeout ack to the
// requester if the acks are not received in time. The timer will also
// remove the object from this map.
- waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo);
+ waitingAcks.put(csn, preparedAssuredInfo.expectedAcksInfo);
// Arm timer for this assured update message (wait for acks until it
// times out)
- AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
+ AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(csn);
assuredTimeoutTimer.schedule(assuredTimeoutTask,
localReplicationServer.getAssuredTimeout());
// Purge timer every 100 treated messages
@@ -319,7 +319,7 @@
{
if (debugEnabled())
{
- debug("update " + update.getChangeNumber()
+ debug("update " + update.getCSN()
+ " will not be sent to replication server "
+ rsHandler.getServerId() + " with generation id "
+ rsHandler.getGenerationId() + " different from local "
@@ -362,7 +362,7 @@
{
if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
{
- debug("update " + update.getChangeNumber()
+ debug("update " + update.getCSN()
+ " will not be sent to directory server "
+ dsHandler.getServerId() + " with generation id "
+ dsHandler.getGenerationId() + " different from local "
@@ -370,7 +370,7 @@
}
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
{
- debug("update " + update.getChangeNumber()
+ debug("update " + update.getCSN()
+ " will not be sent to directory server "
+ dsHandler.getServerId() + " as it is in full update");
}
@@ -499,7 +499,7 @@
private PreparedAssuredInfo processSafeReadUpdateMsg(
UpdateMsg update, ServerHandler sourceHandler) throws IOException
{
- ChangeNumber cn = update.getChangeNumber();
+ CSN csn = update.getCSN();
byte groupId = localReplicationServer.getGroupId();
byte sourceGroupId = sourceHandler.getGroupId();
List<Integer> expectedServers = new ArrayList<Integer>();
@@ -550,7 +550,7 @@
if (expectedServers.size() > 0)
{
// Some other acks to wait for
- preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn,
+ preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(csn,
sourceHandler, expectedServers, wrongStatusServers);
preparedAssuredInfo.expectedServers = expectedServers;
}
@@ -558,7 +558,7 @@
if (preparedAssuredInfo.expectedServers == null)
{
// No eligible servers found, send the ack immediately
- sourceHandler.send(new AckMsg(cn));
+ sourceHandler.send(new AckMsg(csn));
}
return preparedAssuredInfo;
@@ -582,7 +582,7 @@
private PreparedAssuredInfo processSafeDataUpdateMsg(
UpdateMsg update, ServerHandler sourceHandler) throws IOException
{
- ChangeNumber cn = update.getChangeNumber();
+ CSN csn = update.getCSN();
boolean interestedInAcks = false;
byte safeDataLevel = update.getSafeDataLevel();
byte groupId = localReplicationServer.getGroupId();
@@ -608,7 +608,7 @@
* mode with safe data level 1, coming from a DS. No need to wait
* for more acks
*/
- sourceHandler.send(new AckMsg(cn));
+ sourceHandler.send(new AckMsg(csn));
} else
{
/**
@@ -628,7 +628,7 @@
*/
if (safeDataLevel > (byte) 1)
{
- sourceHandler.send(new AckMsg(cn));
+ sourceHandler.send(new AckMsg(csn));
}
}
}
@@ -656,14 +656,14 @@
byte finalSdl = (nExpectedServers >= neededAdditionalServers) ?
(byte)sdl : // Keep level as it was
(byte)(nExpectedServers+1); // Change level to match what's available
- preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+ preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(csn,
sourceHandler, finalSdl, expectedServers);
preparedAssuredInfo.expectedServers = expectedServers;
} else
{
// level > 1 and source is a DS but no eligible servers found, send the
// ack immediately
- sourceHandler.send(new AckMsg(cn));
+ sourceHandler.send(new AckMsg(csn));
}
}
@@ -706,8 +706,8 @@
{
// Retrieve the expected acks info for the update matching the original
// sent update.
- ChangeNumber cn = ack.getChangeNumber();
- ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+ CSN csn = ack.getCSN();
+ ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
if (expectedAcksInfo != null)
{
@@ -728,7 +728,7 @@
if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
{
// Remove the object from the map as no more needed
- waitingAcks.remove(cn);
+ waitingAcks.remove(csn);
AckMsg finalAck = expectedAcksInfo.createAck(false);
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
try
@@ -744,7 +744,7 @@
mb.append(ERR_RS_ERROR_SENDING_ACK.get(
Integer.toString(localReplicationServer.getServerId()),
Integer.toString(origServer.getServerId()),
- cn.toString(), baseDn));
+ csn.toString(), baseDn));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
stopServer(origServer, false);
@@ -755,7 +755,7 @@
}
}
}
- /* Else the timeout occurred for the update matching this change number
+ /* Else the timeout occurred for the update matching this CSN
* and the ack with timeout error has probably already been sent.
*/
}
@@ -767,15 +767,15 @@
*/
private class AssuredTimeoutTask extends TimerTask
{
- private ChangeNumber cn = null;
+ private CSN csn = null;
/**
* Constructor for the timer task.
- * @param cn The changeNumber of the assured update we are waiting acks for
+ * @param csn The CSN of the assured update we are waiting acks for
*/
- public AssuredTimeoutTask(ChangeNumber cn)
+ public AssuredTimeoutTask(CSN csn)
{
- this.cn = cn;
+ this.csn = csn;
}
/**
@@ -785,7 +785,7 @@
@Override
public void run()
{
- ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+ ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(csn);
if (expectedAcksInfo != null)
{
@@ -798,14 +798,14 @@
return;
}
// Remove the object from the map as no more needed
- waitingAcks.remove(cn);
+ waitingAcks.remove(csn);
// Create the timeout ack and send him to the server the assured
// update message came from
AckMsg finalAck = expectedAcksInfo.createAck(true);
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
if (debugEnabled())
{
- debug("sending timeout for assured update with change number " + cn
+ debug("sending timeout for assured update with CSN " + csn
+ " to serverId=" + origServer.getServerId());
}
try
@@ -821,7 +821,7 @@
mb.append(ERR_RS_ERROR_SENDING_ACK.get(
Integer.toString(localReplicationServer.getServerId()),
Integer.toString(origServer.getServerId()),
- cn.toString(), baseDn));
+ csn.toString(), baseDn));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
stopServer(origServer, false);
@@ -1268,13 +1268,12 @@
*
* @param serverId
* Identifier of the server for which the cursor is created.
- * @param startAfterCN
+ * @param startAfterCSN
* Starting point for the cursor.
- * @return the created {@link ReplicaDBCursor}. Null when no DB is
- * available or the DB is empty for the provided serverId .
+ * @return the created {@link ReplicaDBCursor}. Null when no DB is available
+ * or the DB is empty for the provided serverId .
*/
- public ReplicaDBCursor getCursorFrom(int serverId,
- ChangeNumber startAfterCN)
+ public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
{
DbHandler dbHandler = sourceDbHandlers.get(serverId);
if (dbHandler == null)
@@ -1285,7 +1284,7 @@
ReplicaDBCursor cursor;
try
{
- cursor = dbHandler.generateCursorFrom(startAfterCN);
+ cursor = dbHandler.generateCursorFrom(startAfterCSN);
}
catch (Exception e)
{
@@ -1303,13 +1302,13 @@
/**
* Count the number of changes in the replication changelog for the provided
- * serverID, between 2 provided changenumbers.
+ * serverID, between 2 provided CSNs.
* @param serverId Identifier of the server for which to compute the count.
- * @param from lower limit changenumber.
- * @param to upper limit changenumber.
+ * @param from lower limit CSN.
+ * @param to upper limit CSN.
* @return the number of changes.
*/
- public long getCount(int serverId, ChangeNumber from, ChangeNumber to)
+ public long getCount(int serverId, CSN from, CSN to)
{
DbHandler dbHandler = sourceDbHandlers.get(serverId);
if (dbHandler != null)
@@ -2646,57 +2645,59 @@
* <pre>
* s1 s2 s3
* -- -- --
- * cn31
- * cn15
+ * csn31
+ * csn15
*
- * ----------------------------------------- eligibleCN
- * cn14
- * cn26
- * cn13
+ * ----------------------------------------- eligibleCSN
+ * csn14
+ * csn26
+ * csn13
* </pre>
*
- * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
+ * The eligibleState is : s1;csn14 / s2;csn26 / s3;csn31
*
- * @param eligibleCN The provided eligibleCN.
+ * @param eligibleCSN
+ * The provided eligible CSN.
* @return The computed eligible server state.
*/
- public ServerState getEligibleState(ChangeNumber eligibleCN)
+ public ServerState getEligibleState(CSN eligibleCSN)
{
ServerState dbState = getDbServerState();
// The result is initialized from the dbState.
- // From it, we don't want to keep the changes newer than eligibleCN.
+ // From it, we don't want to keep the changes newer than eligibleCSN.
ServerState result = dbState.duplicate();
- if (eligibleCN != null)
+ if (eligibleCSN != null)
{
for (int serverId : dbState)
{
DbHandler h = sourceDbHandlers.get(serverId);
- ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId);
+ CSN mostRecentDbCSN = dbState.getCSN(serverId);
try {
- // Is the most recent change in the Db newer than eligible CN ?
- // if yes (like cn15 in the example above, then we have to go back
- // to the Db and look for the change older than eligible CN (cn14)
- if (eligibleCN.olderOrEqual(mostRecentDbCN)) {
- // let's try to seek the first change <= eligibleCN
+ // Is the most recent change in the Db newer than eligible CSN ?
+ // if yes (like csn15 in the example above, then we have to go back
+ // to the Db and look for the change older than eligible CSN (csn14)
+ if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
+ {
+ // let's try to seek the first change <= eligibleCSN
ReplicaDBCursor cursor = null;
try {
- cursor = h.generateCursorFrom(eligibleCN);
+ cursor = h.generateCursorFrom(eligibleCSN);
if (cursor != null && cursor.getChange() != null) {
- ChangeNumber newCN = cursor.getChange().getChangeNumber();
- result.update(newCN);
+ CSN newCSN = cursor.getChange().getCSN();
+ result.update(newCSN);
}
} catch (ChangelogException e) {
- // there's no change older than eligibleCN (case of s3/cn31)
- result.update(new ChangeNumber(0, 0, serverId));
+ // there's no change older than eligibleCSN (case of s3/csn31)
+ result.update(new CSN(0, 0, serverId));
} finally {
close(cursor);
}
} else {
// for this serverId, all changes in the ChangelogDb are holder
- // than eligibleCN , the most recent in the db is our guy.
- result.update(mostRecentDbCN);
+ // than eligibleCSN, the most recent in the db is our guy.
+ result.update(mostRecentDbCSN);
}
} catch (Exception e) {
logError(ERR_WRITER_UNEXPECTED_EXCEPTION
@@ -2733,15 +2734,17 @@
}
/**
- * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
- * state.
- * For each DS, take the oldest CN from the changetime heartbeat state
- * and from the changelog db last CN. Can be null.
- * @return the eligible CN.
+ * Returns the eligible CSN for that domain - relies on the
+ * ChangeTimeHeartbeat state.
+ * <p>
+ * For each DS, take the oldest CSN from the changetime heartbeat state and
+ * from the changelog db last CSN. Can be null.
+ *
+ * @return the eligible CSN.
*/
- public ChangeNumber getEligibleCN()
+ public CSN getEligibleCSN()
{
- ChangeNumber eligibleCN = null;
+ CSN eligibleCSN = null;
for (DbHandler db : sourceDbHandlers.values())
{
@@ -2749,8 +2752,8 @@
int serverId = db.getServerId();
// Should it be considered for eligibility ?
- ChangeNumber heartbeatLastCN =
- getChangeTimeHeartbeatState().getChangeNumber(serverId);
+ CSN heartbeatLastCSN =
+ getChangeTimeHeartbeatState().getCSN(serverId);
// If the most recent UpdateMsg or CLHeartbeatMsg received is very old
// then the domain is considered down and not considered for eligibility
@@ -2776,24 +2779,24 @@
continue;
}
- ChangeNumber changelogLastCN = db.getLastChange();
- if (changelogLastCN != null
- && (eligibleCN == null || changelogLastCN.newer(eligibleCN)))
+ CSN changelogLastCSN = db.getLastChange();
+ if (changelogLastCSN != null
+ && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN)))
{
- eligibleCN = changelogLastCN;
+ eligibleCSN = changelogLastCSN;
}
- if (heartbeatLastCN != null
- && (eligibleCN == null || heartbeatLastCN.newer(eligibleCN)))
+ if (heartbeatLastCSN != null
+ && (eligibleCSN == null || heartbeatLastCSN.newer(eligibleCSN)))
{
- eligibleCN = heartbeatLastCN;
+ eligibleCSN = heartbeatLastCSN;
}
}
if (debugEnabled())
{
- debug("getEligibleCN() returns result =" + eligibleCN);
+ debug("getEligibleCSN() returns result =" + eligibleCSN);
}
- return eligibleCN;
+ return eligibleCSN;
}
private boolean isServerConnected(int serverId)
@@ -2816,7 +2819,7 @@
/**
- * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
+ * Processes a ChangeTimeHeartbeatMsg received, by storing the CSN (timestamp)
* value received, and forwarding the message to the other RSes.
* @param senderHandler The handler for the server that sent the heartbeat.
* @param msg The message to process.
@@ -2840,7 +2843,7 @@
try
{
- storeReceivedCTHeartbeat(msg.getChangeNumber());
+ storeReceivedCTHeartbeat(msg.getCSN());
if (senderHandler.isDataServer())
{
// If we are the first replication server warned,
@@ -2874,34 +2877,34 @@
/**
* Store a change time value received from a data server.
- * @param cn The provided change time.
+ * @param csn The provided change time.
*/
- public void storeReceivedCTHeartbeat(ChangeNumber cn)
+ public void storeReceivedCTHeartbeat(CSN csn)
{
- // TODO:May be we can spare processing by only storing CN (timestamp)
+ // TODO:May be we can spare processing by only storing CSN (timestamp)
// instead of a server state.
- getChangeTimeHeartbeatState().update(cn);
+ getChangeTimeHeartbeatState().update(csn);
}
/**
* This methods count the changes, server by server :
* - from a serverState start point
- * - to (inclusive) an end point (the provided endCN).
+ * - to (inclusive) an end point (the provided endCSN).
* @param startState The provided start server state.
- * @param endCN The provided end change number.
- * @return The number of changes between startState and endCN.
+ * @param endCSN The provided end CSN.
+ * @return The number of changes between startState and endCSN.
*/
- public long getEligibleCount(ServerState startState, ChangeNumber endCN)
+ public long getEligibleCount(ServerState startState, CSN endCSN)
{
long res = 0;
for (int serverId : getDbServerState())
{
- ChangeNumber startCN = startState.getChangeNumber(serverId);
- long serverIdRes = getCount(serverId, startCN, endCN);
+ CSN startCSN = startState.getCSN(serverId);
+ long serverIdRes = getCount(serverId, startCSN, endCSN);
// The startPoint is excluded when counting the ECL eligible changes
- if (startCN != null && serverIdRes > 0)
+ if (startCSN != null && serverIdRes > 0)
{
serverIdRes--;
}
@@ -2912,20 +2915,20 @@
}
/**
- * This methods count the changes, server by server :
- * - from a start CN
- * - to (inclusive) an end point (the provided endCN).
- * @param startCN The provided start changeNumber.
- * @param endCN The provided end change number.
- * @return The number of changes between startTime and endCN.
+ * This methods count the changes, server by server:
+ * - from a start CSN
+ * - to (inclusive) an end point (the provided endCSN).
+ * @param startCSN The provided start CSN.
+ * @param endCSN The provided end CSN.
+ * @return The number of changes between startTime and endCSN.
*/
- public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN)
+ public long getEligibleCount(CSN startCSN, CSN endCSN)
{
long res = 0;
for (int serverId : getDbServerState()) {
- ChangeNumber lStartCN =
- new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId);
- res += getCount(serverId, lStartCN, endCN);
+ CSN lStartCSN =
+ new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId);
+ res += getCount(serverId, lStartCSN, endCSN);
}
return res;
}
--
Gitblit v1.10.0