From 69ebea4017b17653a1b966b7a83372eb9ce0dcdc Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 05 Nov 2013 10:51:08 +0000
Subject: [PATCH] ECLServerHandler.java, ReplicationServerDomain.java: Renamed variables and changed comments to match the new terminology. Added more comments to explain what the code is doing.
---
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 88 ++++++++++++++++++----------
opends/src/messages/messages/replication.properties | 4
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 58 ++++++++----------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 10 +-
opends/src/server/org/opends/server/replication/common/RSInfo.java | 15 +---
5 files changed, 94 insertions(+), 81 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index eb9abc4..531b0b7 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -500,7 +500,7 @@
NOTICE_RS_NOT_LOCALLY_CONFIGURED_219=RS(%d) was not configured locally on DS(%d), \
but at least one other RS was
NOTICE_RS_HAS_NO_GENERATION_ID_220=RS(%d) has no generation Id, but at least one \
- other RS has the same generation Id %d as DS(%d)
+ other RS has the same generation Id %d as DS(%d)
NOTICE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS_221=RS(%d) generation Id %d does not \
match DS(%d) generation Id %d, but at least another RS does
NOTICE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS_222=RS(%d) groupId %d does not match \
@@ -525,5 +525,5 @@
biggest weight among all the replication servers
NOTICE_AVOID_YOYO_EFFECT_232=DS(%d) stayed connected to RS(%d) to avoid the yoyo effect
NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
- for DS(%d) to connect to because it was the only one standing after all tests
+ for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
diff --git a/opends/src/server/org/opends/server/replication/common/RSInfo.java b/opends/src/server/org/opends/server/replication/common/RSInfo.java
index 43f1979..4326c7f 100644
--- a/opends/src/server/org/opends/server/replication/common/RSInfo.java
+++ b/opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -167,16 +167,11 @@
public String toString()
{
StringBuilder sb = new StringBuilder();
- sb.append("\nId: ");
- sb.append(id);
- sb.append(" ; Server URL: ");
- sb.append(serverUrl);
- sb.append(" ; Generation id: ");
- sb.append(generationId);
- sb.append(" ; Group id: ");
- sb.append(groupId);
- sb.append(" ; Weight: ");
- sb.append(weight);
+ sb.append("Id: ").append(id);
+ sb.append(" ; Server URL: ").append(serverUrl);
+ sb.append(" ; Generation id: ").append(generationId);
+ sb.append(" ; Group id: ").append(groupId);
+ sb.append(" ; Weight: ").append(weight);
return sb.toString();
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index e8c6aa4..98bc563 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -58,13 +58,33 @@
public final class ECLServerHandler extends ServerHandler
{
+ /**
+ * Marks the end of the search on the External Change Log.
+ */
private static int UNDEFINED_PHASE = 0;
/**
- * Constant used to indicate the handler is in the ECL initialization phase.
+ * The External Change Log initialization phase currently reads the changes
+ * from all the ReplicaDBs from oldest to newest and then:
+ * <ul>
+ * <li>Matches each ReplicaDBs change with the corresponding record in
+ * {@link ChangeNumberIndexDB}, then assign its changeNumber in memory before
+ * returning the change to the client</li>
+ * <li>Once it reaches the end of the {@link ChangeNumberIndexDB}, it inserts
+ * each ReplicaDBs change in the {@link ChangeNumberIndexDB} and gets back and
+ * assign the changeNumber in memory to the ReplicaDBs change.</li>
+ * </ul>
+ * Once this phase is over the current search moves to the
+ * {@link #UNDEFINED_PHASE} or the {@link #PERSISTENT_PHASE} depending on the
+ * search type.
*
* @see #getSearchPhase()
*/
public static int INIT_PHASE = 1;
+ /**
+ * The persistent phase is only used for persistent searches on the External
+ * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the
+ * client newly added changes to the {@link ChangeNumberIndexDB}.
+ */
private static int PERSISTENT_PHASE = 2;
/**
@@ -111,8 +131,8 @@
private Set<String> excludedBaseDNs = new HashSet<String>();
/**
- * Eligible CSN - only changes older or equal to eligibleCSN * are published
- * in the ECL.
+ * Eligible CSN - only changes older or equal to eligibleCSN are published in
+ * the ECL.
*/
private CSN eligibleCSN;
@@ -1203,6 +1223,9 @@
if (draftCompat)
{
continueLooping = !assignChangeNumber(change);
+ // if we encounter a change that has been trimmed from the replicaDBs,
+ // we will skip it and loop to the next oldest change from the
+ // replicaDBs
}
// here we have the right oldest change
@@ -1288,34 +1311,34 @@
* Either retrieves a change number from the DB, or assign a new change number
* and store in the DB.
*
- * @param oldestChange
- * the oldestChange where to assign the change number
+ * @param replicaDBChange
+ * the replica DB change to find in the {@link ChangeNumberIndexDB}
+ * where to assign the change number
* @return <code>true</code> if a change number has been assigned to the
- * provided oldestChange, <code>false</code> otherwise
+ * provided replicaDBChange, <code>false</code> otherwise
* @throws DirectoryException
* if any problem occur
* @throws ChangelogException
* if a database problem occurs.
*/
- private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
+ private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange)
throws ChangelogException
{
- // We also need to check if the CNIndexDB is consistent with the
- // changelogDB. If not, 2 potential reasons:
- // a/ changelog has been purged (trim) let's traverse the CNIndexDB
+ // We also need to check if the CNIndexDB is consistent with the replicaDBs.
+ // If not, 2 potential reasons:
+ // a/ replicaDBs have been purged (trim) let's traverse the CNIndexDB
// b/ changelog is late ... let's traverse the changelogDb
// The following loop allows to loop until being on the same cn in the 2 dbs
- // replogCSN : the oldest change from the changelog db
- CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
- DN dnFromChangelogDb = oldestChange.getBaseDN();
+ CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN();
+ DN dnFromReplicaDB = replicaDBChange.getBaseDN();
while (true)
{
if (isEndOfCNIndexDBReached)
{
// we are at the end of the CNIndexDB in the append mode
- assignNewChangeNumberAndStore(oldestChange);
+ assignNewChangeNumberAndStore(replicaDBChange);
return true;
}
@@ -1324,43 +1347,44 @@
final DN dnFromCNIndexDB = currentRecord.getBaseDN();
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
- + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
- + asDate(csnFromChangelogDb) + " ?older"
+ TRACER.debugInfo("assignChangeNumber() comparing the replicaDB's and"
+ + " CNIndexDB's DNs :" + dnFromReplicaDB + "?=" + dnFromCNIndexDB
+ + " timestamps:" + asDate(csnFromReplicaDB) + " ?older"
+ asDate(csnFromCNIndexDB));
- if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
- csnFromCNIndexDB, dnFromCNIndexDB))
+ if (areSameChange(csnFromReplicaDB, dnFromReplicaDB, csnFromCNIndexDB,
+ dnFromCNIndexDB))
{
+ // We matched the ReplicaDB change with a record in the CNIndexDB
+ // => set the changeNumber in memory and return the change to the client
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
- + currentRecord.getChangeNumber() + " to change=" + oldestChange);
+ + currentRecord.getChangeNumber() + " to change="
+ + replicaDBChange);
- oldestChange.setChangeNumber(currentRecord.getChangeNumber());
+ replicaDBChange.setChangeNumber(currentRecord.getChangeNumber());
return true;
}
- if (!csnFromCNIndexDB.isOlderThan(csnFromChangelogDb))
+ if (!csnFromCNIndexDB.isOlderThan(csnFromReplicaDB))
{
- // the change from the changelogDb is older
+ // the change from the replicaDB is older
// it should have been stored lately
- // let's continue to traverse the changelogDB
+ // let's continue to traverse the replicaDBs
if (debugEnabled())
- TRACER.debugInfo("assignChangeNumber() will skip "
- + csnFromChangelogDb
+ TRACER.debugInfo("assignChangeNumber() will skip " + csnFromReplicaDB
+ " and read next change from the regular changelog.");
return false; // TO BE CHECKED
}
- // the change from the CNIndexDB is older
- // that means that the change has been purged from the
- // changelogDb (and CNIndexDB not yet been trimmed)
+ // The change from the CNIndexDB is older.
+ // It means that the CNIndexDB change has been purged from the replicaDB
+ // and CNIndexDB has not been trimmed yet.
try
{
- // let's traverse the CNIndexDB searching for the change
- // found in the changelogDB
+ // keep traversing the CNIndexDB searching for the replicaDB change
if (debugEnabled())
TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
+ " and read next change from the CNIndexDB.");
@@ -1380,7 +1404,7 @@
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
// FIXME There is an opportunity for an infinite loop here if the DB
- // continuously throws DatabaseExceptions
+ // continuously throws ChangelogExceptions
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index b6a83cf..88e07b3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2595,10 +2595,10 @@
CSN eligibleCSN = null;
final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
- for (final CSN changelogNewestCSN : newestCSNs)
+ for (final CSN replicaNewestCSN : newestCSNs)
{
// Should it be considered for eligibility ?
- int serverId = changelogNewestCSN.getServerId();
+ int serverId = replicaNewestCSN.getServerId();
CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
// If the most recent UpdateMsg or CLHeartbeatMsg received is very old
@@ -2625,11 +2625,11 @@
continue;
}
- if (changelogNewestCSN != null
+ if (replicaNewestCSN != null
&& (eligibleCSN == null ||
- changelogNewestCSN.isNewerThan(eligibleCSN)))
+ replicaNewestCSN.isNewerThan(eligibleCSN)))
{
- eligibleCSN = changelogNewestCSN;
+ eligibleCSN = replicaNewestCSN;
}
if (heartbeatLastCSN != null
&& (eligibleCSN == null || heartbeatLastCSN.isNewerThan(eligibleCSN)))
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index e755741..ef57f59 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -299,20 +299,19 @@
/**
* Sets the locally configured flag for the passed ReplicationServerInfo
* object, analyzing the local configuration.
- * @param replicationServerInfo the Replication server to check and update
+ * @param rsInfo the Replication server to check and update
*/
- private void updateRSInfoLocallyConfiguredStatus(
- ReplicationServerInfo replicationServerInfo)
+ private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo)
{
// Determine if the passed ReplicationServerInfo has a URL that is present
// in the locally configured replication servers
- String rsUrl = replicationServerInfo.getServerURL();
+ String rsUrl = rsInfo.getServerURL();
if (rsUrl == null)
{
// The ReplicationServerInfo has been generated from a server with
// no URL in TopologyMsg (i.e: with replication protocol version < 4):
// ignore this server as we do not know how to connect to it
- replicationServerInfo.setLocallyConfigured(false);
+ rsInfo.setLocallyConfigured(false);
return;
}
for (String serverUrl : getReplicationServerUrls())
@@ -320,12 +319,12 @@
if (isSameReplicationServerUrl(serverUrl, rsUrl))
{
// This RS is locally configured, mark this
- replicationServerInfo.setLocallyConfigured(true);
- replicationServerInfo.serverURL = serverUrl;
+ rsInfo.setLocallyConfigured(true);
+ rsInfo.serverURL = serverUrl;
return;
}
}
- replicationServerInfo.setLocallyConfigured(false);
+ rsInfo.setLocallyConfigured(false);
}
/**
@@ -704,14 +703,12 @@
for (String serverUrl : getReplicationServerUrls())
{
- // Connect to server and get info about it
- ReplicationServerInfo replicationServerInfo =
- performPhaseOneHandshake(serverUrl, false, false);
-
- // Store server info in list
- if (replicationServerInfo != null)
+ // Connect to server + get and store info about it
+ ReplicationServerInfo rsInfo =
+ performPhaseOneHandshake(serverUrl, false, false);
+ if (rsInfo != null)
{
- rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo);
+ rsInfos.put(rsInfo.getServerId(), rsInfo);
}
}
@@ -1943,10 +1940,10 @@
int sumOfWeights = 0;
// Sum of the connected DSs
int sumOfConnectedDSs = 0;
- for (ReplicationServerInfo replicationServerInfo : bestServers.values())
+ for (ReplicationServerInfo rsInfo : bestServers.values())
{
- sumOfWeights += replicationServerInfo.getWeight();
- sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
+ sumOfWeights += rsInfo.getWeight();
+ sumOfConnectedDSs += rsInfo.getConnectedDSNumber();
}
// Distance (difference) of the current loads to the load goals of each RS:
@@ -3036,27 +3033,25 @@
// Update replication server info list with the received topology
// information
- List<Integer> rsToKeepList = new ArrayList<Integer>();
+ final Set<Integer> rssToKeep = new HashSet<Integer>();
for (RSInfo rsInfo : topoMsg.getRsList())
{
int rsId = rsInfo.getId();
- rsToKeepList.add(rsId); // Mark this server as still existing
+ rssToKeep.add(rsId); // Mark this server as still existing
List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
- ReplicationServerInfo replicationServerInfo =
- replicationServerInfos.get(rsId);
- if (replicationServerInfo == null)
+ ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
+ if (rsInfo2 == null)
{
// New replication server, create info for it add it to the list
- replicationServerInfo =
- new ReplicationServerInfo(rsInfo, connectedDSs);
+ rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
// Set the locally configured flag for this new RS only if it is
// configured
- updateRSInfoLocallyConfiguredStatus(replicationServerInfo);
- replicationServerInfos.put(rsId, replicationServerInfo);
+ updateRSInfoLocallyConfiguredStatus(rsInfo2);
+ replicationServerInfos.put(rsId, rsInfo2);
} else
{
// Update the existing info for the replication server
- replicationServerInfo.update(rsInfo, connectedDSs);
+ rsInfo2.update(rsInfo, connectedDSs);
}
}
@@ -3064,12 +3059,11 @@
* Now remove any replication server that may have disappeared from the
* topology.
*/
- Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt =
- replicationServerInfos.entrySet().iterator();
+ Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator();
while (rsInfoIt.hasNext())
{
- Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next();
- if (!rsToKeepList.contains(rsInfoEntry.getKey()))
+ final Integer rsId = rsInfoIt.next();
+ if (!rssToKeep.contains(rsId))
{
// This replication server has quit the topology, remove it from the
// list
--
Gitblit v1.10.0