From 5d4dbee77b59c2636e832de6c6a04a8ce65ca5c4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 15 Sep 2014 09:35:18 +0000
Subject: [PATCH] OPENDJ-1541 (CR-4516) Persistent search on cn=changelog can return duplicates
---
opends/src/server/org/opends/server/backends/ChangelogBackend.java | 536 +++++++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 407 insertions(+), 129 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index be5fe9b..8445800 100644
--- a/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -26,8 +26,6 @@
package org.opends.server.backends;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -38,6 +36,9 @@
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -110,6 +111,8 @@
import org.opends.server.types.WritabilityMode;
import org.opends.server.util.StaticUtils;
+import com.forgerock.opendj.util.Pair;
+
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
@@ -132,11 +135,11 @@
* request. The cookie provided in the control is used to retrieve entries from
* the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
* the entries.</li>
- * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided
- * with the request. The entries are retrieved using the ChangeNumberIndexDB (or
- * DraftDB, hence the name) and their attributes are set with the information
- * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
- * from the content of ChangeNumberIndexDB.</li>
+ * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
+ * with the request. The entries are retrieved using the ChangeNumberIndexDB and
+ * their attributes are set with the information from the ReplicasDBs. The
+ * <code>changeNumber</code> attribute value is set from the content of
+ * ChangeNumberIndexDB.</li>
* </ul>
* <h3>Searches flow</h3>
* <p>
@@ -153,9 +156,8 @@
* (once, single threaded),</li>
* <li>
* {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
- * <li>
- * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
- * (multiple times, multi threaded)</li>
+ * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
+ * threaded)</li>
* </ol>
* </li>
* <li>Persistent searches with <code>changesOnly=true</code> go through:
@@ -163,8 +165,8 @@
* <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
* (once, single threaded)</li>
* <li>
- * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
- * (multiple times, multi threaded)</li>
+ * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
+ * threaded)</li>
* </ol>
* </li>
* </ul>
@@ -182,6 +184,8 @@
private static final String CHANGE_NUMBER_ATTR = "changeNumber";
private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
+ private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
+ private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
/** The set of objectclasses that will be used in root entry. */
private static final Map<ObjectClass, String>
@@ -237,6 +241,16 @@
private final ReplicationServer replicationServer;
private final ECLEnabledDomainPredicate domainPredicate;
+ /** The set of cookie-based persistent searches registered with this backend. */
+ private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches =
+ new ConcurrentLinkedQueue<PersistentSearch>();
+ /**
+ * The set of change number-based persistent searches registered with this
+ * backend.
+ */
+ private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
+ new ConcurrentLinkedQueue<PersistentSearch>();
+
/**
* Creates a new backend with the provided replication server.
*
@@ -406,10 +420,49 @@
}
/**
- * Notifies persistent searches of this backend that a new entry was added to it.
+ * Notifies persistent searches of this backend that a new cookie entry was added to it.
* <p>
* Note: This method correspond to the "persistent search" phase.
* It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
+ * <p>
+ * This method must only be called after the provided data have been persisted to disk.
+ *
+ * @param baseDN
+ * the baseDN of the newly added entry.
+ * @param updateMsg
+ * the update message of the newly added entry
+ * @throws ChangelogException
+ * If a problem occurs while notifying of the newly added entry.
+ */
+ public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
+ {
+ if (!(updateMsg instanceof LDAPUpdateMsg))
+ {
+ return;
+ }
+
+ try
+ {
+ for (PersistentSearch pSearch : cookieBasedPersistentSearches)
+ {
+ final SearchOperation searchOp = pSearch.getSearchOperation();
+ final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
+ entrySender.persistentSearchSendEntry(baseDN, updateMsg);
+ }
+ }
+ catch (DirectoryException e)
+ {
+ throw new ChangelogException(e.getMessageObject(), e);
+ }
+ }
+
+ /**
+ * Notifies persistent searches of this backend that a new change number entry was added to it.
+ * <p>
+ * Note: This method correspond to the "persistent search" phase.
+ * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
+ * <p>
+ * This method must only be called after the provided data have been persisted to disk.
*
* @param baseDN
* the baseDN of the newly added entry.
@@ -425,44 +478,23 @@
* @throws ChangelogException
* If a problem occurs while notifying of the newly added entry.
*/
- public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
+ public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
throws ChangelogException
{
- final boolean isCookieEntry = changeNumber <= 0;
- final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry);
- if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg))
+ if (!(updateMsg instanceof LDAPUpdateMsg))
{
return;
}
try
{
- final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
- for (SearchOperation pSearchOp : pSearchOps)
+ // changeNumber entry can be shared with multiple persistent searches
+ final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
+ for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
{
- final MultiDomainServerState cookie = (MultiDomainServerState)
- pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
-
- // when returning changesOnly, the first incoming update must return
- // the base entry before any other changes,
- // so force sending now, when protected by the synchronized block
- if (isCookieEntry)
- { // cookie based search
- final String cookieStr;
- synchronized (cookie)
- { // forbid concurrent updates to the cookie
- cookie.update(baseDN, updateMsg.getCSN());
- cookieStr = cookie.toString();
- }
- final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
- // FIXME JNR use this instead of previous line:
- // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
- sendEntryIfMatches(pSearchOp, entry2, cookieStr);
- }
- else
- { // draft changeNumber search
- sendEntryIfMatches(pSearchOp, entry, null);
- }
+ final SearchOperation searchOp = pSearch.getSearchOperation();
+ final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
+ entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
}
}
catch (DirectoryException e)
@@ -471,20 +503,6 @@
}
}
- private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch)
- {
- final List<SearchOperation> results = new ArrayList<SearchOperation>();
- for (PersistentSearch pSearch : getPersistentSearches())
- {
- final SearchOperation op = pSearch.getSearchOperation();
- if (wantCookieBasedSearch == isCookieBased(op))
- {
- results.add(op);
- }
- }
- return results;
- }
-
private boolean isCookieBased(final SearchOperation searchOp)
{
for (Control c : searchOp.getRequestControls())
@@ -966,17 +984,22 @@
{
validateProvidedCookie(searchParams);
- // send the base changelog entry immediately even for changesOnly=false persistent searches
- if (!sendBaseChangelogEntry(searchOperation))
- {
- // only return the base entry: stop here
- return;
- }
-
+ final MultiDomainServerState cookie = searchParams.cookie;
+ final CookieEntrySender entrySender;
if (isPersistentSearch(searchOperation))
{
// communicate the cookie between the "initial search" phase and the "persistent search" phase
- searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, searchParams.cookie);
+ searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
+ entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
+ }
+ else
+ {
+ entrySender = new CookieEntrySender(searchOperation);
+ }
+
+ if (!sendBaseChangelogEntry(searchOperation))
+ { // only return the base entry: stop here
+ return;
}
ECLMultiDomainDBCursor replicaUpdatesCursor = null;
@@ -984,28 +1007,37 @@
{
final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
- searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
+ cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
- boolean continueSearch = true;
- while (continueSearch && replicaUpdatesCursor.next())
+ boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
+ if (continueSearch)
{
- // Handle the update message
- final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
- final DN domainBaseDN = replicaUpdatesCursor.getData();
- searchParams.cookie.update(domainBaseDN, updateMsg.getCSN());
- final String cookieString = searchParams.cookie.toString();
-
- final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
- continueSearch = sendEntryIfMatches(searchOperation, entry, cookieString);
+ entrySender.transitioningToPersistentSearchPhase();
+ sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
}
}
finally
{
+ entrySender.finalizeInitialSearch();
StaticUtils.close(replicaUpdatesCursor);
}
}
+ private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
+ ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
+ DirectoryException
+ {
+ boolean continueSearch = true;
+ while (continueSearch && replicaUpdatesCursor.next())
+ {
+ final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
+ final DN domainBaseDN = replicaUpdatesCursor.getData();
+ continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
+ }
+ return continueSearch;
+ }
+
private boolean isPersistentSearch(SearchOperation op)
{
for (PersistentSearch pSearch : getPersistentSearches())
@@ -1022,17 +1054,38 @@
@Override
public void registerPersistentSearch(PersistentSearch pSearch)
{
- final SearchOperation searchOp = pSearch.getSearchOperation();
- if (pSearch.isChangesOnly())
- {
- // this changesOnly persistent search will not go through #search0() down below
- // so we must initialize the entrySender here and never return the base entry
- searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, getNewestCookie(searchOp));
- }
+ initializeAttachements(pSearch);
+ if (isCookieBased(pSearch.getSearchOperation()))
+ {
+ cookieBasedPersistentSearches.add(pSearch);
+ }
+ else
+ {
+ changeNumberBasedPersistentSearches.add(pSearch);
+ }
super.registerPersistentSearch(pSearch);
}
+ private void initializeAttachements(PersistentSearch pSearch)
+ {
+ final SearchOperation searchOp = pSearch.getSearchOperation();
+ if (isCookieBased(searchOp))
+ {
+ if (pSearch.isChangesOnly())
+ {
+ // this changesOnly persistent search will not go through #initialSearch()
+ // so we must initialize the cookie here
+ searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
+ }
+ searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp));
+ }
+ else
+ {
+ searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp));
+ }
+ }
+
private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
{
if (!isCookieBased(searchOp))
@@ -1060,10 +1113,10 @@
*/
private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
{
- final MultiDomainServerState state = searchParams.cookie;
- if (state != null && !state.isEmpty())
+ final MultiDomainServerState cookie = searchParams.cookie;
+ if (cookie != null && !cookie.isEmpty())
{
- replicationServer.validateCookie(state, searchParams.getExcludedBaseDNs());
+ replicationServer.validateCookie(cookie, searchParams.getExcludedBaseDNs());
}
}
@@ -1076,51 +1129,61 @@
// "initial search" phase must return the base entry immediately
sendBaseChangelogEntry(searchOperation);
+ final ChangeNumberEntrySender entrySender;
+ if (isPersistentSearch(searchOperation))
+ {
+ entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
+ }
+ else
+ {
+ entrySender = new ChangeNumberEntrySender(searchOperation);
+ }
+
DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
- MultiDomainDBCursor replicaUpdatesCursor = null;
+ final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<MultiDomainDBCursor>();
try
{
cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
- boolean continueSearch = true;
- while (continueSearch && cnIndexDBCursor.next())
+ final boolean continueSearch =
+ sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
+ if (continueSearch)
{
- // Handle the current cnIndex record
- final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
- if (replicaUpdatesCursor == null)
- {
- replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord);
- }
- continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
- if (continueSearch)
- {
- UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
- if (updateMsg != null)
- {
- continueSearch = sendEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
- replicaUpdatesCursor.next();
- }
- }
+ entrySender.transitioningToPersistentSearchPhase();
+ sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
}
}
finally
{
- StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
+ entrySender.finalizeInitialSearch();
+ StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get());
}
}
- /**
- * @return {@code true} if search should continue, {@code false} otherwise
- */
- private boolean sendEntryForUpdateMessage(SearchOperation searchOperation,
- ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
+ private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
+ final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor,
+ AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException
{
- final DN baseDN = cnIndexRecord.getBaseDN();
- final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
- cookie.update(baseDN, cnIndexRecord.getCSN());
- final String cookieString = cookie.toString();
-
- final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
- return sendEntryIfMatches(searchOperation, entry, null);
+ boolean continueSearch = true;
+ while (continueSearch && cnIndexDBCursor.next())
+ {
+ // Handle the current cnIndex record
+ final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
+ if (replicaUpdatesCursor.get() == null)
+ {
+ replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
+ }
+ continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
+ if (continueSearch)
+ {
+ final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
+ if (updateMsg != null)
+ {
+ continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg);
+ replicaUpdatesCursor.get().next();
+ }
+ }
+ }
+ return continueSearch;
}
private MultiDomainDBCursor initializeReplicaUpdatesCursor(
@@ -1196,8 +1259,8 @@
/**
* Creates a changelog entry.
*/
- private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
- throws DirectoryException
+ private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
+ final UpdateMsg msg) throws DirectoryException
{
if (msg instanceof AddMsg)
{
@@ -1224,7 +1287,7 @@
* change initiators name if available which is contained in the creatorsName
* attribute.
*/
- private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
+ private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
throws DirectoryException
{
final AddMsg addMsg = (AddMsg) msg;
@@ -1265,8 +1328,8 @@
* out change initiators name if available which is contained in the
* modifiersName attribute.
*/
- private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
- throws DirectoryException
+ private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
+ final UpdateMsg msg) throws DirectoryException
{
final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
String changeInitiatorsName = null;
@@ -1330,7 +1393,7 @@
* @param entryDN
* DN of original entry
*/
- private void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
+ private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
{
TRACER.debugCaught(DebugLogLevel.ERROR, exception);
logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
@@ -1359,7 +1422,7 @@
String dnString;
if (changeNumber > 0)
{
- // Draft compat mode
+ // change number mode
dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
}
else
@@ -1439,7 +1502,8 @@
*
* @return {@code true} if search should continue, {@code false} otherwise
*/
- private boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) throws DirectoryException
+ private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
+ throws DirectoryException
{
if (matchBaseAndScopeAndFilter(searchOp, entry))
{
@@ -1450,18 +1514,18 @@
}
/** Indicates if the provided entry matches the filter, base and scope. */
- private boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
+ private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
{
return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
&& searchOp.getFilter().matchesEntry(entry);
}
- private List<Control> getControls(String cookie)
+ private static List<Control> getControls(String cookie)
{
if (cookie != null)
{
- Control c = new EntryChangelogNotificationControl(true, cookie);
- return Arrays.asList(c);
+ final Control c = new EntryChangelogNotificationControl(true, cookie);
+ return Collections.singletonList(c);
}
return Collections.emptyList();
}
@@ -1557,4 +1621,218 @@
}
}
+ /**
+ * Describes the current search phase.
+ */
+ private enum SearchPhase
+ {
+ /**
+ * "Initial search" phase. The "initial search" phase is running
+ * concurrently. All update notifications are ignored.
+ */
+ INITIAL,
+ /**
+ * Transitioning from the "initial search" phase to the "persistent search"
+ * phase. "Initial search" phase has finished reading from the DB. It now
+ * verifies if any more updates have been persisted to the DB since stopping
+ * and send them. All update notifications are blocked.
+ */
+ TRANSITIONING,
+ /**
+ * "Persistent search" phase. "Initial search" phase has completed. All
+ * update notifications are published.
+ */
+ PERSISTENT;
+ }
+
+ /**
+ * Contains data to ensure that the same change is not sent twice to clients
+ * because of race conditions between the "initial search" phase and the
+ * "persistent search" phase.
+ */
+ private static class SendEntryData<K extends Comparable<K>>
+ {
+ private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<SearchPhase>(SearchPhase.INITIAL);
+ private final Object transitioningLock = new Object();
+ private volatile K lastKeySentByInitialSearch;
+
+ private void finalizeInitialSearch()
+ {
+ searchPhase.set(SearchPhase.PERSISTENT);
+ synchronized (transitioningLock)
+ { // initial search phase has completed, release all persistent searches
+ transitioningLock.notifyAll();
+ }
+ }
+
+ public void transitioningToPersistentSearchPhase()
+ {
+ searchPhase.set(SearchPhase.TRANSITIONING);
+ }
+
+ private void initialSearchSendsEntry(final K key)
+ {
+ lastKeySentByInitialSearch = key;
+ }
+
+ private boolean persistentSearchCanSendEntry(K key)
+ {
+ final SearchPhase stateValue = searchPhase.get();
+ switch (stateValue)
+ {
+ case INITIAL:
+ return false;
+ case TRANSITIONING:
+ synchronized (transitioningLock)
+ {
+ while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
+ {
+ // "initial search" phase is over, and is now verifying whether new
+ // changes have been published to the DB.
+ // Wait for this check to complete
+ try
+ {
+ transitioningLock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ // Shutdown must have been called. Stop sending entries.
+ return false;
+ }
+ }
+ }
+ return key.compareTo(lastKeySentByInitialSearch) > 0;
+ case PERSISTENT:
+ return true;
+ default:
+ throw new RuntimeException("Not implemented for " + stateValue);
+ }
+ }
+ }
+
+ /** Sends entries to clients for change number searches. */
+ private static class ChangeNumberEntrySender
+ {
+ private final SearchOperation searchOp;
+ private final SendEntryData<Long> sendEntryData = new SendEntryData<Long>();
+
+ private ChangeNumberEntrySender(SearchOperation searchOp)
+ {
+ this.searchOp = searchOp;
+ }
+
+ private void finalizeInitialSearch()
+ {
+ sendEntryData.finalizeInitialSearch();
+ }
+
+ public void transitioningToPersistentSearchPhase()
+ {
+ sendEntryData.transitioningToPersistentSearchPhase();
+ }
+
+ /**
+ * @return {@code true} if search should continue, {@code false} otherwise
+ */
+ private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg)
+ throws DirectoryException
+ {
+ final DN baseDN = cnIndexRecord.getBaseDN();
+ final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
+ cookie.update(baseDN, cnIndexRecord.getCSN());
+ final String cookieString = cookie.toString();
+
+ sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
+ final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
+ return sendEntryIfMatches(searchOp, entry, null);
+ }
+
+ private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
+ {
+ if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
+ {
+ sendEntryIfMatches(searchOp, entry, null);
+ }
+ }
+ }
+
+ /** Sends entries to clients for cookie-based searches. */
+ private static class CookieEntrySender {
+ private final SearchOperation searchOp;
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
+
+ private CookieEntrySender(SearchOperation searchOp)
+ {
+ this.searchOp = searchOp;
+ }
+
+ public void finalizeInitialSearch()
+ {
+ for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
+ {
+ sendEntryData.finalizeInitialSearch();
+ }
+ }
+
+ public void transitioningToPersistentSearchPhase()
+ {
+ for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
+ {
+ sendEntryData.transitioningToPersistentSearchPhase();
+ }
+ }
+
+ private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
+ {
+ final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId());
+ SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
+ if (data == null)
+ {
+ final SendEntryData<CSN> newData = new SendEntryData<CSN>();
+ data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
+ return data == null ? newData : data;
+ }
+ return data;
+ }
+
+ private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
+ final MultiDomainServerState cookie) throws DirectoryException
+ {
+ final CSN csn = updateMsg.getCSN();
+ final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
+ sendEntryData.initialSearchSendsEntry(csn);
+ final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
+ final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
+ return sendEntryIfMatches(searchOp, entry, cookieString);
+ }
+
+ private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
+ throws DirectoryException
+ {
+ final CSN csn = updateMsg.getCSN();
+ final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
+ if (sendEntryData.persistentSearchCanSendEntry(csn))
+ {
+ // multi threaded case: wait for the "initial search" phase to set the cookie
+ final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
+
+ final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
+ final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
+ // FIXME JNR use this instead of previous line:
+ // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
+ sendEntryIfMatches(searchOp, cookieEntry, cookieString);
+ }
+ }
+
+ private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
+ {
+ synchronized (cookie)
+ { // forbid concurrent updates to the cookie
+ cookie.update(baseDN, csn);
+ return cookie.toString();
+ }
+ }
+ }
}
--
Gitblit v1.10.0