From 2fc933d03d5961fa75fc4a8ffef75d3ec3b125d6 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 04 Oct 2013 12:46:34 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java | 7 +--
opends/src/server/org/opends/server/replication/common/CSNGenerator.java | 33 +++++++++-------
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java | 13 +++---
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 9 +---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 39 +++++++++----------
opends/src/server/org/opends/server/replication/common/ServerState.java | 6 +-
6 files changed, 52 insertions(+), 55 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/CSNGenerator.java b/opends/src/server/org/opends/server/replication/common/CSNGenerator.java
index 559d621..5b295b5 100644
--- a/opends/src/server/org/opends/server/replication/common/CSNGenerator.java
+++ b/opends/src/server/org/opends/server/replication/common/CSNGenerator.java
@@ -64,19 +64,22 @@
* all {@link CSN}s generated will be larger than all the
* {@link CSN}s currently in state.
*/
- public CSNGenerator(int id, ServerState state)
- {
- this.lastTime = TimeThread.getTime();
- for (int stateId : state)
- {
- if (this.lastTime < state.getCSN(stateId).getTime())
- this.lastTime = state.getCSN(stateId).getTime();
- if (stateId == id)
- this.seqnum = state.getCSN(id).getSeqnum();
- }
- this.serverId = id;
-
- }
+ public CSNGenerator(int id, ServerState state)
+ {
+ this.lastTime = TimeThread.getTime();
+ for (CSN csn : state)
+ {
+ if (this.lastTime < csn.getTime())
+ {
+ this.lastTime = csn.getTime();
+ }
+ if (csn.getServerId() == id)
+ {
+ this.seqnum = csn.getSeqnum();
+ }
+ }
+ this.serverId = id;
+ }
/**
* Generate a new {@link CSN}.
@@ -160,9 +163,9 @@
*/
public void adjust(ServerState state)
{
- for (int localServerId : state)
+ for (CSN csn : state)
{
- adjust(state.getCSN(localServerId));
+ adjust(csn);
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 140de98..7e6be55 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -43,7 +43,7 @@
* For example, it is exchanged with the replication servers at connection
* establishment time to communicate "which CSNs was last seen by a serverId".
*/
-public class ServerState implements Iterable<Integer>
+public class ServerState implements Iterable<CSN>
{
/** Associates a serverId with a CSN. */
@@ -404,9 +404,9 @@
* {@inheritDoc}
*/
@Override
- public Iterator<Integer> iterator()
+ public Iterator<CSN> iterator()
{
- return serverIdToCSN.keySet().iterator();
+ return serverIdToCSN.values().iterator();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 59b7486..ec5c5d0 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -868,13 +868,10 @@
changelog db has been trimmed and the cookie is not valid
anymore.
*/
- for (int serverId : rsDomain.getStartState())
+ for (CSN dbOldestChange : rsDomain.getStartState())
{
- CSN dbOldestChange =
- rsDomain.getStartState().getCSN(serverId);
- CSN providedChange = cookie.getCSN(serverId);
- if (providedChange != null
- && providedChange.older(dbOldestChange))
+ CSN providedChange = cookie.getCSN(dbOldestChange.getServerId());
+ if (providedChange != null && providedChange.older(dbOldestChange))
{
return true;
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java b/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
index bd1ada6..b78bb90 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -279,7 +279,7 @@
// This directly connected LS has never produced any change
maxCSN = new CSN(0, 0, serverId);
}
- pendingMonitorData.setMaxCSN(serverId, maxCSN);
+ pendingMonitorData.setMaxCSN(maxCSN);
pendingMonitorData.setLDAPServerState(serverId, dsState);
pendingMonitorData.setFirstMissingDate(serverId,
ds.getApproxFirstMissingDate());
@@ -290,10 +290,9 @@
// - whatever they are directly or indirectly connected
final ServerState dbServerState = domain.getLatestServerState();
pendingMonitorData.setRSState(domain.getLocalRSServerId(), dbServerState);
- for (int serverId : dbServerState)
+ for (CSN storedCSN : dbServerState)
{
- CSN storedCSN = dbServerState.getCSN(serverId);
- pendingMonitorData.setMaxCSN(serverId, storedCSN);
+ pendingMonitorData.setMaxCSN(storedCSN);
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java b/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java
index 14e85c3..d666b28 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java
@@ -294,22 +294,21 @@
*/
public void setMaxCSNs(ServerState state)
{
- for (Integer serverId : state) {
- CSN newCSN = state.getCSN(serverId);
- setMaxCSN(serverId, newCSN);
+ for (CSN newCSN : state)
+ {
+ setMaxCSN(newCSN);
}
}
/**
* For the provided serverId, sets the provided CSN as the max if
* it is newer than the current max.
- * @param serverId the provided serverId
* @param newCSN the provided new CSN
*/
- public void setMaxCSN(int serverId, CSN newCSN)
+ public void setMaxCSN(CSN newCSN)
{
- if (newCSN==null) return;
-
+ if (newCSN == null) return;
+ int serverId = newCSN.getServerId();
CSN currentMaxCSN = maxCSNs.get(serverId);
if (currentMaxCSN == null)
{
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 9c9e6c5..ae79eba 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1307,14 +1307,13 @@
/**
* Count the number of changes in the replication changelog for the provided
* serverID, between 2 provided CSNs.
- * @param serverId Identifier of the server for which to compute the count.
* @param from lower limit CSN.
* @param to upper limit CSN.
* @return the number of changes.
*/
- public long getCount(int serverId, CSN from, CSN to)
+ public long getCount(CSN from, CSN to)
{
- return domainDB.getCount(baseDN, serverId, from, to);
+ return domainDB.getCount(baseDN, from.getServerId(), from, to);
}
/**
@@ -2593,9 +2592,8 @@
if (eligibleCSN != null)
{
- for (int serverId : latestState)
+ for (CSN mostRecentDbCSN : latestState)
{
- CSN mostRecentDbCSN = latestState.getCSN(serverId);
try {
// Is the most recent change in the Db newer than eligible CSN ?
// if yes (like csn15 in the example above, then we have to go back
@@ -2603,9 +2601,12 @@
if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
{
// let's try to seek the first change <= eligibleCSN
- CSN newCSN = domainDB.getCSNAfter(baseDN, serverId, eligibleCSN);
+ CSN newCSN = domainDB.getCSNAfter(baseDN,
+ mostRecentDbCSN.getServerId(), eligibleCSN);
result.update(newCSN);
- } else {
+ }
+ else
+ {
// for this serverId, all changes in the ChangelogDb are holder
// than eligibleCSN, the most recent in the db is our guy.
result.update(mostRecentDbCSN);
@@ -2653,14 +2654,11 @@
CSN eligibleCSN = null;
final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
- for (final int serverId : newestCSNs)
+ for (final CSN changelogNewestCSN : newestCSNs)
{
- // Consider this producer (DS/db).
- final CSN changelogNewestCSN = newestCSNs.getCSN(serverId);
-
// Should it be considered for eligibility ?
- CSN heartbeatLastCSN =
- getChangeTimeHeartbeatState().getCSN(serverId);
+ int serverId = changelogNewestCSN.getServerId();
+ CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
// If the most recent UpdateMsg or CLHeartbeatMsg received is very old
// then the domain is considered down and not considered for eligibility
@@ -2804,10 +2802,9 @@
{
long res = 0;
- for (int serverId : getLatestServerState())
+ for (CSN startCSN : getLatestServerState())
{
- CSN startCSN = startState.getCSN(serverId);
- long serverIdRes = getCount(serverId, startCSN, endCSN);
+ long serverIdRes = getCount(startCSN, endCSN);
// The startPoint is excluded when counting the ECL eligible changes
if (startCSN != null && serverIdRes > 0)
@@ -2831,10 +2828,12 @@
public long getEligibleCount(CSN startCSN, CSN endCSN)
{
long res = 0;
- for (int serverId : getLatestServerState()) {
- CSN lStartCSN =
- new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId);
- res += getCount(serverId, lStartCSN, endCSN);
+ for (CSN csn : getLatestServerState())
+ {
+ int serverId = csn.getServerId();
+ CSN lStartCSN = new CSN(startCSN.getTime(), startCSN.getSeqnum(),
+ serverId);
+ res += getCount(lStartCSN, endCSN);
}
return res;
}
--
Gitblit v1.10.0