From 4af6c0362e528ebab371325232a557a5338a31d8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 18 Sep 2014 11:32:20 +0000
Subject: [PATCH] OPENDJ-1548 (CR-4579) In changelog backend, persistent searches should perform the same validations as normal searches
---
opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java | 47 +++++++--------
opends/src/server/org/opends/server/backends/ChangelogBackend.java | 71 +++++++++++++++--------
opends/src/server/org/opends/server/api/Backend.java | 4 +
3 files changed, 71 insertions(+), 51 deletions(-)
diff --git a/opends/src/server/org/opends/server/api/Backend.java b/opends/src/server/org/opends/server/api/Backend.java
index b35c1cd..700290a 100644
--- a/opends/src/server/org/opends/server/api/Backend.java
+++ b/opends/src/server/org/opends/server/api/Backend.java
@@ -891,8 +891,10 @@
*
* @param persistentSearch
* The persistent search operation to register with this backend
+ * @throws DirectoryException
+ * If a problem occurs while registering the persistent search
*/
- public void registerPersistentSearch(PersistentSearch persistentSearch)
+ public void registerPersistentSearch(PersistentSearch persistentSearch) throws DirectoryException
{
persistentSearches.add(persistentSearch);
diff --git a/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index e4165d0..5fbd645 100644
--- a/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -185,7 +185,6 @@
private static final String CHANGE_NUMBER_ATTR = "changeNumber";
private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
- private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
/** The set of objectclasses that will be used in root entry. */
@@ -985,18 +984,16 @@
{
validateProvidedCookie(searchParams);
- final MultiDomainServerState cookie = searchParams.cookie;
final CookieEntrySender entrySender;
if (isPersistentSearch(searchOperation))
{
- // communicate the cookie between the "initial search" phase and the "persistent search" phase
- searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
}
else
{
entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL);
}
+ entrySender.setCookie(searchParams.cookie);
if (!sendBaseChangelogEntry(searchOperation))
{ // only return the base entry: stop here
@@ -1008,14 +1005,14 @@
{
final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
- cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
+ searchParams.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
- boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
+ final boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
if (continueSearch)
{
entrySender.transitioningToPersistentSearchPhase();
- sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
+ sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
}
}
finally
@@ -1026,15 +1023,14 @@
}
private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
- ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
- DirectoryException
+ final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
{
boolean continueSearch = true;
while (continueSearch && replicaUpdatesCursor.next())
{
final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
final DN domainBaseDN = replicaUpdatesCursor.getData();
- continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
+ continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
}
return continueSearch;
}
@@ -1053,9 +1049,10 @@
/** {@inheritDoc} */
@Override
- public void registerPersistentSearch(PersistentSearch pSearch)
+ public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
{
- initializeAttachements(pSearch);
+ validatePersistentSearch(pSearch);
+ initializeEntrySender(pSearch);
if (isCookieBased(pSearch.getSearchOperation()))
{
@@ -1068,22 +1065,41 @@
super.registerPersistentSearch(pSearch);
}
- private void initializeAttachements(PersistentSearch pSearch)
+ private void validatePersistentSearch(final PersistentSearch pSearch) throws DirectoryException
{
+ // Validation must be done during registration for changes only persistent searches.
+ // Otherwise, when there is an initial search phase,
+ // validation is performed by the search() method.
+ if (pSearch.isChangesOnly())
+ {
+ final SearchOperation searchOperation = pSearch.getSearchOperation();
+ checkChangelogReadPrivilege(searchOperation);
+ final SearchParams params = buildSearchParameters(searchOperation);
+ // next line also validates some search parameters
+ optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
+ validateProvidedCookie(params);
+ }
+ }
+
+ private void initializeEntrySender(PersistentSearch pSearch)
+ {
+ final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
+
final SearchOperation searchOp = pSearch.getSearchOperation();
if (isCookieBased(searchOp))
{
+ final CookieEntrySender entrySender = new CookieEntrySender(searchOp, startPhase);
+ searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, entrySender);
if (pSearch.isChangesOnly())
{
// this changesOnly persistent search will not go through #initialSearch()
// so we must initialize the cookie here
- searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
+ entrySender.setCookie(getNewestCookie(searchOp));
}
- searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp, SearchPhase.PERSISTENT));
}
else
{
- searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, SearchPhase.PERSISTENT));
+ searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, startPhase));
}
}
@@ -1762,7 +1778,7 @@
sendEntryData.finalizeInitialSearch();
}
- public void transitioningToPersistentSearchPhase()
+ private void transitioningToPersistentSearchPhase()
{
sendEntryData.transitioningToPersistentSearchPhase();
}
@@ -1792,6 +1808,7 @@
private static class CookieEntrySender {
private final SearchOperation searchOp;
private final SearchPhase startPhase;
+ private MultiDomainServerState cookie;
private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
@@ -1801,7 +1818,12 @@
this.startPhase = startPhase;
}
- public void finalizeInitialSearch()
+ private void setCookie(MultiDomainServerState cookie)
+ {
+ this.cookie = cookie;
+ }
+
+ private void finalizeInitialSearch()
{
for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
{
@@ -1809,7 +1831,7 @@
}
}
- public void transitioningToPersistentSearchPhase()
+ private void transitioningToPersistentSearchPhase()
{
for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
{
@@ -1830,13 +1852,12 @@
return data;
}
- private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
- final MultiDomainServerState cookie) throws DirectoryException
+ private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
{
final CSN csn = updateMsg.getCSN();
final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
sendEntryData.initialSearchSendsEntry(csn);
- final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
+ final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
return sendEntryIfMatches(searchOp, entry, cookieString);
}
@@ -1849,9 +1870,7 @@
if (sendEntryData.persistentSearchCanSendEntry(csn))
{
// multi threaded case: wait for the "initial search" phase to set the cookie
- final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
-
- final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
+ final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
// FIXME JNR use this instead of previous line:
// entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
@@ -1859,7 +1878,7 @@
}
}
- private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
+ private String updateCookie(DN baseDN, final CSN csn)
{
synchronized (cookie)
{ // forbid concurrent updates to the cookie
diff --git a/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java b/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
index 8d57883..c6f79b7 100644
--- a/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
+++ b/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
@@ -225,34 +225,33 @@
// will be overwritten.
setResultCode(ResultCode.SUCCESS);
-
- // If there's a persistent search, then register it with the server.
- boolean processSearchNow = true;
- if (persistentSearch != null)
- {
- // If we're only interested in changes, then we do not actually want
- // to process the search now.
- processSearchNow = !persistentSearch.isChangesOnly();
-
- // The Core server maintains the count of concurrent persistent searches
- // so that all the backends (Remote and Local) are aware of it. Verify
- // with the core if we have already reached the threshold.
- if (!DirectoryServer.allowNewPersistentSearch())
- {
- setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
- appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
- return;
- }
- backend.registerPersistentSearch(persistentSearch);
- persistentSearch.enable();
- }
-
-
- // Process the search in the backend and all its subordinates.
try
{
+ // If there's a persistent search, then register it with the server.
+ boolean processSearchNow = true;
+ if (persistentSearch != null)
+ {
+ // If we're only interested in changes, then we do not actually want
+ // to process the search now.
+ processSearchNow = !persistentSearch.isChangesOnly();
+
+ // The Core server maintains the count of concurrent persistent searches
+ // so that all the backends (Remote and Local) are aware of it. Verify
+ // with the core if we have already reached the threshold.
+ if (!DirectoryServer.allowNewPersistentSearch())
+ {
+ setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
+ appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
+ return;
+ }
+ backend.registerPersistentSearch(persistentSearch);
+ persistentSearch.enable();
+ }
+
+
if (processSearchNow)
{
+ // Process the search in the backend and all its subordinates.
backend.search(this);
}
}
--
Gitblit v1.10.0