From bcd9325b7d47b6932d140a15ee761252e130ab7e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 09 Oct 2013 13:35:05 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 86 +++------
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 25 +-
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 15 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 315 +++++++++++++++++----------------------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 6
opends/src/server/org/opends/server/replication/common/ServerState.java | 15 +
6 files changed, 201 insertions(+), 261 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 888a1a2..ef5bd1b 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -529,16 +529,19 @@
}
/**
- * Build a copy of the ServerState with only CSNs older than
- * a specific CSN. This is used when building the initial
- * Cookie in the External Changelog, to cope with purged changes.
- * @param csn The CSN to compare the ServerState with
+ * Build a copy of the ServerState with only CSNs older than a provided
+ * timestamp. This is used when building the initial Cookie in the External
+ * Changelog, to cope with purged changes.
+ *
+ * @param timestamp
+ * The timestamp to compare the ServerState against
* @return a copy of the ServerState which only contains the CSNs older than
* csn.
*/
- public ServerState duplicateOnlyOlderThan(CSN csn)
+ public ServerState duplicateOnlyOlderThan(long timestamp)
{
- ServerState newState = new ServerState();
+ final CSN csn = new CSN(timestamp, 0, 0);
+ final ServerState newState = new ServerState();
synchronized (serverIdToCSN)
{
for (CSN change : serverIdToCSN.values())
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index e72dabe..259711d 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -186,8 +186,8 @@
.append(")")
.append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
.append("] [startState=").append(startState)
- .append("] [stopState=").append(stopState)
.append("] [currentState=").append(currentState)
+ .append("] [stopState=").append(stopState)
.append("]]");
}
@@ -735,11 +735,10 @@
}
// skip unused domains
- final ServerState latestServerState = domain.getLatestServerState();
- if (latestServerState.isEmpty())
+ final ServerState latestState = domain.getLatestServerState();
+ if (latestState.isEmpty())
continue;
-
// Creates the new domain context
final DomainContext newDomainCtxt = new DomainContext();
newDomainCtxt.active = true;
@@ -749,7 +748,7 @@
// Assign the start state for the domain
if (isPersistent == PERSISTENT_CHANGES_ONLY)
{
- newDomainCtxt.startState = latestServerState;
+ newDomainCtxt.startState = latestState;
startStatesFromProvidedCookie.remove(domain.getBaseDN());
}
else
@@ -767,10 +766,9 @@
// what we have in the replication changelog
if (newDomainCtxt.startState == null)
{
- CSN latestTrimCSN =
- new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
newDomainCtxt.startState =
- domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
+ domain.getOldestState().duplicateOnlyOlderThan(
+ newDomainCtxt.domainLatestTrimDate);
}
}
else
@@ -790,7 +788,7 @@
}
}
- newDomainCtxt.stopState = latestServerState;
+ newDomainCtxt.stopState = latestState;
}
newDomainCtxt.currentState = new ServerState();
@@ -860,12 +858,11 @@
ServerState cookie)
{
/*
- when the provided startState is older than the replication
- changelogdb startState, it means that the replication
- changelog db has been trimmed and the cookie is not valid
- anymore.
+ when the provided startState is older than the replication changelogdb
+ oldestState, it means that the replication changelog db has been trimmed and
+ the cookie is not valid anymore.
*/
- for (CSN dbOldestChange : rsDomain.getStartState())
+ for (CSN dbOldestChange : rsDomain.getOldestState())
{
CSN providedChange = cookie.getCSN(dbOldestChange.getServerId());
if (providedChange != null && providedChange.isOlderThan(dbOldestChange))
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c0fdf78..32c7017 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1398,43 +1398,33 @@
*/
try
{
- boolean dbEmpty = true;
- long oldestChangeNumber = 0;
- long newestChangeNumber = 0;
-
final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
- final CNIndexRecord oldestCNRecord = cnIndexDB.getOldestRecord();
- final CNIndexRecord newestCNRecord = cnIndexDB.getNewestRecord();
-
- boolean noCookieForNewestCN = true;
- CSN csnForNewestCN = null;
- DN baseDNForNewestCN = null;
- if (oldestCNRecord != null)
+ final CNIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
+ final CNIndexRecord newestRecord = cnIndexDB.getNewestRecord();
+ if (oldestRecord == null)
{
- if (newestCNRecord == null)
- {
- // Edge case: DB was cleaned or closed in between calls to
- // getOldest*() and getNewest*().
- // The only remaining solution is to fail fast.
- throw new ChangelogException(
- ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
- }
-
- dbEmpty = false;
- oldestChangeNumber = oldestCNRecord.getChangeNumber();
- newestChangeNumber = newestCNRecord.getChangeNumber();
-
- // Get the generalized state associated with the current newest change
- // number and initializes from it the startStates table
- String newestCNGenState = newestCNRecord.getPreviousCookie();
- noCookieForNewestCN =
- newestCNGenState == null || newestCNGenState.length() == 0;
-
- csnForNewestCN = newestCNRecord.getCSN();
- baseDNForNewestCN = newestCNRecord.getBaseDN();
+ // The database is empty
+ long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
+ return new long[] { lastGeneratedCN, lastGeneratedCN };
+ }
+ if (newestRecord == null) // oldestCNRecord != null
+ {
+ // Edge case: DB was cleaned or closed in between calls to
+ // getOldest*() and getNewest*().
+ // The only remaining solution is to fail fast.
+ throw new ChangelogException(
+ ERR_READING_OLDEST_THEN_NEWEST_IN_CHANGENUMBER_DATABASE.get());
}
- long newestDate = 0;
+ long oldestChangeNumber = oldestRecord.getChangeNumber();
+ long newestChangeNumber = newestRecord.getChangeNumber();
+
+ // Get the generalized state associated with the current newest change
+ // number and initializes from the startState table
+ final String cookie = newestRecord.getPreviousCookie();
+ boolean noCookieForNewestCN = cookie == null || cookie.length() == 0;
+
+ long newestTime = newestRecord.getCSN().getTime();
for (ReplicationServerDomain rsDomain : getReplicationServerDomains())
{
if (contains(
@@ -1447,30 +1437,25 @@
if (noCookieForNewestCN)
{
// Count changes of this domain from the beginning of the changelog
- CSN trimCSN = new CSN(rsDomain.getLatestDomainTrimDate(), 0, 0);
- ec = rsDomain.getEligibleCount(
- rsDomain.getStartState().duplicateOnlyOlderThan(trimCSN),
- maxOldestChangeNumber);
+ final ServerState startState = rsDomain.getOldestState()
+ .duplicateOnlyOlderThan(rsDomain.getLatestDomainTrimDate());
+ ec = rsDomain.getEligibleCount(startState, maxOldestChangeNumber);
}
else
{
// There are records in the CNIndexDB (so already returned to clients)
// BUT
// There is nothing related to this domain in the newest CNIndexRecord
- // (may be this domain was disabled when this record was returned).
- // In that case, are counted the changes from
- // the date of the most recent change from this newest CNIndexRecord
- if (newestDate == 0)
- {
- newestDate = csnForNewestCN.getTime();
- }
+ // (maybe this domain was disabled when this record was returned).
+ // In that case, are counted the changes from the time of the most
+ // recent change
// And count changes of this domain from the date of the
// newest seqnum record (that does not refer to this domain)
- CSN csnx = new CSN(newestDate, csnForNewestCN.getSeqnum(), 0);
+ CSN csnx = new CSN(newestTime, newestRecord.getCSN().getSeqnum(), 0);
ec = rsDomain.getEligibleCount(csnx, maxOldestChangeNumber);
- if (baseDNForNewestCN.equals(rsDomain.getBaseDN()))
+ if (newestRecord.getBaseDN().equals(rsDomain.getBaseDN()))
ec--;
}
@@ -1482,15 +1467,6 @@
if (ec > 0 && oldestChangeNumber == 0)
oldestChangeNumber = 1;
}
-
- if (dbEmpty)
- {
- // The database was empty, just keep increasing numbers since last time
- // we generated one change number.
- long lastGeneratedCN = cnIndexDB.getLastGeneratedChangeNumber();
- oldestChangeNumber += lastGeneratedCN;
- newestChangeNumber += lastGeneratedCN;
- }
return new long[] { oldestChangeNumber, newestChangeNumber };
}
catch (ChangelogException e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a3a9043..bf5d0ee 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2568,15 +2568,15 @@
}
/**
- * Returns the start state of the domain, made of the oldest CSN stored for
- * each serverId.
+ * Returns the oldest known state for the domain, made of the oldest CSN
+ * stored for each serverId.
* <p>
* Note: Because the replication changelogDB trimming always keep one change
* whatever its date, the CSN contained in the returned state can be very old.
*
* @return the start state of the domain.
*/
- public ServerState getStartState()
+ public ServerState getOldestState()
{
return domainDB.getDomainOldestCSNs(baseDN);
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index d2d7795..f246160 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -375,14 +375,15 @@
continue;
}
- // Purge up to wherever the other DBs have been purged to.
- // FIXME there is an opportunity for a phantom record in the current
- // DB if the replicaDB gets purged after the next if statement.
+ // FIXME there is an opportunity for a phantom record in the CNIndexDB
+ // if the replicaDB gets purged after call to domain.getOldestState().
final CSN csn = record.getCSN();
- final ServerState startState = domain.getStartState();
- final CSN fcsn = startState.getCSN(csn.getServerId());
+ final ServerState oldestState = domain.getOldestState();
+ final CSN fcsn = oldestState.getCSN(csn.getServerId());
if (csn.isOlderThan(fcsn))
{
+ // This change which has already been purged from the corresponding
+ // replicaDB => purge it from CNIndexDB
cursor.delete();
continue;
}
@@ -397,7 +398,7 @@
if (debugEnabled())
TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
- + csnVector + " -- StartState:" + startState);
+ + csnVector + " -- StartState:" + oldestState);
}
catch(Exception e)
{
@@ -409,7 +410,7 @@
if (csnVector == null
|| (csnVector.getCSN(csn.getServerId()) != null
- && !csnVector.cover(startState)))
+ && !csnVector.cover(oldestState)))
{
cursor.delete();
if (debugEnabled())
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 0f7e6ea..9f1ab7a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -45,7 +45,6 @@
import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.plugins.InvocationCounterPlugin;
-import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.*;
import org.opends.server.replication.ReplicationTestCase;
@@ -115,8 +114,6 @@
/** The LDAPStatistics object associated with the LDAP connection handler. */
private LDAPStatistics ldapStatistics;
- private CSN gblCSN;
-
private int brokerSessionTimeout = 5000;
private int maxWindow = 100;
@@ -363,10 +360,10 @@
ECLCompatWriteReadAllOps(1);
// Write 4 additional changes and read ECL from a provided change number
- int ts = ECLCompatWriteReadAllOps(5);
+ CSN csn = ECLCompatWriteReadAllOps(5);
// Test request from a provided change number - read 6
- ECLCompatReadFrom(6);
+ ECLCompatReadFrom(6, csn);
// Test request from a provided change number interval - read 5-7
ECLCompatReadFromTo(5,7);
@@ -376,7 +373,7 @@
// Test first and last change number, add a new change, do not
// search again the ECL, but search for first and last
- ECLCompatTestLimitsAndAdd(1,8, ts);
+ ECLCompatTestLimitsAndAdd(1, 8, 4);
// Test CNIndexDB is purged when replication change log is purged
ECLPurgeCNIndexDBAfterChangelogClear();
@@ -392,11 +389,11 @@
public void ECLReplicationServerFullTest16() throws Exception
{
// Persistent search in init + changes mode
- ECLPsearch(false, true);
+ CSN csn = ECLPsearch(false, true);
// Test Filter on replication csn
// TODO: test with optimization when code done.
- ECLFilterOnReplicationCsn();
+ ECLFilterOnReplicationCSN(csn);
}
private void ECLIsNotASupportedSuffix() throws Exception
@@ -499,7 +496,7 @@
debugInfo(tn, "publishes:" + delMsg2);
// wait for the server to take these changes into account
- sleep(500);
+ Thread.sleep(500);
// open ECL broker
serverECL = openReplicationSession(
@@ -585,12 +582,7 @@
/** Add an entry in the database */
private void addEntry(Entry entry) throws Exception
{
- AddOperation addOp = new AddOperationBasis(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
- entry.getUserAttributes(), entry.getOperationalAttributes());
- addOp.setInternalOperation(true);
- addOp.run();
+ AddOperation addOp = connection.processAdd(entry);
waitOpResult(addOp, ResultCode.SUCCESS);
assertNotNull(getEntry(entry.getDN(), 1000, true));
}
@@ -629,9 +621,9 @@
DomainFakeCfg domainConf = new DomainFakeCfg(baseDN2, 1602, replServers);
domain2 = startNewDomain(domainConf, null,null);
- sleep(1000);
+ Thread.sleep(1000);
addEntry(createEntry(baseDN2));
- sleep(2000);
+ Thread.sleep(2000);
// Search on ECL from start on all suffixes
String cookie = "";
@@ -698,23 +690,23 @@
s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
- sleep(500);
+ Thread.sleep(500);
// Produce updates
long time = TimeThread.getTime();
int ts = 1;
- CSN csn = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn, tn, 1);
+ CSN csn1 = new CSN(time, ts++, s1test.getServerId());
+ publishDeleteMsgInOTest(s1test, csn1, tn, 1);
- csn = new CSN(time++, ts++, s2test2.getServerId());
- publishDeleteMsgInOTest2(s2test2, csn, tn, 2);
+ CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
+ publishDeleteMsgInOTest2(s2test2, csn2, tn, 2);
- CSN csn3 = new CSN(time++, ts++, s2test2.getServerId());
+ CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
publishDeleteMsgInOTest2(s2test2, csn3, tn, 3);
- csn = new CSN(time++, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn, tn, 4);
- sleep(1500);
+ CSN csn4 = new CSN(time, ts++, s1test.getServerId());
+ publishDeleteMsgInOTest(s1test, csn4, tn, 4);
+ Thread.sleep(1500);
// Changes are :
// s1 s2
@@ -749,9 +741,9 @@
cookie = getCookie(searchOp.getSearchEntries(), 1, tn, ldifWriter, cookie);
// Now publishes a new change and search from the previous cookie
- CSN csn5 = new CSN(time++, ts++, s1test.getServerId());
+ CSN csn5 = new CSN(time, ts++, s1test.getServerId());
publishDeleteMsgInOTest(s1test, csn5, tn, 5);
- sleep(500);
+ Thread.sleep(500);
// Changes are :
// s1 s2
@@ -773,30 +765,29 @@
s2test = openReplicationSession(TEST_ROOT_DN, 1204,
100, replicationServerPort, brokerSessionTimeout, true);
- sleep(500);
+ Thread.sleep(500);
time = TimeThread.getTime();
- csn = new CSN(time++, ts++, s1test2.getServerId());
- publishDeleteMsgInOTest2(s1test2, csn, tn, 6);
+ CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
+ publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
- csn = new CSN(time++, ts++, s2test.getServerId());
- publishDeleteMsgInOTest(s2test, csn, tn, 7);
+ CSN csn7 = new CSN(time, ts++, s2test.getServerId());
+ publishDeleteMsgInOTest(s2test, csn7, tn, 7);
- CSN csn8 = new CSN(time++, ts++, s1test2.getServerId());
+ CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
- CSN csn9 = new CSN(time++, ts++, s2test.getServerId());
+ CSN csn9 = new CSN(time, ts++, s2test.getServerId());
publishDeleteMsgInOTest(s2test, csn9, tn, 9);
- sleep(500);
+ Thread.sleep(500);
- ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN);
- assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1);
- assertTrue(startState.getCSN(s2test.getServerId()) != null);
- assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7);
+ final ServerState oldestState = getDomainOldestState(TEST_ROOT_DN);
+ assertEquals(oldestState.getCSN(s1test.getServerId()), csn1);
+ assertEquals(oldestState.getCSN(s2test.getServerId()), csn7);
- startState = getReplicationDomainStartState(TEST_ROOT_DN2);
- assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2);
- assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6);
+ final ServerState oldestState2 = getDomainOldestState(TEST_ROOT_DN2);
+ assertEquals(oldestState2.getCSN(s2test2.getServerId()), csn2);
+ assertEquals(oldestState2.getCSN(s1test2.getServerId()), csn6);
// Test lastExternalChangelogCookie attribute of the ECL
MultiDomainServerState expectedLastCookie =
@@ -847,9 +838,9 @@
debugInfo(tn, "Ending test successfully");
}
- private ServerState getReplicationDomainStartState(DN baseDN)
+ private ServerState getDomainOldestState(DN baseDN)
{
- return replicationServer.getReplicationServerDomain(baseDN).getStartState();
+ return replicationServer.getReplicationServerDomain(baseDN).getOldestState();
}
private String getCookie(List<SearchResultEntry> entries,
@@ -1002,8 +993,8 @@
// 5. Assert that a request with an "old" cookie - one that refers to
// changes that have been removed by the replication changelog trimming
// returns the appropriate error.
- debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN));
- debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN2));
+ debugInfo(tn, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
+ debugInfo(tn, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM);
assertEquals(searchOp.getSearchEntries().size(), 0);
assertTrue(searchOp.getErrorMessage().toString().startsWith(
@@ -1130,7 +1121,7 @@
ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
server01.publish(modDNMsg);
debugInfo(tn, " publishes " + modDNMsg.getCSN());
- sleep(1000);
+ Thread.sleep(1000);
String cookie= "";
InternalSearchOperation searchOp =
@@ -1340,7 +1331,7 @@
/**
* Test persistent search
*/
- private void ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
+ private CSN ECLPsearch(boolean changesOnly, boolean compatMode) throws Exception
{
String tn = "ECLPsearch_" + changesOnly + "_" + compatMode;
debugInfo(tn, "Starting test \n\n");
@@ -1360,6 +1351,7 @@
}
assertNotNull(ldapStatistics);
+ try
{
// Create broker on suffix
ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1,
@@ -1373,7 +1365,7 @@
"11111111-1112-1113-1114-111111111114");
debugInfo(tn, " publishing " + delMsg.getCSN());
server01.publish(delMsg);
- sleep(500); // let's be sure the message is in the RS
+ Thread.sleep(500); // let's be sure the message is in the RS
// Creates cookie control
String cookie = "";
@@ -1416,7 +1408,7 @@
LDAPMessage message;
message = new LDAPMessage(2, searchRequest, controls);
w.writeMessage(message);
- sleep(500);
+ Thread.sleep(500);
SearchResultDoneProtocolOp searchResultDone;
@@ -1467,8 +1459,7 @@
"11111111-1112-1113-1114-111111111115");
debugInfo(tn, " publishing " + delMsg.getCSN());
server01.publish(delMsg);
- this.gblCSN = csn;
- sleep(1000);
+ Thread.sleep(1000);
debugInfo(tn, delMsg.getCSN() +
" published , psearch will now wait for new entries");
@@ -1502,7 +1493,7 @@
break;
}
}
- sleep(1000);
+ Thread.sleep(1000);
// Check we received change 2
for (LDAPAttribute a : searchResultEntry.getAttributes())
@@ -1581,9 +1572,15 @@
}
close(s);
- while (!s.isClosed()) sleep(100);
+ while (!s.isClosed())
+ Thread.sleep(100);
+
+ return csn;
}
- debugInfo(tn, "Ends test successfully");
+ finally
+ {
+ debugInfo(tn, "Ends test successfully");
+ }
}
private SearchRequestProtocolOp createSearchRequest(String filterString,
@@ -1647,7 +1644,7 @@
"11111111-1111-1111-1111-111111111111");
debugInfo(tn, " publishing " + delMsg1);
server01.publish(delMsg1);
- sleep(500); // let's be sure the message is in the RS
+ Thread.sleep(500); // let's be sure the message is in the RS
// Produce update 2
CSN csn2 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
@@ -1656,7 +1653,7 @@
"22222222-2222-2222-2222-222222222222");
debugInfo(tn, " publishing " + delMsg2);
server02.publish(delMsg2);
- sleep(500); // let's be sure the message is in the RS
+ Thread.sleep(500); // let's be sure the message is in the RS
// Produce update 3
CSN csn3 = new CSN(TimeThread.getTime(), ts++, SERVER_ID_2);
@@ -1665,7 +1662,7 @@
"33333333-3333-3333-3333-333333333333");
debugInfo(tn, " publishing " + delMsg3);
server02.publish(delMsg3);
- sleep(500); // let's be sure the message is in the RS
+ Thread.sleep(500); // let's be sure the message is in the RS
// Creates cookie control
String cookie = "";
@@ -1723,15 +1720,15 @@
LDAPMessage message;
message = new LDAPMessage(2, searchRequest1, controls);
w1.writeMessage(message);
- sleep(500);
+ Thread.sleep(500);
message = new LDAPMessage(2, searchRequest2, controls);
w2.writeMessage(message);
- sleep(500);
+ Thread.sleep(500);
message = new LDAPMessage(2, searchRequest3, controls);
w3.writeMessage(message);
- sleep(500);
+ Thread.sleep(500);
SearchResultEntryProtocolOp searchResultEntry = null;
SearchResultDoneProtocolOp searchResultDone = null;
@@ -1857,7 +1854,7 @@
"44444444-4444-4444-4444-444444444444");
debugInfo(tn, " publishing " + delMsg11);
server01.publish(delMsg11);
- sleep(500);
+ Thread.sleep(500);
debugInfo(tn, delMsg11.getCSN() + " published additionally ");
// Produces additional change
@@ -1867,7 +1864,7 @@
"55555555-5555-5555-5555-555555555555");
debugInfo(tn, " publishing " + delMsg12 );
server02.publish(delMsg12);
- sleep(500);
+ Thread.sleep(500);
debugInfo(tn, delMsg12.getCSN() + " published additionally ");
// Produces additional change
@@ -1877,7 +1874,7 @@
"66666666-6666-6666-6666-666666666666");
debugInfo(tn, " publishing " + delMsg13);
server02.publish(delMsg13);
- sleep(500);
+ Thread.sleep(500);
debugInfo(tn, delMsg13.getCSN() + " published additionally ");
// wait 11
@@ -1910,7 +1907,7 @@
break;
}
}
- sleep(1000);
+ Thread.sleep(1000);
debugInfo(tn, "Search 1 successfully receives additional changes");
// wait 12 & 13
@@ -1943,7 +1940,7 @@
break;
}
}
- sleep(1000);
+ Thread.sleep(1000);
debugInfo(tn, "Search 2 successfully receives additional changes");
// wait 11 & 12 & 13
@@ -1976,7 +1973,7 @@
break;
}
}
- sleep(1000);
+ Thread.sleep(1000);
// Check we received change 13
for (LDAPAttribute a : searchResultEntry.getAttributes())
@@ -2010,7 +2007,7 @@
close(s);
while (!s.isClosed())
{
- sleep(100);
+ Thread.sleep(100);
}
}
}
@@ -2082,14 +2079,6 @@
}
/**
- * Utility - sleeping as long as required
- */
- private void sleep(long time) throws InterruptedException
- {
- Thread.sleep(time);
- }
-
- /**
* Utility - log debug message - highlight it is from the test and not
* from the server code. Makes easier to observe the test steps.
*/
@@ -2150,6 +2139,9 @@
}
}
+ /**
+ * FIXME this test actually tests nothing: there are no asserts.
+ */
private void ChangeTimeHeartbeatTest() throws Exception
{
String tn = "ChangeTimeHeartbeatTest";
@@ -2170,23 +2162,23 @@
s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2,
100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID);
- sleep(500);
+ Thread.sleep(500);
// Produce updates
long time = TimeThread.getTime();
int ts = 1;
- CSN csn = new CSN(time, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn, tn, 1);
+ CSN csn1 = new CSN(time, ts++, s1test.getServerId());
+ publishDeleteMsgInOTest(s1test, csn1, tn, 1);
- csn = new CSN(time++, ts++, s2test2.getServerId());
- publishDeleteMsgInOTest(s2test2, csn, tn, 2);
+ CSN csn2 = new CSN(time, ts++, s2test2.getServerId());
+ publishDeleteMsgInOTest(s2test2, csn2, tn, 2);
- CSN csn3 = new CSN(time++, ts++, s2test2.getServerId());
+ CSN csn3 = new CSN(time, ts++, s2test2.getServerId());
publishDeleteMsgInOTest(s2test2, csn3, tn, 3);
- csn = new CSN(time++, ts++, s1test.getServerId());
- publishDeleteMsgInOTest(s1test, csn, tn, 4);
- sleep(500);
+ CSN csn4 = new CSN(time, ts++, s1test.getServerId());
+ publishDeleteMsgInOTest(s1test, csn4, tn, 4);
+ Thread.sleep(500);
// --
s1test2 = openReplicationSession(TEST_ROOT_DN2, 1203,
@@ -2194,26 +2186,24 @@
s2test = openReplicationSession(TEST_ROOT_DN, 1204,
100, replicationServerPort, brokerSessionTimeout, true);
- sleep(500);
+ Thread.sleep(500);
// Test startState ("first cookie") of the ECL
time = TimeThread.getTime();
- csn = new CSN(time++, ts++, s1test2.getServerId());
- publishDeleteMsgInOTest2(s1test2, csn, tn, 6);
+ CSN csn6 = new CSN(time, ts++, s1test2.getServerId());
+ publishDeleteMsgInOTest2(s1test2, csn6, tn, 6);
- csn = new CSN(time++, ts++, s2test.getServerId());
- publishDeleteMsgInOTest(s2test, csn, tn, 7);
+ CSN csn7 = new CSN(time, ts++, s2test.getServerId());
+ publishDeleteMsgInOTest(s2test, csn7, tn, 7);
- CSN csn8 = new CSN(time++, ts++, s1test2.getServerId());
+ CSN csn8 = new CSN(time, ts++, s1test2.getServerId());
publishDeleteMsgInOTest2(s1test2, csn8, tn, 8);
- CSN csn9 = new CSN(time++, ts++, s2test.getServerId());
+ CSN csn9 = new CSN(time, ts++, s2test.getServerId());
publishDeleteMsgInOTest(s2test, csn9, tn, 9);
- sleep(500);
+ Thread.sleep(500);
ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
- rsd1.getLatestServerState();
- rsd1.getChangeTimeHeartbeatState();
debugInfo(tn, rsd1.getBaseDN()
+ " LatestServerState=" + rsd1.getLatestServerState()
+ " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
@@ -2222,8 +2212,6 @@
// FIXME:ECL Enable this test by adding an assert on the right value
ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
- rsd2.getLatestServerState();
- rsd2.getChangeTimeHeartbeatState();
debugInfo(tn, rsd2.getBaseDN()
+ " LatestServerState=" + rsd2.getLatestServerState()
+ " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
@@ -2248,28 +2236,18 @@
String tn = "ECLCompatEmpty";
debugInfo(tn, "Starting test\n\n");
- // search on 'cn=changelog'
- String filter = "(objectclass=*)";
- debugInfo(tn, " Search: " + filter);
- InternalSearchOperation op = connection.processSearch(
- "cn=changelog",
- SearchScope.WHOLE_SUBTREE,
- filter);
-
- // success
+ final InternalSearchOperation op = connection.processSearch(
+ "cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString());
-
- // root entry returned
- assertEquals(op.getEntriesSent(), 1);
+ assertEquals(op.getEntriesSent(), 1, "The root entry should have been returned");
debugInfo(tn, "Ending test successfully");
}
- private int ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
+ private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
{
String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
debugInfo(tn, "Starting test\n\n");
- final int nbChanges = 4;
-
+ try
{
LDIFWriter ldifWriter = getLDIFWriter();
@@ -2280,8 +2258,7 @@
String user1entryUUID = "11111111-1112-1113-1114-111111111115";
String baseUUID = "22222222-2222-2222-2222-222222222222";
- CSN[] csns = generateCSNs(nbChanges, SERVER_ID_1);
- gblCSN = csns[1];
+ CSN[] csns = generateCSNs(4, SERVER_ID_1);
// Publish DEL
DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
@@ -2296,8 +2273,8 @@
+ "entryUUID: "+user1entryUUID+"\n";
Entry entry = TestCaseUtils.entryFromLdifString(lentry);
AddMsg addMsg = new AddMsg(
- gblCSN,
- DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
+ csns[1],
+ entry.getDN(),
user1entryUUID,
baseUUID,
entry.getObjectClassAttribute(),
@@ -2324,14 +2301,14 @@
ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
server01.publish(modDNMsg);
debugInfo(tn, " publishes " + modDNMsg.getCSN());
- sleep(1000);
+ Thread.sleep(1000);
String filter = "(targetdn=*"+tn.toLowerCase()+"*,o=test)";
InternalSearchOperation searchOp = searchOnChangelog(filter, tn, SUCCESS);
// test 4 entries returned
assertEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
- ldifWriter, user1entryUUID, csns[0], gblCSN, csns[2], csns[3]);
+ ldifWriter, user1entryUUID, csns);
stop(server01);
@@ -2343,11 +2320,14 @@
searchOp = searchOnChangelog(filter, tn, SUCCESS);
assertEntries(searchOp.getSearchEntries(), firstChangeNumber, tn,
- ldifWriter, user1entryUUID, csns[0], gblCSN, csns[2], csns[3]);
- assertEquals(searchOp.getSearchEntries().size(), nbChanges);
+ ldifWriter, user1entryUUID, csns);
+ assertEquals(searchOp.getSearchEntries().size(), csns.length);
+ return csns[1];
}
- debugInfo(tn, "Ending test with success");
- return nbChanges;
+ finally
+ {
+ debugInfo(tn, "Ending test with success");
+ }
}
private void assertEntries(List<SearchResultEntry> entries,
@@ -2409,7 +2389,7 @@
assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
}
- private void ECLCompatReadFrom(long firstChangeNumber) throws Exception
+ private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
{
String tn = "ECLCompatReadFrom/" + firstChangeNumber;
debugInfo(tn, "Starting test\n\n");
@@ -2432,10 +2412,10 @@
// check the entry has the right content
SearchResultEntry resultEntry = entries.get(0);
assertTrue("changenumber=6,cn=changelog".equalsIgnoreCase(resultEntry.getDN().toNormalizedString()));
- checkValue(resultEntry, "replicationcsn", gblCSN.toString());
+ checkValue(resultEntry, "replicationcsn", csn.toString());
checkValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
checkValue(resultEntry, "changetype", "add");
- checkValue(resultEntry, "changelogcookie", "o=test:" + gblCSN + ";");
+ checkValue(resultEntry, "changelogcookie", "o=test:" + csn + ";");
checkValue(resultEntry, "targetentryuuid", user1entryUUID);
checkValue(resultEntry, "changenumber", "6");
@@ -2511,14 +2491,14 @@
/**
* Read the ECL in compat mode providing an unknown change number.
*/
- private void ECLFilterOnReplicationCsn() throws Exception
+ private void ECLFilterOnReplicationCSN(CSN csn) throws Exception
{
String tn = "ECLFilterOnReplicationCsn";
debugInfo(tn, "Starting test\n\n");
LDIFWriter ldifWriter = getLDIFWriter();
- String filter = "(replicationcsn=" + this.gblCSN + ")";
+ String filter = "(replicationcsn=" + csn + ")";
InternalSearchOperation searchOp = searchOnChangelog(filter, tn, SUCCESS);
assertEquals(searchOp.getSearchEntries().size(), 1);
@@ -2528,7 +2508,7 @@
// check the DEL entry has the right content
SearchResultEntry resultEntry = entries.get(0);
- checkValue(resultEntry, "replicationcsn", gblCSN.toString());
+ checkValue(resultEntry, "replicationcsn", csn.toString());
// TODO:ECL check values of the other attributes
debugInfo(tn, "Ending test with success");
@@ -2619,7 +2599,7 @@
while (!cnIndexDB.isEmpty())
{
debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
- sleep(200);
+ Thread.sleep(200);
}
debugInfo(tn, "Ending test with success");
@@ -2744,7 +2724,7 @@
csn1, user1entryUUID);
server01.publish(delMsg);
debugInfo(tn, " publishes " + delMsg.getCSN());
- sleep(500);
+ Thread.sleep(500);
stop(server01);
@@ -2779,7 +2759,7 @@
DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, user1entryUUID);
server01.publish(delMsg);
debugInfo(tn, " publishes " + delMsg.getCSN());
- sleep(300);
+ Thread.sleep(300);
// From begin to now : 1 change
assertEquals(rsdtest.getEligibleCount(fromStart, now()), 1);
@@ -2788,7 +2768,7 @@
delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csn2, user1entryUUID);
server01.publish(delMsg);
debugInfo(tn, " publishes " + delMsg.getCSN());
- sleep(300);
+ Thread.sleep(300);
// From begin to now : 2 changes
assertEquals(rsdtest.getEligibleCount(fromStart, now()), 2);
@@ -2815,7 +2795,7 @@
delMsg = newDeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, csn3, user1entryUUID);
server01.publish(delMsg);
debugInfo(tn, " publishes " + delMsg.getCSN());
- sleep(300);
+ Thread.sleep(300);
fromStateBeforeCSN2.update(csn2);
@@ -2838,7 +2818,7 @@
delMsg = newDeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, csnx, user1entryUUID);
server01.publish(delMsg);
}
- sleep(1000);
+ Thread.sleep(1000);
debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
@@ -2923,12 +2903,12 @@
domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers);
domain21 = startNewDomain(domainConf, eclInclude, eclInclude);
- sleep(1000);
+ Thread.sleep(1000);
addEntry(createEntry(TEST_ROOT_DN2));
addEntry(createEntry(baseDN3));
- String lentry =
+ Entry uentry1 = TestCaseUtils.entryFromLdifString(
"dn: cn=Fiona Jensen," + TEST_ROOT_DN_STRING2 + "\n"
+ "objectclass: top\n"
+ "objectclass: person\n"
@@ -2937,12 +2917,10 @@
+ "cn: Fiona Jensen\n"
+ "sn: Jensen\n"
+ "uid: fiona\n"
- + "telephonenumber: 12121212";
-
- Entry uentry1 = TestCaseUtils.entryFromLdifString(lentry);
+ + "telephonenumber: 12121212");
addEntry(uentry1); // add fiona in o=test2
- lentry =
+ Entry uentry2 = TestCaseUtils.entryFromLdifString(
"dn: cn=Robert Hue," + baseDN3 + "\n"
+ "objectclass: top\n"
+ "objectclass: person\n"
@@ -2951,30 +2929,30 @@
+ "cn: Robert Hue\n"
+ "sn: Robby\n"
+ "uid: robert\n"
- + "telephonenumber: 131313";
- Entry uentry2 = TestCaseUtils.entryFromLdifString(lentry);
+ + "telephonenumber: 131313");
addEntry(uentry2); // add robert in o=test3
// mod 'sn' of fiona (o=test2) with 'sn' configured as ecl-incl-att
- runModifyOperation(uentry1, createMods("sn", "newsn"));
+ final ModifyOperation modOp1 = connection.processModify(
+ uentry1.getDN(), createMods("sn", "newsn"));
+ waitOpResult(modOp1, ResultCode.SUCCESS);
// mod 'telephonenumber' of robert (o=test3)
- runModifyOperation(uentry2, createMods("telephonenumber", "555555"));
+ final ModifyOperation modOp2 = connection.processModify(
+ uentry2.getDN(), createMods("telephonenumber", "555555"));
+ waitOpResult(modOp2, ResultCode.SUCCESS);
// moddn robert (o=test3) to robert2 (o=test3)
- ModifyDNOperation modDNOp = new ModifyDNOperationBasis(connection,
- InternalClientConnection.nextOperationID(),
- InternalClientConnection.nextMessageID(),
- null,
+ ModifyDNOperation modDNOp = connection.processModifyDN(
DN.decode("cn=Robert Hue," + baseDN3),
RDN.decode("cn=Robert Hue2"), true,
baseDN3);
- modDNOp.run();
waitOpResult(modDNOp, ResultCode.SUCCESS);
// del robert (o=test3)
- runDeleteOperation("cn=Robert Hue2," + baseDN3);
- sleep(1000);
+ final DeleteOperation delOp = connection.processDelete(DN.decode("cn=Robert Hue2," + baseDN3));
+ waitOpResult(delOp, ResultCode.SUCCESS);
+ Thread.sleep(1000);
// Search on ECL from start on all suffixes
String cookie = "";
@@ -2984,7 +2962,7 @@
assertThat(entries).hasSize(8);
debugAndWriteEntries(null, entries, tn);
- sleep(2000);
+ Thread.sleep(2000);
for (SearchResultEntry resultEntry : entries)
{
@@ -3026,9 +3004,13 @@
}
finally
{
- runDeleteOperation("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
- runDeleteOperation(TEST_ROOT_DN_STRING2);
- runDeleteOperation(baseDN3.toString());
+ final DeleteOperation delOp1 = connection.processDelete(
+ DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
+ waitOpResult(delOp1, ResultCode.SUCCESS);
+ final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
+ waitOpResult(delOp2, ResultCode.SUCCESS);
+ final DeleteOperation delOp3 = connection.processDelete(baseDN3);
+ waitOpResult(delOp3, ResultCode.SUCCESS);
remove(domain21, domain2, domain3);
removeTestBackend(backend2, backend3);
@@ -3067,25 +3049,6 @@
return newDomain;
}
- private void runModifyOperation(Entry entry, List<Modification> mods)
- throws Exception
- {
- final ModifyOperation operation =
- new ModifyOperationBasis(connection, 1, 1, null, entry.getDN(), mods);
- operation.run();
- waitOpResult(operation, ResultCode.SUCCESS);
- }
-
- private void runDeleteOperation(String dn) throws Exception
- {
- final DeleteOperation delOp = new DeleteOperationBasis(connection,
- InternalClientConnection.nextOperationID(),
- InternalClientConnection.nextMessageID(), null,
- DN.decode(dn));
- delOp.run();
- waitOpResult(delOp, ResultCode.SUCCESS);
- }
-
private List<Modification> createMods(String attributeName, String valueString)
{
Attribute attr = Attributes.create(attributeName, valueString);
@@ -3113,7 +3076,7 @@
while (operation.getResultCode() == ResultCode.UNDEFINED
|| operation.getResultCode() != expectedResult)
{
- sleep(50);
+ Thread.sleep(50);
i++;
if (i > 10)
{
--
Gitblit v1.10.0