From 7b670ce888f308da0db49a828c56c390eb9860db Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 21 Jul 2014 12:53:41 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 39 +--
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 25 +
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 11
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 43 +--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 16
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 418 ++++++++++++++---------------------------
9 files changed, 217 insertions(+), 368 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index a194d6d..65a95fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -237,10 +238,13 @@
return previousValue;
}
- // we just created a new domain => update all cursors
- for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ if (MultimasterReplication.isECLEnabledDomain(baseDN))
{
- cursor.addDomain(baseDN, null);
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
}
return newValue;
}
@@ -417,10 +421,6 @@
*/
public void clearDB() throws ChangelogException
{
- if (debugEnabled())
- {
- TRACER.debugInfo("clear the FileChangelogDB");
- }
if (!dbDirectory.exists())
{
return;
@@ -868,7 +868,7 @@
}
}
- for (final Map<Integer, FileReplicaDB> domainMap: domainToReplicaDBs.values())
+ for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
{
for (final FileReplicaDB replicaDB : domainMap.values())
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
index a8579e4..c17755f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -36,7 +36,6 @@
* <p>
* The cursor provides a java.sql.ResultSet like API :
* <pre>
- * {@code
* FileReplicaDBCursor cursor = ...;
* try {
* while (cursor.next()) {
@@ -116,11 +115,8 @@
lastNonNullCurrentCSN = nextRecord.getKey();
return true;
}
- else
- {
- nextRecord = null;
- return false;
- }
+ nextRecord = null;
+ return false;
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 68b9688..8a9390d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -425,13 +425,26 @@
{
try
{
- while (!domainsToClear.isEmpty())
+ if (!domainsToClear.isEmpty())
{
- final DN baseDNToClear = domainsToClear.first();
- nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
- // Only release the waiting thread
- // once this domain's state has been cleared.
- domainsToClear.remove(baseDNToClear);
+ final DN cursorData = nextChangeForInsertDBCursor.getData();
+ final boolean callNextOnCursor =
+ cursorData != null && domainsToClear.contains(cursorData);
+ while (!domainsToClear.isEmpty())
+ {
+ final DN baseDNToClear = domainsToClear.first();
+ nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+ // Only release the waiting thread
+ // once this domain's state has been cleared.
+ domainsToClear.remove(baseDNToClear);
+ }
+
+ if (callNextOnCursor)
+ {
+ // The next change to consume comes from a domain to be removed.
+ // Call DBCursor.next() to ensure this domain is removed
+ nextChangeForInsertDBCursor.next();
+ }
}
// Do not call DBCursor.next() here
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 38afd7d..ae54930 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -129,21 +129,31 @@
private void removeNoLongerNeededCursors()
{
- for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
- cursors.entrySet().iterator(); iterator.hasNext();)
+ for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
{
- final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
- final Data data = entry.getValue();
- if (isCursorNoLongerNeededFor(data))
+ final Data dataToFind = iter.next();
+ for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
+ cursors.entrySet().iterator(); cursorIter.hasNext();)
{
- entry.getKey().close();
- iterator.remove();
- cursorRemoved(data);
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
+ if (dataToFind.equals(entry.getValue()))
+ {
+ entry.getKey().close();
+ cursorIter.remove();
+ }
}
+ iter.remove();
}
}
/**
+ * Returns an Iterator over the data associated to cursors that must be removed.
+ *
+ * @return an Iterator over the data associated to cursors that must be removed.
+ */
+ protected abstract Iterator<Data> removedCursorsIterator();
+
+ /**
* Adds a cursor to this composite cursor. It first calls
* {@link DBCursor#next()} to verify whether it is exhausted or not.
*
@@ -191,23 +201,6 @@
protected abstract void incorporateNewCursors() throws ChangelogException;
/**
- * Returns whether the cursor associated to the provided data should be removed.
- *
- * @param data the data associated to the cursor to be tested
- * @return true if the cursor associated to the provided data should be removed,
- * false otherwise
- */
- protected abstract boolean isCursorNoLongerNeededFor(Data data);
-
- /**
- * Notifies that the cursor associated to the provided data has been removed.
- *
- * @param data
- * the data associated to the removed cursor
- */
- protected abstract void cursorRemoved(Data data);
-
- /**
* Returns the data associated to the cursor that returned the current record.
*
* @return the data associated to the cursor that returned the current record.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index 28b56a3..c249a78 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,6 +24,7 @@
*/
package org.opends.server.replication.server.changelog.je;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -110,16 +111,10 @@
/** {@inheritDoc} */
@Override
- protected boolean isCursorNoLongerNeededFor(Void data)
+ @SuppressWarnings("unchecked")
+ protected Iterator<Void> removedCursorsIterator()
{
- return false; // Not needed
- }
-
- /** {@inheritDoc} */
- @Override
- protected void cursorRemoved(Void data)
- {
- // Not used so far
+ return Collections.EMPTY_LIST.iterator(); // nothing to remove
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 4ebee02..ce8b030 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -186,7 +187,7 @@
}
}
- private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
+ private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
{
final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
if (domainMap != null)
@@ -196,29 +197,12 @@
return Collections.emptyMap();
}
- private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
+ private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
{
return getDomainMap(baseDN).get(serverId);
}
/**
- * Provision resources for the specified serverId in the specified replication
- * domain.
- *
- * @param baseDN
- * the replication domain where to add the serverId
- * @param serverId
- * the server Id to add to the replication domain
- * @throws ChangelogException
- * If a database error happened.
- */
- private void commission(DN baseDN, int serverId, ReplicationServer rs)
- throws ChangelogException
- {
- getOrCreateReplicaDB(baseDN, serverId, rs);
- }
-
- /**
* Returns a {@link JEReplicaDB}, possibly creating it.
*
* @param baseDN
@@ -279,10 +263,13 @@
return previousValue;
}
- // we just created a new domain => update all cursors
- for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ if (MultimasterReplication.isECLEnabledDomain(baseDN))
{
- cursor.addDomain(baseDN, null);
+ // we just created a new domain => update all cursors
+ for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+ {
+ cursor.addDomain(baseDN, null);
+ }
}
return newValue;
}
@@ -360,7 +347,7 @@
{
for (int serverId : entry.getValue())
{
- commission(entry.getKey(), serverId, replicationServer);
+ getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
}
}
}
@@ -695,7 +682,7 @@
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
+ cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
}
catch (Exception e)
{
@@ -774,10 +761,10 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
throws ChangelogException
{
- JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+ final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
final DBCursor<UpdateMsg> cursor =
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index 773ecb3..cf88679 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -107,16 +107,9 @@
/** {@inheritDoc} */
@Override
- protected boolean isCursorNoLongerNeededFor(DN baseDN)
+ protected Iterator<DN> removedCursorsIterator()
{
- return removeDomains.contains(baseDN);
- }
-
- /** {@inheritDoc} */
- @Override
- protected void cursorRemoved(DN baseDN)
- {
- removeDomains.remove(baseDN);
+ return removeDomains.iterator();
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index cdaae64..63d9c92 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -89,6 +89,16 @@
public class ExternalChangeLogTest extends ReplicationTestCase
{
+ private static class Results
+ {
+
+ public final List<SearchResultEntryProtocolOp> searchResultEntries =
+ new ArrayList<SearchResultEntryProtocolOp>();
+ public long searchReferences;
+ public long searchesDone;
+
+ }
+
private static final int SERVER_ID_1 = 1201;
private static final int SERVER_ID_2 = 1202;
@@ -182,14 +192,15 @@
@Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
public void TestWithAndWithoutControl() throws Exception
{
+ final String tn = "TestWithAndWithoutControl";
replicationServer.getChangelogDB().setPurgeDelay(0);
// Write changes and read ECL from start
- ECLCompatWriteReadAllOps(1);
+ ECLCompatWriteReadAllOps(1, tn);
ECLCompatNoControl(1);
// Write additional changes and read ECL from a provided change number
- ECLCompatWriteReadAllOps(5);
+ ECLCompatWriteReadAllOps(5, tn);
}
@Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
@@ -287,12 +298,13 @@
@Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
public void ECLReplicationServerFullTest15() throws Exception
{
+ final String tn = "ECLReplicationServerFullTest15";
replicationServer.getChangelogDB().setPurgeDelay(0);
// Write 4 changes and read ECL from start
- ECLCompatWriteReadAllOps(1);
+ ECLCompatWriteReadAllOps(1, tn);
// Write 4 additional changes and read ECL from a provided change number
- CSN csn = ECLCompatWriteReadAllOps(5);
+ CSN csn = ECLCompatWriteReadAllOps(5, tn);
// Test request from a provided change number - read 6
ECLCompatReadFrom(6, csn);
@@ -889,15 +901,12 @@
final CSN[] csns = generateCSNs(3, SERVER_ID_1);
publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
- Thread.sleep(1000);
-
- // Test that last cookie has been updated
- String cookieNotEmpty = readLastCookie();
- debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+ final String firstCookie = assertLastCookieDifferentThanLastValue("");
+ String lastCookie = firstCookie;
publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
// ---
// 2. Now set up a very short purge delay on the replication changelogs
@@ -924,7 +933,7 @@
// returns the appropriate error.
debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+ searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
assertTrue(searchOp.getErrorMessage().toString().startsWith(
ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
searchOp.getErrorMessage().toString());
@@ -956,26 +965,21 @@
final CSN[] csns = generateCSNs(3, SERVER_ID_1);
publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
- Thread.sleep(1000);
-
- // Test that last cookie has been updated
- String cookieNotEmpty = readLastCookie();
- debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+ final String firstCookie = assertLastCookieDifferentThanLastValue("");
+ String lastCookie = firstCookie;
publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+ lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
// ---
// 2. Now remove the domain by sending a reset message
- ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
- server01.publish(msg);
+ server01.publish(new ResetGenerationIdMsg(23657));
// ---
// 3. Assert that a request with an empty cookie returns nothing
// since replication changelog has been cleared
String cookie= "";
- InternalSearchOperation searchOp = null;
searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
// ---
@@ -983,7 +987,7 @@
// since replication changelog has been cleared
cookie = readLastCookie();
debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
+ searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
// ---
// 5. Assert that a request with an "old" cookie - one that refers to
@@ -991,7 +995,8 @@
// returns the appropriate error.
debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
- searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+ final InternalSearchOperation searchOp =
+ searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
}
finally
@@ -1001,6 +1006,23 @@
debugInfo(testName, "Ending test successfully");
}
+ private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
+ {
+ int cnt = 0;
+ while (cnt < 100)
+ {
+ final String newCookie = readLastCookie();
+ if (!newCookie.equals(lastCookie))
+ {
+ return newCookie;
+ }
+ cnt++;
+ Thread.sleep(10);
+ }
+ Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
+ return null;// dead code
+ }
+
private void debugAndWriteEntries(LDIFWriter ldifWriter,
List<SearchResultEntry> entries, String tn) throws Exception
{
@@ -1068,10 +1090,11 @@
// Publish ADD
csnCounter++;
- String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
- + "objectClass: top\n" + "objectClass: domain\n"
- + "entryUUID: "+user1entryUUID+"\n";
- Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+ Entry entry = TestCaseUtils.entryFromLdifString(
+ "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
+ + "objectClass: top\n"
+ + "objectClass: domain\n"
+ + "entryUUID: " + user1entryUUID + "\n");
AddMsg addMsg = new AddMsg(
csns[csnCounter],
DN.decode("uid="+tn+"2," + TEST_ROOT_DN_STRING),
@@ -1407,49 +1430,27 @@
InvocationCounterPlugin.resetAllCounters();
- long searchEntries;
- long searchReferences = ldapStatistics.getSearchResultReferences();
- long searchesDone = ldapStatistics.getSearchResultsDone();
+ final Results results = new Results();
+ results.searchReferences = ldapStatistics.getSearchResultReferences();
+ results.searchesDone = ldapStatistics.getSearchResultsDone();
debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
- LDAPMessage message = new LDAPMessage(2, searchRequest, controls);
- w.writeMessage(message);
+ w.writeMessage(new LDAPMessage(2, searchRequest, controls));
Thread.sleep(500);
if (!changesOnly)
{
// Wait for change 1
debugInfo(tn, "Waiting for init search expected to return change 1");
- searchEntries = 0;
+ readMessages(tn, r, results, 1, "Init search Result=");
+ for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
{
- while (searchEntries < 1 && (message = r.readMessage()) != null)
- {
- debugInfo(tn, "Init search Result=" +
- message.getProtocolOpType() + message + " " + searchEntries);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- SearchResultEntryProtocolOp searchResultEntry =
- message.getSearchResultEntryProtocolOp();
- searchEntries++;
- // FIXME:ECL Double check 1 is really the valid value here.
- checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
- (compatMode?"1":"0"));
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- searchReferences++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
- }
+ // FIXME:ECL Double check 1 is really the valid value here.
+ final String cn = compatMode ? "1" : "0";
+ checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
}
debugInfo(tn, "INIT search done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
+ + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
}
// Produces change 2
@@ -1465,30 +1466,8 @@
" published , psearch will now wait for new entries");
// wait for the 1 new entry
- searchEntries = 0;
- SearchResultEntryProtocolOp searchResultEntry = null;
- while (searchEntries < 1 && (message = r.readMessage()) != null)
- {
- debugInfo(tn, "psearch search Result=" +
- message.getProtocolOpType() + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchResultEntry = message.getSearchResultEntryProtocolOp();
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- searchReferences++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r, results, 1, "psearch search Result=");
+ SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
Thread.sleep(1000);
// Check we received change 2
@@ -1518,11 +1497,12 @@
createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
debugInfo(tn, "ACI test : sending search");
- message = new LDAPMessage(2, searchRequest, createCookieControl(""));
- w.writeMessage(message);
+ w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
- searchesDone=0;
- searchEntries = 0;
+ LDAPMessage message;
+ int searchesDone = 0;
+ int searchEntries = 0;
+ int searchReferences = 0;
while ((searchesDone==0) && (message = r.readMessage()) != null)
{
debugInfo(tn, "ACI test : message returned " +
@@ -1714,125 +1694,53 @@
InvocationCounterPlugin.resetAllCounters();
- ldapStatistics.getSearchRequests();
- long searchEntries = ldapStatistics.getSearchResultEntries();
- ldapStatistics.getSearchResultReferences();
- long searchesDone = ldapStatistics.getSearchResultsDone();
+ final Results results = new Results();
+ results.searchesDone = ldapStatistics.getSearchResultsDone();
- LDAPMessage message;
- message = new LDAPMessage(2, searchRequest1, controls);
- w1.writeMessage(message);
+ w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
Thread.sleep(500);
-
- message = new LDAPMessage(2, searchRequest2, controls);
- w2.writeMessage(message);
+ w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
Thread.sleep(500);
-
- message = new LDAPMessage(2, searchRequest3, controls);
- w3.writeMessage(message);
+ w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
Thread.sleep(500);
if (!changesOnly)
{
debugInfo(tn, "Search1 Persistent filter=" + searchRequest1.getFilter()
+ " expected to return change " + csn1);
- searchEntries = 0;
- message = null;
-
{
- while (searchEntries < 1 && (message = r1.readMessage()) != null)
+ readMessages(tn, r1, results, 1, "Search1 Result=");
+ final int searchEntries = results.searchResultEntries.size();
+ if (searchEntries == 1)
{
- debugInfo(tn, "Search1 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- SearchResultEntryProtocolOp searchResultEntry =
- message.getSearchResultEntryProtocolOp();
- searchEntries++;
- if (searchEntries==1)
- {
- checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
- checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
- (compatMode?"10":"0"));
- }
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
+ final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
+ final String cn = compatMode ? "10" : "0";
+ checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
+ checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
}
+ debugInfo(tn, "Search1 done with success. searchEntries="
+ + searchEntries + " #searchesDone=" + results.searchesDone);
}
- debugInfo(tn, "Search1 done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
- searchEntries = 0;
- message = null;
{
debugInfo(tn, "Search 2 Persistent filter=" + searchRequest2.getFilter()
+ " expected to return change " + csn2 + " & " + csn3);
- while (searchEntries < 2 && (message = r2.readMessage()) != null)
+ readMessages(tn, r2, results, 2, "Search 2 Result=");
+ for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
{
- debugInfo(tn, "Search 2 Result=" +
- message.getProtocolOpType() + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- SearchResultEntryProtocolOp searchResultEntry =
- message.getSearchResultEntryProtocolOp();
- searchEntries++;
- checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
- (compatMode?"10":"0"));
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
+ final String cn = compatMode ? "10" : "0";
+ checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
}
+ debugInfo(tn, "Search2 done with success. searchEntries="
+ + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
}
- debugInfo(tn, "Search2 done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
- searchEntries = 0;
- message = null;
- {
- debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter()
- + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
- while (searchEntries < 4 && (message = r3.readMessage()) != null)
- {
- debugInfo(tn, "Search3 Result=" +
- message.getProtocolOpType() + " " + message);
-
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
- searchesDone++;
- break;
- }
- }
- }
+ debugInfo(tn, "Search3 Persistent filter=" + searchRequest3.getFilter()
+ + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
+ readMessages(tn, r3, results, 4, "Search3 Result=");
debugInfo(tn, "Search3 done with success. searchEntries="
- + searchEntries + " #searchesDone="+ searchesDone);
-
+ + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
}
// Produces additional change
@@ -1866,82 +1774,19 @@
debugInfo(tn, delMsg13.getCSN() + " published additionally ");
// wait 11
- searchEntries = 0;
- message = null;
- while (searchEntries < 1 && (message = r1.readMessage()) != null)
- {
- debugInfo(tn, "Search 11 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r1, results, 1, "Search 11 Result=");
Thread.sleep(1000);
debugInfo(tn, "Search 1 successfully receives additional changes");
// wait 12 & 13
- searchEntries = 0;
- message = null;
- while (searchEntries < 2 && (message = r2.readMessage()) != null)
- {
- debugInfo(tn, "psearch search 12 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r2, results, 2, "psearch search 12 Result=");
Thread.sleep(1000);
debugInfo(tn, "Search 2 successfully receives additional changes");
// wait 11 & 12 & 13
- searchEntries = 0;
- SearchResultEntryProtocolOp searchResultEntry = null;
- message = null;
- while (searchEntries < 3 && (message = r3.readMessage()) != null)
- {
- debugInfo(tn, "psearch search 13 Result=" +
- message.getProtocolOpType() + " " + message);
- switch (message.getProtocolOpType())
- {
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
- searchResultEntry = message.getSearchResultEntryProtocolOp();
- searchEntries++;
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
- break;
-
- case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
- assertSuccessful(message);
-// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
- searchesDone++;
- break;
- }
- }
+ readMessages(tn, r3, results, 3, "psearch search 13 Result=");
+ SearchResultEntryProtocolOp searchResultEntry =
+ results.searchResultEntries.get(results.searchResultEntries.size() - 1);
Thread.sleep(1000);
// Check we received change 13
@@ -1956,6 +1801,35 @@
debugInfo(tn, "Ends test successfully");
}
+ private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
+ final Results results, final int i, final String string) throws Exception
+ {
+ results.searchResultEntries.clear();
+
+ LDAPMessage message;
+ while (results.searchResultEntries.size() < i
+ && (message = r.readMessage()) != null)
+ {
+ debugInfo(tn, string + message.getProtocolOpType() + " " + message);
+
+ switch (message.getProtocolOpType())
+ {
+ case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
+ results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
+ break;
+
+ case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
+ results.searchReferences++;
+ break;
+
+ case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
+ assertSuccessful(message);
+ results.searchesDone++;
+ break;
+ }
+ }
+ }
+
private void assertSuccessful(LDAPMessage message)
{
SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
@@ -2002,10 +1876,9 @@
new BindRequestProtocolOp(
ByteString.valueOf(bindDN),
3, ByteString.valueOf(password));
- LDAPMessage message = new LDAPMessage(1, bindRequest);
- w.writeMessage(message);
+ w.writeMessage(new LDAPMessage(1, bindRequest));
- message = r.readMessage();
+ final LDAPMessage message = r.readMessage();
BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
// assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
assertEquals(bindResponse.getResultCode(), expected);
@@ -2199,9 +2072,9 @@
debugInfo(tn, "Ending test successfully");
}
- private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
+ private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
{
- String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
+ String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
debugInfo(tn, "Starting test\n\n");
LDAPReplicationDomain domain = null;
try
@@ -2219,17 +2092,16 @@
CSN[] csns = generateCSNs(4, SERVER_ID_1);
// Publish DEL
- DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
+ DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
server01.publish(delMsg);
debugInfo(tn, " publishes " + delMsg.getCSN());
// Publish ADD
- String lentry =
- "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
+ Entry entry = TestCaseUtils.entryFromLdifString(
+ "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
+ "objectClass: top\n"
+ "objectClass: domain\n"
- + "entryUUID: "+user1entryUUID+"\n";
- Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+ + "entryUUID: " + user1entryUUID + "\n");
AddMsg addMsg = new AddMsg(
csns[1],
entry.getDN(),
@@ -2242,7 +2114,7 @@
debugInfo(tn, " publishes " + addMsg.getCSN());
// Publish MOD
- DN baseDN = DN.decode("uid="+tn+"3," + TEST_ROOT_DN_STRING);
+ DN baseDN = DN.decode("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
List<Modification> mods = createMods("description", "new value");
ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
server01.publish(modMsg);
@@ -2250,7 +2122,7 @@
// Publish modDN
ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
- DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
+ DN.decode("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
RDN.decode("uid="+tn+"new4"), // new rdn
true, // deleteoldrdn
TEST_ROOT_DN2); // new superior
@@ -2260,8 +2132,8 @@
server01.publish(modDNMsg);
debugInfo(tn, " publishes " + modDNMsg.getCSN());
- String filter = "(targetdn=*" + tn + "*,o=test)";
- InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
+ InternalSearchOperation searchOp =
+ searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
// test 4 entries returned
final LDIFWriter ldifWriter = getLDIFWriter();
@@ -2271,7 +2143,7 @@
stop(server01);
// Test with filter on change number
- filter =
+ String filter =
"(&(targetdn=*" + tn + "*,o=test)"
+ "(&(changenumber>=" + firstChangeNumber + ")"
+ "(changenumber<=" + (firstChangeNumber + 3) + ")))";
@@ -2334,7 +2206,7 @@
long firstChangeNumber, int i, String tn, CSN csn)
{
final long changeNumber = firstChangeNumber + i;
- final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING;
+ final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
assertDNEquals(resultEntry, changeNumber);
checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
@@ -2347,9 +2219,11 @@
private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
{
- String actualDN = resultEntry.getDN().toNormalizedString();
- String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
- assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
+ final String actualDN = resultEntry.getDN().toNormalizedString();
+ final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
+ assertThat(actualDN)
+ .as("Unexpected DN for entry " + resultEntry)
+ .isEqualToIgnoringCase(expectedDN);
}
private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
@@ -2543,7 +2417,7 @@
while (!cnIndexDB.isEmpty())
{
debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
- Thread.sleep(200);
+ Thread.sleep(10);
}
debugInfo(tn, "Ending test with success");
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 7c76a7a..da8bcd0 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,6 +25,9 @@
*/
package org.opends.server.replication.server.changelog.je;
+import java.util.Collections;
+import java.util.Iterator;
+
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -50,14 +53,9 @@
}
@Override
- protected boolean isCursorNoLongerNeededFor(String data)
+ protected Iterator<String> removedCursorsIterator()
{
- return false;
- }
-
- @Override
- protected void cursorRemoved(String data)
- {
+ return Collections.EMPTY_LIST.iterator();
}
}
--
Gitblit v1.10.0