From 02bbeacbfb05101989dac510cbef7815fdf28a2e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 01 Sep 2014 12:51:46 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4393) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog
---
opends/src/server/org/opends/server/backends/ChangelogBackend.java | 569 +++++++++++++++++++++++++++++++++++++++-----------------
1 files changed, 394 insertions(+), 175 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index 0f0da2a..6808a0a 100644
--- a/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -29,8 +29,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.LDIFWriter.*;
@@ -39,6 +38,7 @@
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -53,6 +53,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
@@ -62,6 +63,7 @@
import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
@@ -72,10 +74,11 @@
import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.*;
+import org.opends.server.util.ServerConstants;
import org.opends.server.util.StaticUtils;
/**
- * A backend that provides access to the changelog, ie the "cn=changelog"
+ * A backend that provides access to the changelog, i.e. the "cn=changelog"
* suffix. It is a read-only backend that is created by a
* {@code ReplicationServer} and is not configurable.
* <p>
@@ -85,8 +88,8 @@
* request. The cookie provided in the control is used to retrieve entries from
* the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
* the entries.</li>
- * <li>Draft compat mode: when no "ECL Cookie Exchange Control" is provided with
- * the request. The entries are retrieved using the ChangeNumberIndexDB (or
+ * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided
+ * with the request. The entries are retrieved using the ChangeNumberIndexDB (or
* DraftDB, hence the name) and their attributes are set with the information
* from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
* from the content of ChangeNumberIndexDB.</li>
@@ -134,8 +137,20 @@
private static final AttributeType MODIFIERS_NAME_TYPE =
DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
- /** The DN for the base changelog entry. */
- private DN baseChangelogDN;
+ /** The base DN for the external change log. */
+ public static final DN CHANGELOG_BASE_DN;
+
+ static
+ {
+ try
+ {
+ CHANGELOG_BASE_DN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
+ }
+ catch (DirectoryException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
/** The set of base DNs for this backend. */
private DN[] baseDNs;
@@ -149,7 +164,7 @@
private final ECLEnabledDomainPredicate domainPredicate;
/**
- * Creates a new backend with the provided repication server.
+ * Creates a new backend with the provided replication server.
*
* @param replicationServer
* The replication server on which the changes are read.
@@ -165,6 +180,23 @@
setPrivateBackend(true);
}
+ private ChangelogDB getChangelogDB()
+ {
+ return replicationServer.getChangelogDB();
+ }
+
+ /**
+ * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
+ *
+ * @return the ChangelogBackend configured for "cn=changelog" in this directory server
+ * @deprecated instead inject the required object where needed
+ */
+ @Deprecated
+ public static ChangelogBackend getInstance()
+ {
+ return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
+ }
+
/** {@inheritDoc} */
@Override
public void configureBackend(final Configuration config) throws ConfigException
@@ -176,29 +208,16 @@
@Override
public void initializeBackend() throws InitializationException
{
- try
- {
- baseChangelogDN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
- baseDNs = new DN[] { baseChangelogDN };
- }
- catch (final DirectoryException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- throw new InitializationException(
- ERR_BACKEND_CANNOT_DECODE_BACKEND_ROOT_DN.get(getBackendID(), getExceptionMessage(e)), e);
- }
+ baseDNs = new DN[] { CHANGELOG_BASE_DN };
try
{
- DirectoryServer.registerBaseDN(baseChangelogDN, this, true);
+ DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
}
catch (final DirectoryException e)
{
throw new InitializationException(
- ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(baseChangelogDN.toString(), getExceptionMessage(e)), e);
+ ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(CHANGELOG_BASE_DN.toString(), getExceptionMessage(e)), e);
}
}
@@ -206,9 +225,11 @@
@Override
public void finalizeBackend()
{
+ super.finalizeBackend();
+
try
{
- DirectoryServer.deregisterBaseDN(baseChangelogDN);
+ DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
}
catch (final DirectoryException e)
{
@@ -299,7 +320,7 @@
@Override
public DN getBaseDN()
{
- return baseChangelogDN;
+ return CHANGELOG_BASE_DN;
}
@Override
@@ -313,6 +334,13 @@
{
return SearchScope.WHOLE_SUBTREE;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Object setAttachment(String name, Object value)
+ {
+ return null;
+ }
}
/** {@inheritDoc} */
@@ -320,7 +348,7 @@
public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
{
// Compute the num subordinates only for the base DN
- if (entryDN == null || !baseChangelogDN.equals(entryDN))
+ if (entryDN == null || !CHANGELOG_BASE_DN.equals(entryDN))
{
return -1;
}
@@ -329,11 +357,9 @@
return 1;
}
// Search with cookie mode to count all update messages
- final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
- excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
- SearchParams params = new SearchParams("0", excludedDomains);
+ final SearchParams params = new SearchParams(getExcludedDomains());
params.requestType = REQUEST_TYPE_FROM_COOKIE;
- params.multiDomainServerState = new MultiDomainServerState();
+ params.cookie = new MultiDomainServerState();
NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
try
{
@@ -342,11 +368,118 @@
catch (ChangelogException e)
{
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
- baseChangelogDN.toString(), stackTraceToSingleLineString(e)));
+ CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e)));
}
return searchOp.numSubordinates;
}
+ private Set<String> getExcludedDomains()
+ {
+ final Set<String> domains = MultimasterReplication.getECLDisabledDomains();
+ domains.add(DN_EXTERNAL_CHANGELOG_ROOT);
+ return domains;
+ }
+
+ /**
+ * Notifies persistent searches of this backend that a new entry was added to it.
+ * <p>
+ * Note: This method is called in a multi-threaded context.
+ *
+ * @param baseDN
+ * the baseDN of the newly added entry.
+ * @param changeNumber
+ * the change number of the newly added entry. It will be greater
+ * than zero for entries added to the change number index and less
+ * than or equal to zero for entries added to any replica DB
+ * @param cookieString
+ * a string representing the cookie of the newly added entry.
+ * This is only meaningful for entries added to the change number index
+ * @param updateMsg
+ * the update message of the newly added entry
+ * @throws ChangelogException
+ * If a problem occurs while notifying of the newly added entry.
+ */
+ public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
+ throws ChangelogException
+ {
+ final boolean isCookieEntry = changeNumber <= 0;
+ final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry);
+ if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg))
+ {
+ return;
+ }
+
+ try
+ {
+ final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
+ for (SearchOperation pSearchOp : pSearchOps)
+ {
+ final EntrySender entrySender = (EntrySender)
+ pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
+
+ // when returning changesOnly, the first incoming update must return
+ // the base entry before any other changes,
+ // so force sending now, when protected by the synchronized block
+ if (isCookieEntry)
+ { // cookie based search
+ final String cookieStr;
+ synchronized (entrySender)
+ { // forbid concurrent updates to the cookie
+ entrySender.cookie.update(baseDN, updateMsg.getCSN());
+ cookieStr = entrySender.cookie.toString();
+
+ entrySender.sendBaseChangelogEntry(true);
+ }
+ final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
+ // FIXME JNR use this instead of previous line:
+ // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
+ entrySender.sendEntryIfMatches(entry2, cookieStr);
+ }
+ else
+ { // draft changeNumber search
+ if (!entrySender.hasReturnedBaseEntry.get())
+ {
+ synchronized (entrySender)
+ {
+ entrySender.sendBaseChangelogEntry(true);
+ }
+ }
+ entrySender.sendEntryIfMatches(entry, null);
+ }
+ }
+ }
+ catch (DirectoryException e)
+ {
+ throw new ChangelogException(e.getMessageObject(), e);
+ }
+ }
+
+ private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch)
+ {
+ final List<SearchOperation> results = new ArrayList<SearchOperation>();
+ for (PersistentSearch pSearch : getPersistentSearches())
+ {
+ final SearchOperation op = pSearch.getSearchOperation();
+ if (wantCookieBasedSearch == isCookieBased(op))
+ {
+ results.add(op);
+ }
+ }
+ return results;
+ }
+
+ private boolean isCookieBased(final SearchOperation searchOp)
+ {
+ for (Control c : searchOp.getRequestControls())
+ {
+ if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/** {@inheritDoc} */
@Override
public void addEntry(Entry entry, AddOperation addOperation)
@@ -409,9 +542,7 @@
private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
{
- final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
- excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
- final SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains);
+ final SearchParams params = new SearchParams(getExcludedDomains());
final ExternalChangelogRequestControl eclRequestControl =
searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
if (eclRequestControl == null)
@@ -421,7 +552,7 @@
else
{
params.requestType = REQUEST_TYPE_FROM_COOKIE;
- params.multiDomainServerState = eclRequestControl.getCookie();
+ params.cookie = eclRequestControl.getCookie();
}
return params;
}
@@ -523,7 +654,7 @@
{
try
{
- return numSubordinates(baseChangelogDN, true) + 1;
+ return numSubordinates(CHANGELOG_BASE_DN, true) + 1;
}
catch (DirectoryException e)
{
@@ -543,33 +674,28 @@
static class SearchParams
{
private ECLRequestType requestType;
- private final String operationId;
private final Set<String> excludedBaseDNs;
private long lowestChangeNumber = -1;
private long highestChangeNumber = -1;
private CSN csn = new CSN(0, 0, 0);
- private MultiDomainServerState multiDomainServerState;
+ private MultiDomainServerState cookie;
/**
* Creates search parameters.
*/
SearchParams()
{
- operationId = "";
- excludedBaseDNs = Collections.emptySet();
+ this.excludedBaseDNs = Collections.emptySet();
}
/**
* Creates search parameters with provided id and excluded domain DNs.
*
- * @param operationId
- * The id of the operation.
* @param excludedBaseDNs
* Set of DNs to exclude from search.
*/
- SearchParams(final String operationId, final Set<String> excludedBaseDNs)
+ SearchParams(final Set<String> excludedBaseDNs)
{
- this.operationId = operationId;
this.excludedBaseDNs = excludedBaseDNs;
}
@@ -802,45 +928,40 @@
private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
throws DirectoryException, ChangelogException
{
- final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
validateProvidedCookie(searchParams);
+ final boolean isPersistentSearch = isPersistentSearch(searchOperation);
- boolean hasReturnedBaseEntry = false;
+ final EntrySender entrySender = new EntrySender(searchOperation, searchParams.cookie);
+ if (isPersistentSearch)
+ {
+ searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
+ }
+
ECLMultiDomainDBCursor replicaUpdatesCursor = null;
try
{
+ final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
- searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
+ searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
- MultiDomainServerState cookie = searchParams.multiDomainServerState;
boolean continueSearch = true;
while (continueSearch && replicaUpdatesCursor.next())
{
- // Handle creation of base changelog entry on first update message found
- if (!hasReturnedBaseEntry)
- {
- if (!returnBaseChangelogEntry(searchOperation, true))
- {
- return;
- }
- hasReturnedBaseEntry = true;
- }
// Handle the update message
final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
final DN domainBaseDN = replicaUpdatesCursor.getData();
- cookie.update(domainBaseDN, updateMsg.getCSN());
- final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg);
- if (matchBaseAndScopeAndFilter(entry, searchOperation))
- {
- Control control = new EntryChangelogNotificationControl(true, cookie.toString());
- continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control));
- }
+ searchParams.cookie.update(domainBaseDN, updateMsg.getCSN());
+ final String cookieString = searchParams.cookie.toString();
+
+ final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
+ continueSearch = entrySender.sendEntryIfMatches(entry, cookieString);
}
- // Handle creation of base changelog entry when no update message is found
- if (!hasReturnedBaseEntry)
+
+ if (!isPersistentSearch)
{
- returnBaseChangelogEntry(searchOperation, false);
+ // send the base changelog entry if no update message is found
+ entrySender.sendBaseChangelogEntry(false);
}
}
finally
@@ -849,6 +970,52 @@
}
}
+ private boolean isPersistentSearch(SearchOperation op)
+ {
+ for (PersistentSearch pSearch : getPersistentSearches())
+ {
+ if (op == pSearch.getSearchOperation())
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void registerPersistentSearch(PersistentSearch pSearch)
+ {
+ super.registerPersistentSearch(pSearch);
+
+ final SearchOperation searchOp = pSearch.getSearchOperation();
+ if (pSearch.isChangesOnly())
+ {
+ // this persistent search will not go through #search0() down below
+ // so we must initialize the cookie here
+ searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL,
+ new EntrySender(searchOp, getNewestCookie(searchOp)));
+ }
+ }
+
+ private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
+ {
+ if (!isCookieBased(searchOp))
+ {
+ return null;
+ }
+
+ final MultiDomainServerState cookie = new MultiDomainServerState();
+ for (final Iterator<ReplicationServerDomain> it =
+ replicationServer.getDomainIterator(); it.hasNext();)
+ {
+ final DN baseDN = it.next().getBaseDN();
+ final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
+ cookie.update(baseDN, state);
+ }
+ return cookie;
+ }
+
/**
* Validates the cookie contained in search parameters by checking its content
* with the actual replication server state.
@@ -858,7 +1025,7 @@
*/
private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
{
- final MultiDomainServerState state = searchParams.multiDomainServerState;
+ final MultiDomainServerState state = searchParams.cookie;
if (state != null && !state.isEmpty())
{
replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs());
@@ -871,102 +1038,67 @@
private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
throws ChangelogException, DirectoryException
{
- boolean hasReturnedBaseEntry = false;
- final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ final EntrySender entrySender = new EntrySender(searchOperation, null);
+ final boolean isPersistentSearch = isPersistentSearch(searchOperation);
+ if (isPersistentSearch)
+ {
+ searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
+ }
+
DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
MultiDomainDBCursor replicaUpdatesCursor = null;
- try {
- cnIndexDBCursor = getCNIndexDBCursor(changelogDB, params.lowestChangeNumber);
+ try
+ {
+ cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
boolean continueSearch = true;
while (continueSearch && cnIndexDBCursor.next())
{
- // Handle creation of base changelog entry on cnIndex record found
- if (!hasReturnedBaseEntry)
- {
- if (!returnBaseChangelogEntry(searchOperation, true))
- {
- return;
- }
- hasReturnedBaseEntry = true;
- }
// Handle the current cnIndex record
final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
if (replicaUpdatesCursor == null)
{
- replicaUpdatesCursor = initializeReplicaUpdatesCursor(changelogDB, cnIndexRecord);
+ replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord);
}
continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
if (continueSearch)
{
- UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
- if (updateMsg != null)
- {
- continueSearch = returnEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
- replicaUpdatesCursor.next();
- }
+ UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
+ if (updateMsg != null)
+ {
+ continueSearch = sendEntryForUpdateMessage(entrySender, cnIndexRecord, updateMsg);
+ replicaUpdatesCursor.next();
+ }
}
}
- // Handle creation of base changelog entry when no update message is found
- if (!hasReturnedBaseEntry)
+
+ if (!isPersistentSearch)
{
- returnBaseChangelogEntry(searchOperation, false);
+ // send the base changelog entry if no update message is found
+ entrySender.sendBaseChangelogEntry(false);
}
}
- finally {
+ finally
+ {
StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
}
}
/**
- * Create and returns the base changelog entry to provided search operation.
- *
* @return {@code true} if search should continue, {@code false} otherwise
*/
- private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates)
- throws DirectoryException
+ private boolean sendEntryForUpdateMessage(EntrySender entrySender,
+ ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
{
- final DN baseDN = searchOperation.getBaseDN();
- final SearchFilter filter = searchOperation.getFilter();
- final SearchScope scope = searchOperation.getScope();
-
- if (baseChangelogDN.matchesBaseAndScope(baseDN, scope))
- {
- final Entry entry = buildBaseChangelogEntry(hasSubordinates);
- if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null))
- {
- // Abandon, size limit reached.
- return false;
- }
- }
- if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT))
- {
- // Only the change log root entry was requested
- return false;
- }
- return true;
- }
-
- /**
- * @return {@code true} if search should continue, {@code false} otherwise
- */
- private boolean returnEntryForUpdateMessage(
- final SearchOperation searchOperation,
- final ChangeNumberIndexRecord cnIndexRecord,
- final UpdateMsg updateMsg)
- throws DirectoryException
- {
+ final DN baseDN = cnIndexRecord.getBaseDN();
final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
- final DN changeDN = cnIndexRecord.getBaseDN();
- cookie.update(changeDN, cnIndexRecord.getCSN());
- final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
- if (matchBaseAndScopeAndFilter(entry, searchOperation))
- {
- return searchOperation.returnEntry(entry, null);
- }
- return true;
+ cookie.update(baseDN, cnIndexRecord.getCSN());
+ final String cookieString = cookie.toString();
+
+ final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
+ return entrySender.sendEntryIfMatches(entry, null);
}
- private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB,
+ private MultiDomainDBCursor initializeReplicaUpdatesCursor(
final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
{
final MultiDomainServerState state = new MultiDomainServerState();
@@ -975,7 +1107,7 @@
// No need for ECLMultiDomainDBCursor in this case
// as updateMsg will be matched with cnIndexRecord
final MultiDomainDBCursor replicaUpdatesCursor =
- changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
+ getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
replicaUpdatesCursor.next();
return replicaUpdatesCursor;
}
@@ -1023,10 +1155,10 @@
}
/** Returns a cursor on CNIndexDB for the provided first change number. */
- private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB,
+ private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
final long firstChangeNumber) throws ChangelogException
{
- final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB();
+ final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
long changeNumberToUse = firstChangeNumber;
if (changeNumberToUse <= 1)
{
@@ -1036,31 +1168,6 @@
return cnIndexDB.getCursorFrom(changeNumberToUse);
}
- /** Indicates if the provided entry matches the filter, base and scope. */
- private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException
- {
- return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
- && searchOp.getFilter().matchesEntry(entry);
- }
-
- /**
- * Retrieves the base changelog entry.
- */
- private Entry buildBaseChangelogEntry(boolean hasSubordinates)
- {
- final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
- final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
-
- addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
- addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
- ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
- addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates),
- userAttrs, operationalAttrs);
- addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(),
- userAttrs, operationalAttrs);
- return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
- }
-
/**
* Creates a changelog entry.
*/
@@ -1225,16 +1332,16 @@
{
final CSN csn = msg.getCSN();
String dnString;
- if (changeNumber == 0)
- {
- // Cookie mode
- dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT;
- }
- else
+ if (changeNumber > 0)
{
// Draft compat mode
dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
}
+ else
+ {
+ // Cookie mode
+ dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
+ }
final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
@@ -1247,7 +1354,7 @@
addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
// REQUIRED attributes
- if (changeNumber != 0)
+ if (changeNumber > 0)
{
addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
}
@@ -1277,7 +1384,8 @@
{
addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
}
- addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs);
+ final String cookie2 = cookie != null ? cookie : "";
+ addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
final List<RawAttribute> includedAttributes = msg.getEclIncludes();
if (includedAttributes != null && !includedAttributes.isEmpty())
@@ -1300,6 +1408,116 @@
return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
}
+ /**
+ * Used to send entries to searches on cn=changelog. This class ensures the
+ * base changelog entry is sent before sending any other entry. It is also
+ * used as a store when going from the "initial search" phase to the
+ * "persistent search" phase.
+ */
+ private static class EntrySender
+ {
+
+ private final SearchOperation searchOp;
+ /**
+ * Used by the cookie-based searches to communicate the cookie between the
+ * initial search phase and the persistent search phase. This is unused with
+ * draft change number searches.
+ */
+ private final MultiDomainServerState cookie;
+ private final AtomicBoolean hasReturnedBaseEntry = new AtomicBoolean();
+
+ public EntrySender(SearchOperation searchOp, MultiDomainServerState cookie)
+ {
+ this.searchOp = searchOp;
+ this.cookie = cookie;
+ }
+
+ /**
+ * Sends the entry if it matches the base, scope and filter of the current search operation.
+ * It will also send the base changelog entry if it needs to be sent and was not sent before.
+ *
+ * @return {@code true} if search should continue, {@code false} otherwise
+ */
+ private boolean sendEntryIfMatches(Entry entry, String cookie) throws DirectoryException
+ {
+ // About to send one entry: ensure the base changelog entry is sent first
+ if (!sendBaseChangelogEntry(true))
+ {
+ // only return the base entry: stop here
+ return false;
+ }
+ if (matchBaseAndScopeAndFilter(entry))
+ {
+ return searchOp.returnEntry(entry, getControls(cookie));
+ }
+ // maybe the next entry will match?
+ return true;
+ }
+
+ /** Indicates if the provided entry matches the filter, base and scope. */
+ private boolean matchBaseAndScopeAndFilter(Entry entry) throws DirectoryException
+ {
+ return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
+ && searchOp.getFilter().matchesEntry(entry);
+ }
+
+ private List<Control> getControls(String cookie)
+ {
+ if (cookie != null)
+ {
+ Control c = new EntryChangelogNotificationControl(true, cookie);
+ return Arrays.asList(c);
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Create and returns the base changelog entry to the underlying search operation.
+ *
+ * @return {@code true} if search should continue, {@code false} otherwise
+ */
+ private boolean sendBaseChangelogEntry(boolean hasSubordinates) throws DirectoryException
+ {
+ if (hasReturnedBaseEntry.compareAndSet(false, true))
+ {
+ final DN baseDN = searchOp.getBaseDN();
+ final SearchFilter filter = searchOp.getFilter();
+ final SearchScope scope = searchOp.getScope();
+
+ if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
+ {
+ final Entry entry = buildBaseChangelogEntry(hasSubordinates);
+ if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
+ {
+ // Abandon, size limit reached.
+ return false;
+ }
+ }
+ return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
+ || !scope.equals(SearchScope.BASE_OBJECT);
+ }
+ return true;
+ }
+
+ private Entry buildBaseChangelogEntry(boolean hasSubordinates)
+ {
+ final Map<AttributeType, List<Attribute>> userAttrs =
+ new LinkedHashMap<AttributeType, List<Attribute>>();
+ final Map<AttributeType, List<Attribute>> operationalAttrs =
+ new LinkedHashMap<AttributeType, List<Attribute>>();
+
+ addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME,
+ ChangelogBackend.BACKEND_ID, userAttrs, operationalAttrs);
+ addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
+ ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
+ addAttributeByUppercaseName("hassubordinates", "hasSubordinates",
+ Boolean.toString(hasSubordinates), userAttrs, operationalAttrs);
+ addAttributeByUppercaseName("entrydn", "entryDN",
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
+ return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
+ }
+ }
+
private static void addAttribute(final Entry e, final String attrType, final String attrValue)
{
e.addAttribute(Attributes.create(attrType, attrValue), null);
@@ -1313,7 +1531,7 @@
addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
}
- private void addAttributeByUppercaseName(String attrNameLowercase,
+ private static void addAttributeByUppercaseName(String attrNameLowercase,
String attrNameUppercase, String attrValue,
Map<AttributeType, List<Attribute>> userAttrs,
Map<AttributeType, List<Attribute>> operationalAttrs)
@@ -1331,8 +1549,9 @@
{
attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
}
- final Attribute a = addByType ?
- Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue);
+ final Attribute a = addByType
+ ? Attributes.create(attrType, attrValue)
+ : Attributes.create(attrNameUppercase, attrValue);
final List<Attribute> attrList = Collections.singletonList(a);
if (attrType.isOperational())
{
--
Gitblit v1.10.0