From ae41fb531bbbd1bc8f9f6a82eb41c4eeb2da63c4 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Mon, 30 May 2011 15:20:19 +0000
Subject: [PATCH] Resolve several issues with the External Changelog with regards to Cookies and changes with updates and purging. More specifically these changes are resolving the following issues : OPENDJ-57 - ECL: lastChangeNumber and firstChangeNumber reset to zero when the changelog is purged to empty OPENDJ-172 - External ChangeLog Cookie varies when searching with an empty cookie. Cookie should be reproducible. OPENDJ-173 - External ChangeLog cookies content is altered by Change purging and prevents from continuing search with a previous returned cookie.
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 2
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 21 +++--
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java | 8 +
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 110 +++++++++++++++++++++++++++
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 23 -----
opends/src/server/org/opends/server/replication/common/ServerState.java | 27 ++++++
opends/src/server/org/opends/server/replication/server/DbHandler.java | 7 +
opends/src/server/org/opends/server/replication/server/DraftCNDB.java | 28 +++++++
8 files changed, 193 insertions(+), 33 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index e8fea1c..ff19e57 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -518,4 +518,31 @@
{
return saved;
}
+
+ /**
+ * Build a copy of the ServerState with only ChangeNumbers older than
+ * a specific ChangeNumber. This is used when building the initial
+ * Cookie in the External Changelog, to cope with purged changes.
+ * @param cn The ChangeNumber to compare the ServerState with
+ * @return a copy of the ServerState which only contains the ChangeNumbers
+ * older than cn.
+ */
+ public ServerState duplicateOnlyOlderThan(ChangeNumber cn)
+ {
+ ServerState newState = new ServerState();
+ synchronized (list)
+ {
+ for (Integer key : list.keySet())
+ {
+ ChangeNumber change = list.get(key);
+ Integer id = change.getServerId();
+ if (change.older(cn))
+ {
+ newState.list.put(id,change);
+ }
+ }
+ }
+ return newState;
+ }
+
}
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 43cdff7..cfb83e4 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -466,6 +466,13 @@
ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
+ // Find the last changeNumber before the trimDate, in the Database.
+ ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
+ if (lastBeforeTrimDate != null)
+ {
+ // If we found it, we want to stop trimming when reaching it.
+ trimDate = lastBeforeTrimDate;
+ }
// In case of deadlock detection by the Database, this thread can
// by aborted by a DeadlockException. This is a transient error and
// the transaction should be attempted again.
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index f42a956..85af56e 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -668,6 +668,34 @@
}
/**
+ * Getter for the integer value of the current curson, representing
+ * the current DraftChangeNumber being processed.
+ *
+ * @return the current DraftCN as an integer.
+ */
+ public int currentKey()
+ {
+ try
+ {
+ OperationStatus status =
+ cursor.getCurrent(key, entry, LockMode.DEFAULT);
+
+ if (status != OperationStatus.SUCCESS)
+ {
+ return -1;
+ }
+ String str = decodeUTF8(key.getData());
+ int draftCN = new Integer(str);
+ return draftCN;
+ }
+ catch(Exception e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ return -1;
+ }
+
+ /**
* Returns the replication changeNumber associated with the current key.
* @return the replication changeNumber
*/
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index dcf15e6..ef4df7c 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -389,18 +389,22 @@
// We don't use the returned endState but it's updating CN as
// reading
- domain.getEligibleState(crossDomainEligibleCN, false);
+ domain.getEligibleState(crossDomainEligibleCN);
ChangeNumber fcn = startState.getMaxChangeNumber(
cn.getServerId());
- if (cn.older(fcn))
+ int currentKey = cursor.currentKey();
+ // Do not delete the lastKey. This should allow us to
+ // preserve last change number over time.
+ if ((currentKey != lastkey) && (cn.older(fcn)))
{
size++;
cursor.delete();
}
else
{
+ firstkey = currentKey;
finished = true;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 624c058..3ba19c3 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -182,8 +182,8 @@
+ ")" +
"] [nextNonEligibleMsg=" + nextNonEligibleMsg +
"] [startState=" + startState +
- "] [stopState= " + stopState +
- "] [currentState= " + currentState + "]]");
+ "] [stopState=" + stopState +
+ "] [currentState=" + currentState + "]]");
}
/**
@@ -749,7 +749,7 @@
if (isPersistent ==
StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
{
- newDomainCtxt.startState = rsd.getEligibleState(eligibleCN, true);
+ newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
startStatesFromProvidedCookie.remove(rsd.getBaseDn());
}
else
@@ -766,7 +766,12 @@
// let's start traversing this domain from the beginning of
// what we have in the replication changelog
if (newDomainCtxt.startState == null)
- newDomainCtxt.startState = new ServerState();
+ {
+ ChangeNumber latestTrimCN =
+ new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0,0);
+ newDomainCtxt.startState = rsd.getStartState()
+ .duplicateOnlyOlderThan(latestTrimCN);
+ }
}
else
{
@@ -807,7 +812,7 @@
}
// Set the stop state for the domain from the eligibleCN
- newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN, true);
+ newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
}
newDomainCtxt.currentState = new ServerState();
@@ -822,9 +827,8 @@
rsd.registerHandler(mh);
newDomainCtxt.mh = mh;
- previousCookie.update(
- newDomainCtxt.rsd.getBaseDn(),
- newDomainCtxt.startState);
+ previousCookie.update(newDomainCtxt.rsd.getBaseDn(),
+ newDomainCtxt.startState);
// store the new context
tmpSet.add(newDomainCtxt);
@@ -1052,6 +1056,7 @@
ResultCode.UNWILLING_TO_PERFORM,
ERR_INVALID_COOKIE_SYNTAX.get());
}
+
excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
replicationServer.disableEligibility(excludedServiceIDs);
eligibleCN = replicationServer.getEligibleCN();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index b512b7e..2a6c676 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -450,6 +450,12 @@
*/
cn = null;
}
+ else
+ {
+ str = decodeUTF8(key.getData());
+ cn= new ChangeNumber(str);
+ // There can't be 2 counter record next to each other
+ }
}
}
catch (DatabaseException e)
@@ -470,6 +476,110 @@
}
/**
+ * Try to find in the DB, the change number right before the one
+ * passed as a parameter.
+ *
+ * @param changeNumber
+ * The changeNumber from which we start searching.
+ * @return the changeNumber right before the one passed as a parameter.
+ * Can return null if there is none.
+ */
+ public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber)
+ {
+
+ if (changeNumber == null)
+ return null;
+
+ Cursor cursor = null;
+ ChangeNumber cn = null;
+
+ DatabaseEntry key = new ReplicationKey(changeNumber);
+ DatabaseEntry data = new DatabaseEntry();
+
+ Transaction txn = null;
+
+ dbCloseLock.readLock().lock();
+ try
+ {
+ cursor = db.openCursor(txn, null);
+ if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
+ OperationStatus.SUCCESS)
+ {
+ // We can move close to the changeNumber.
+ // Let's move to the previous change.
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
+ OperationStatus.SUCCESS)
+ {
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ if (cursor.getPrev(key, data,
+ LockMode.DEFAULT) != OperationStatus.SUCCESS)
+ {
+ // database starts with a counter record.
+ cn = null;
+ }
+ else
+ {
+ str = decodeUTF8(key.getData());
+ cn= new ChangeNumber(str);
+ // There can't be 2 counter record next to each other
+ }
+ }
+ }
+ // else, there was no change previous to our changeNumber.
+ }
+ else
+ {
+ // We could not move the cursor past to the changeNumber
+ // Check if the last change is older than changeNumber
+ if (cursor.getLast(key, data, LockMode.DEFAULT) ==
+ OperationStatus.SUCCESS)
+ {
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ if (cursor.getPrev(key, data,
+ LockMode.DEFAULT) != OperationStatus.SUCCESS)
+ {
+ /*
+ * database only contain a counter record, should not be
+ * possible, but Ok, let's just say no change Number
+ */
+ cn = null;
+ }
+ else
+ {
+ str = decodeUTF8(key.getData());
+ cn= new ChangeNumber(str);
+ // There can't be 2 counter record next to each other
+ }
+ }
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ /* database is faulty */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ // TODO: Verify if shutting down replication is the right thing to do
+ replicationServer.shutdown();
+ cn = null;
+ }
+ finally
+ {
+ closeLockedCursor(cursor);
+ }
+ return cn;
+ }
+
+
+ /**
* {@inheritDoc}
*/
@Override
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c1a33a4..a21b51b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1977,7 +1977,7 @@
continue;
result.update(rsd.getBaseDn(), rsd.getEligibleState(
- getEligibleCN(),false));
+ getEligibleCN()));
}
}
return result;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index f7182f6..64cc575 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3153,12 +3153,9 @@
* The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
*
* @param eligibleCN The provided eligibleCN.
- * @param allowOlderThanPurgeDate When true, the returned state can be older
- * than the purge date of the domain.
* @return The computed eligible server state.
*/
- public ServerState getEligibleState(ChangeNumber eligibleCN,
- boolean allowOlderThanPurgeDate)
+ public ServerState getEligibleState(ChangeNumber eligibleCN)
{
ServerState result = new ServerState();
@@ -3225,24 +3222,6 @@
}
}
- if (allowOlderThanPurgeDate == false)
- {
- boolean domainPurged = true;
- long latestDomainTrimDate = getLatestDomainTrimDate();
- Iterator<Integer> it = result.iterator();
- while (it.hasNext())
- {
- int sid = it.next();
- ChangeNumber cn = result.getMaxChangeNumber(sid);
- if ((cn.getTime()>0) && (cn.getTime()<latestDomainTrimDate))
- result.update(new ChangeNumber(0,0,sid));
- else
- domainPurged = false;
- }
- if (domainPurged == true)
- result.clear();
- }
-
if (debugEnabled())
TRACER.debugInfo("In " + this
+ " getEligibleState() result is " + result);
--
Gitblit v1.10.0