/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
|
* or http://forgerock.org/license/CDDLv1.0.html.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at legal-notices/CDDLv1_0.txt.
|
* If applicable, add the following below this CDDL HEADER, with the
|
* fields enclosed by brackets "[]" replaced with your own identifying
|
* information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Copyright 2014 ForgeRock AS.
|
*/
|
package org.opends.server.backends;
|
|
import java.text.SimpleDateFormat;
|
import java.util.Collection;
|
import java.util.Collections;
|
import java.util.Date;
|
import java.util.HashSet;
|
import java.util.Iterator;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
import java.util.TimeZone;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import org.opends.messages.Category;
|
import org.opends.messages.Message;
|
import org.opends.messages.Severity;
|
import org.opends.server.admin.Configuration;
|
import org.opends.server.api.Backend;
|
import org.opends.server.config.ConfigConstants;
|
import org.opends.server.config.ConfigException;
|
import org.opends.server.controls.EntryChangelogNotificationControl;
|
import org.opends.server.controls.ExternalChangelogRequestControl;
|
import org.opends.server.core.AddOperation;
|
import org.opends.server.core.DeleteOperation;
|
import org.opends.server.core.DirectoryServer;
|
import org.opends.server.core.ModifyDNOperation;
|
import org.opends.server.core.ModifyOperation;
|
import org.opends.server.core.PersistentSearch;
|
import org.opends.server.core.SearchOperation;
|
import org.opends.server.loggers.debug.DebugTracer;
|
import org.opends.server.replication.common.CSN;
|
import org.opends.server.replication.common.MultiDomainServerState;
|
import org.opends.server.replication.common.ServerState;
|
import org.opends.server.replication.protocol.AddMsg;
|
import org.opends.server.replication.protocol.DeleteMsg;
|
import org.opends.server.replication.protocol.LDAPUpdateMsg;
|
import org.opends.server.replication.protocol.ModifyCommonMsg;
|
import org.opends.server.replication.protocol.ModifyDNMsg;
|
import org.opends.server.replication.protocol.UpdateMsg;
|
import org.opends.server.replication.server.ReplicationServer;
|
import org.opends.server.replication.server.ReplicationServerDomain;
|
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
|
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
|
import org.opends.server.replication.server.changelog.api.ChangelogDB;
|
import org.opends.server.replication.server.changelog.api.ChangelogException;
|
import org.opends.server.replication.server.changelog.api.DBCursor;
|
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
|
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
|
import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
|
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
|
import org.opends.server.types.Attribute;
|
import org.opends.server.types.AttributeType;
|
import org.opends.server.types.AttributeValue;
|
import org.opends.server.types.Attributes;
|
import org.opends.server.types.BackupConfig;
|
import org.opends.server.types.BackupDirectory;
|
import org.opends.server.types.ByteString;
|
import org.opends.server.types.CanceledOperationException;
|
import org.opends.server.types.ConditionResult;
|
import org.opends.server.types.Control;
|
import org.opends.server.types.DN;
|
import org.opends.server.types.DebugLogLevel;
|
import org.opends.server.types.DirectoryConfig;
|
import org.opends.server.types.DirectoryException;
|
import org.opends.server.types.Entry;
|
import org.opends.server.types.FilterType;
|
import org.opends.server.types.IndexType;
|
import org.opends.server.types.InitializationException;
|
import org.opends.server.types.LDIFExportConfig;
|
import org.opends.server.types.LDIFImportConfig;
|
import org.opends.server.types.LDIFImportResult;
|
import org.opends.server.types.Modification;
|
import org.opends.server.types.ModificationType;
|
import org.opends.server.types.ObjectClass;
|
import org.opends.server.types.Privilege;
|
import org.opends.server.types.RDN;
|
import org.opends.server.types.RawAttribute;
|
import org.opends.server.types.RestoreConfig;
|
import org.opends.server.types.ResultCode;
|
import org.opends.server.types.SearchFilter;
|
import org.opends.server.types.SearchScope;
|
import org.opends.server.types.WritabilityMode;
|
import org.opends.server.util.StaticUtils;
|
|
import com.forgerock.opendj.util.Pair;
|
|
import static org.opends.messages.BackendMessages.*;
|
import static org.opends.messages.ReplicationMessages.*;
|
import static org.opends.server.config.ConfigConstants.*;
|
import static org.opends.server.loggers.ErrorLogger.*;
|
import static org.opends.server.loggers.debug.DebugLogger.*;
|
import static org.opends.server.replication.plugin.MultimasterReplication.*;
|
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
|
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
|
import static org.opends.server.util.LDIFWriter.*;
|
import static org.opends.server.util.ServerConstants.*;
|
import static org.opends.server.util.StaticUtils.*;
|
|
/**
|
* A backend that provides access to the changelog, i.e. the "cn=changelog"
|
* suffix. It is a read-only backend that is created by a
|
* {@code ReplicationServer} and is not configurable.
|
* <p>
|
* There are two modes to search the changelog:
|
* <ul>
|
* <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the
|
* request. The cookie provided in the control is used to retrieve entries from
|
* the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
|
* the entries.</li>
|
* <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
|
* with the request. The entries are retrieved using the ChangeNumberIndexDB and
|
* their attributes are set with the information from the ReplicasDBs. The
|
* <code>changeNumber</code> attribute value is set from the content of
|
* ChangeNumberIndexDB.</li>
|
* </ul>
|
* <h3>Searches flow</h3>
|
* <p>
|
* Here is the flow of searches within the changelog backend APIs:
|
* <ul>
|
* <li>Normal searches only go through:
|
* <ol>
|
* <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
|
* </ol>
|
* </li>
|
* <li>Persistent searches with <code>changesOnly=false</code> go through:
|
* <ol>
|
* <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
|
* (once, single threaded),</li>
|
* <li>
|
* {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
|
* <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
|
* threaded)</li>
|
* </ol>
|
* </li>
|
* <li>Persistent searches with <code>changesOnly=true</code> go through:
|
* <ol>
|
* <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
|
* (once, single threaded)</li>
|
* <li>
|
* {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
|
* threaded)</li>
|
* </ol>
|
* </li>
|
* </ul>
|
*
|
* @see ReplicationServer
|
*/
|
public class ChangelogBackend extends Backend<Configuration>
|
{
|
private static final DebugTracer TRACER = getTracer();
|
|
/** The id of this backend. */
|
public static final String BACKEND_ID = "changelog";
|
|
private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;
|
|
private static final String CHANGE_NUMBER_ATTR = "changeNumber";
|
private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
|
private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
|
private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
|
|
/** The set of objectclasses that will be used in root entry. */
|
private static final Map<ObjectClass, String>
|
CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
|
|
static
|
{
|
CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
|
CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container");
|
}
|
|
/** The set of objectclasses that will be used in ECL entries. */
|
private static final Map<ObjectClass, String>
|
CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
|
|
static
|
{
|
CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
|
CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY);
|
}
|
|
/** The attribute type for the "creatorsName" attribute. */
|
private static final AttributeType CREATORS_NAME_TYPE =
|
DirectoryConfig.getAttributeType(OP_ATTR_CREATORS_NAME_LC, true);
|
|
/** The attribute type for the "modifiersName" attribute. */
|
private static final AttributeType MODIFIERS_NAME_TYPE =
|
DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
|
|
/** The base DN for the external change log. */
|
public static final DN CHANGELOG_BASE_DN;
|
|
static
|
{
|
try
|
{
|
CHANGELOG_BASE_DN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
|
}
|
catch (DirectoryException e)
|
{
|
throw new RuntimeException(e);
|
}
|
}
|
|
/** The set of base DNs for this backend. */
|
private DN[] baseDNs;
|
/** The set of supported controls for this backend. */
|
private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
|
/** Whether the base changelog entry has subordinates. */
|
private Boolean baseEntryHasSubordinates;
|
|
/** The replication server on which the changelog is read. */
|
private final ReplicationServer replicationServer;
|
private final ECLEnabledDomainPredicate domainPredicate;
|
|
/** The set of cookie-based persistent searches registered with this backend. */
|
private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches =
|
new ConcurrentLinkedQueue<PersistentSearch>();
|
/**
|
* The set of change number-based persistent searches registered with this
|
* backend.
|
*/
|
private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
|
new ConcurrentLinkedQueue<PersistentSearch>();
|
|
/**
|
* Creates a new backend with the provided replication server.
|
*
|
* @param replicationServer
|
* The replication server on which the changes are read.
|
* @param domainPredicate
|
* Returns whether a domain is enabled for the external changelog.
|
*/
|
public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
|
{
|
this.replicationServer = replicationServer;
|
this.domainPredicate = domainPredicate;
|
setBackendID(BACKEND_ID);
|
setWritabilityMode(WritabilityMode.DISABLED);
|
setPrivateBackend(true);
|
}
|
|
private ChangelogDB getChangelogDB()
|
{
|
return replicationServer.getChangelogDB();
|
}
|
|
/**
|
* Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
|
*
|
* @return the ChangelogBackend configured for "cn=changelog" in this directory server
|
* @deprecated instead inject the required object where needed
|
*/
|
@Deprecated
|
public static ChangelogBackend getInstance()
|
{
|
return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void configureBackend(final Configuration config) throws ConfigException
|
{
|
throw new UnsupportedOperationException("The changelog backend is not configurable");
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void initializeBackend() throws InitializationException
|
{
|
baseDNs = new DN[] { CHANGELOG_BASE_DN };
|
|
try
|
{
|
DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
|
}
|
catch (final DirectoryException e)
|
{
|
throw new InitializationException(
|
ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e);
|
}
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void finalizeBackend()
|
{
|
super.finalizeBackend();
|
|
try
|
{
|
DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
|
}
|
catch (final DirectoryException e)
|
{
|
if (debugEnabled())
|
{
|
TRACER.debugCaught(DebugLogLevel.ERROR, e);
|
}
|
}
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public DN[] getBaseDNs()
|
{
|
return baseDNs;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void preloadEntryCache() throws UnsupportedOperationException
|
{
|
throw new UnsupportedOperationException("Operation not supported.");
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean isLocal()
|
{
|
return true;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
|
{
|
return true;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public Entry getEntry(final DN entryDN) throws DirectoryException
|
{
|
if (entryDN == null)
|
{
|
throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
|
ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID()));
|
}
|
throw new RuntimeException("Not implemented");
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException
|
{
|
if (CHANGELOG_BASE_DN.equals(entryDN))
|
{
|
final Boolean hasSubs = baseChangelogHasSubordinates();
|
if (hasSubs == null)
|
{
|
return ConditionResult.UNDEFINED;
|
}
|
return hasSubs ? ConditionResult.TRUE : ConditionResult.FALSE;
|
}
|
return ConditionResult.FALSE;
|
}
|
|
private Boolean baseChangelogHasSubordinates() throws DirectoryException
|
{
|
if (baseEntryHasSubordinates == null)
|
{
|
// compute its value
|
try
|
{
|
final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
|
final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
|
new MultiDomainServerState(), GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, getExcludedBaseDNs());
|
try
|
{
|
baseEntryHasSubordinates = cursor.next();
|
}
|
finally
|
{
|
close(cursor);
|
}
|
}
|
catch (ChangelogException e)
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get(
|
"hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e)));
|
}
|
}
|
return baseEntryHasSubordinates;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
|
{
|
return -1;
|
}
|
|
/**
|
* Notifies persistent searches of this backend that a new cookie entry was added to it.
|
* <p>
|
* Note: This method correspond to the "persistent search" phase.
|
* It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
|
* <p>
|
* This method must only be called after the provided data have been persisted to disk.
|
*
|
* @param baseDN
|
* the baseDN of the newly added entry.
|
* @param updateMsg
|
* the update message of the newly added entry
|
* @throws ChangelogException
|
* If a problem occurs while notifying of the newly added entry.
|
*/
|
public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
|
{
|
if (!(updateMsg instanceof LDAPUpdateMsg))
|
{
|
return;
|
}
|
|
try
|
{
|
for (PersistentSearch pSearch : cookieBasedPersistentSearches)
|
{
|
final SearchOperation searchOp = pSearch.getSearchOperation();
|
final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
|
entrySender.persistentSearchSendEntry(baseDN, updateMsg);
|
}
|
}
|
catch (DirectoryException e)
|
{
|
throw new ChangelogException(e.getMessageObject(), e);
|
}
|
}
|
|
/**
|
* Notifies persistent searches of this backend that a new change number entry was added to it.
|
* <p>
|
* Note: This method correspond to the "persistent search" phase.
|
* It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
|
* <p>
|
* This method must only be called after the provided data have been persisted to disk.
|
*
|
* @param baseDN
|
* the baseDN of the newly added entry.
|
* @param changeNumber
|
* the change number of the newly added entry. It will be greater
|
* than zero for entries added to the change number index and less
|
* than or equal to zero for entries added to any replica DB
|
* @param cookieString
|
* a string representing the cookie of the newly added entry.
|
* This is only meaningful for entries added to the change number index
|
* @param updateMsg
|
* the update message of the newly added entry
|
* @throws ChangelogException
|
* If a problem occurs while notifying of the newly added entry.
|
*/
|
public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
|
throws ChangelogException
|
{
|
if (!(updateMsg instanceof LDAPUpdateMsg))
|
{
|
return;
|
}
|
|
try
|
{
|
// changeNumber entry can be shared with multiple persistent searches
|
final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
|
for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
|
{
|
final SearchOperation searchOp = pSearch.getSearchOperation();
|
final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
|
entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
|
}
|
}
|
catch (DirectoryException e)
|
{
|
throw new ChangelogException(e.getMessageObject(), e);
|
}
|
}
|
|
private boolean isCookieBased(final SearchOperation searchOp)
|
{
|
for (Control c : searchOp.getRequestControls())
|
{
|
if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
|
{
|
return true;
|
}
|
}
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void addEntry(Entry entry, AddOperation addOperation)
|
throws DirectoryException, CanceledOperationException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getDN()), getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
|
throws DirectoryException, CanceledOperationException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void replaceEntry(Entry oldEntry, Entry newEntry,
|
ModifyOperation modifyOperation) throws DirectoryException,
|
CanceledOperationException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getDN()), getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void renameEntry(DN currentDN, Entry entry,
|
ModifyDNOperation modifyDNOperation) throws DirectoryException,
|
CanceledOperationException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void search(final SearchOperation searchOperation) throws DirectoryException
|
{
|
checkChangelogReadPrivilege(searchOperation);
|
|
final SearchParams params = buildSearchParameters(searchOperation);
|
|
optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
|
try
|
{
|
initialSearch(params, searchOperation);
|
}
|
catch (ChangelogException e)
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
|
searchOperation.getBaseDN().toString(),
|
searchOperation.getFilter().toString(),
|
stackTraceToSingleLineString(e)));
|
}
|
}
|
|
private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
|
{
|
final SearchParams params = new SearchParams(getExcludedBaseDNs());
|
final ExternalChangelogRequestControl eclRequestControl =
|
searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
|
if (eclRequestControl != null)
|
{
|
params.cookie = eclRequestControl.getCookie();
|
}
|
return params;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public Set<String> getSupportedControls()
|
{
|
return supportedControls;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public Set<String> getSupportedFeatures()
|
{
|
return Collections.emptySet();
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean supportsLDIFExport()
|
{
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void exportLDIF(final LDIFExportConfig exportConfig)
|
throws DirectoryException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean supportsLDIFImport()
|
{
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
|
throws DirectoryException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean supportsBackup()
|
{
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason)
|
{
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void createBackup(BackupConfig backupConfig) throws DirectoryException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public boolean supportsRestore()
|
{
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
|
{
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public long getEntryCount()
|
{
|
try
|
{
|
return numSubordinates(CHANGELOG_BASE_DN, true) + 1;
|
}
|
catch (DirectoryException e)
|
{
|
if (debugEnabled())
|
{
|
TRACER.debugCaught(DebugLogLevel.ERROR, e);
|
}
|
return -1;
|
}
|
}
|
|
/**
|
* Represent the search parameters specific to the changelog.
|
*
|
* This class should be visible for tests.
|
*/
|
static class SearchParams
|
{
|
private final Set<DN> excludedBaseDNs;
|
private long lowestChangeNumber = -1;
|
private long highestChangeNumber = -1;
|
private CSN csn = new CSN(0, 0, 0);
|
private MultiDomainServerState cookie;
|
|
/**
|
* Creates search parameters.
|
*/
|
SearchParams()
|
{
|
this(Collections.<DN> emptySet());
|
}
|
|
/**
|
* Creates search parameters with provided id and excluded domain DNs.
|
*
|
* @param excludedBaseDNs
|
* Set of DNs to exclude from search.
|
*/
|
SearchParams(final Set<DN> excludedBaseDNs)
|
{
|
this.excludedBaseDNs = excludedBaseDNs;
|
}
|
|
/**
|
* Returns whether this search is cookie based.
|
*
|
* @return true if this search is cookie-based, false if this search is
|
* change number-based.
|
*/
|
private boolean isCookieBasedSearch()
|
{
|
return cookie != null;
|
}
|
|
/**
|
* Indicates if provided change number is compatible with last change
|
* number.
|
*
|
* @param changeNumber
|
* The change number to test.
|
* @return {@code true} if and only if the provided change number is in the
|
* range of the last change number.
|
*/
|
boolean changeNumberIsInRange(long changeNumber)
|
{
|
return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
|
}
|
|
/**
|
* Returns the lowest change number to retrieve (inclusive).
|
*
|
* @return the lowest change number
|
*/
|
long getLowestChangeNumber()
|
{
|
return lowestChangeNumber;
|
}
|
|
/**
|
* Returns the highest change number to retrieve (inclusive).
|
*
|
* @return the highest change number
|
*/
|
long getHighestChangeNumber()
|
{
|
return highestChangeNumber;
|
}
|
|
/**
|
* Returns the CSN to retrieve.
|
*
|
* @return the CSN, which may be the default CSN with zero values.
|
*/
|
CSN getCSN()
|
{
|
return csn;
|
}
|
|
/**
|
* Returns the set of DNs to exclude from the search.
|
*
|
* @return the DNs corresponding to domains to exclude from the search.
|
*/
|
Set<DN> getExcludedBaseDNs()
|
{
|
return excludedBaseDNs;
|
}
|
}
|
|
/**
|
* Returns the set of DNs to exclude from the search.
|
*
|
* @return the DNs corresponding to domains to exclude from the search.
|
* @throws DirectoryException
|
* If a DN can't be decoded.
|
*/
|
private static Set<DN> getExcludedBaseDNs() throws DirectoryException
|
{
|
final Set<DN> excludedDNs = new HashSet<DN>();
|
for (String dn : getExcludedChangelogDomains())
|
{
|
excludedDNs.add(DN.decode(dn));
|
}
|
return excludedDNs;
|
}
|
|
/**
|
* Optimize the search parameters by analyzing the DN and filter.
|
* Populate the provided SearchParams with optimizations found.
|
*
|
* @param params the search parameters that are specific to external changelog
|
* @param baseDN the provided search baseDN.
|
* @param userFilter the provided search filter.
|
* @throws DirectoryException when an exception occurs.
|
*/
|
void optimizeSearchParameters(final SearchParams params, final DN baseDN, final SearchFilter userFilter)
|
throws DirectoryException
|
{
|
SearchFilter equalityFilter = null;
|
switch (baseDN.getNumComponents())
|
{
|
case 1:
|
// "cn=changelog" : use user-provided search filter.
|
break;
|
case 2:
|
// It is probably "changeNumber=xxx,cn=changelog", use equality filter
|
// But it also could be "<service-id>,cn=changelog" so need to check on attribute
|
equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR);
|
break;
|
default:
|
// "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
|
equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN");
|
break;
|
}
|
|
final SearchParams optimized = optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
|
params.lowestChangeNumber = optimized.lowestChangeNumber;
|
params.highestChangeNumber = optimized.highestChangeNumber;
|
params.csn = optimized.csn;
|
}
|
|
/**
|
* Build a search filter from given DN and attribute.
|
*
|
* @return the search filter or {@code null} if attribute is not present in
|
* the provided DN
|
*/
|
private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr)
|
{
|
final RDN rdn = baseDN.getRDN();
|
AttributeType attrType = DirectoryServer.getAttributeType(lowerCaseAttr);
|
if (attrType == null)
|
{
|
attrType = DirectoryServer.getDefaultAttributeType(upperCaseAttr);
|
}
|
final AttributeValue attrValue = rdn.getAttributeValue(attrType);
|
if (attrValue != null)
|
{
|
return SearchFilter.createEqualityFilter(attrType, attrValue);
|
}
|
return null;
|
}
|
|
private SearchParams optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
|
{
|
final SearchParams params = new SearchParams();
|
if (filter == null)
|
{
|
return params;
|
}
|
|
if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
|
{
|
params.lowestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
|
}
|
else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
|
{
|
params.highestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
|
}
|
else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
|
{
|
final long number = decodeChangeNumber(filter.getAssertionValue());
|
params.lowestChangeNumber = number;
|
params.highestChangeNumber = number;
|
}
|
else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
|
{
|
// == exact CSN
|
params.csn = new CSN(filter.getAssertionValue().toString());
|
}
|
else if (filter.getFilterType() == FilterType.AND)
|
{
|
// TODO: it looks like it could be generalized to N components, not only two
|
final Collection<SearchFilter> components = filter.getFilterComponents();
|
final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
|
long last1 = -1;
|
long first1 = -1;
|
long last2 = -1;
|
long first2 = -1;
|
if (filters.length > 0)
|
{
|
SearchParams msg1 = optimizeSearchUsingFilter(filters[0]);
|
last1 = msg1.highestChangeNumber;
|
first1 = msg1.lowestChangeNumber;
|
}
|
if (filters.length > 1)
|
{
|
SearchParams msg2 = optimizeSearchUsingFilter(filters[1]);
|
last2 = msg2.highestChangeNumber;
|
first2 = msg2.lowestChangeNumber;
|
}
|
if (last1 == -1)
|
{
|
params.highestChangeNumber = last2;
|
}
|
else if (last2 == -1)
|
{
|
params.highestChangeNumber = last1;
|
}
|
else
|
{
|
params.highestChangeNumber = Math.min(last1, last2);
|
}
|
|
params.lowestChangeNumber = Math.max(first1, first2);
|
}
|
return params;
|
}
|
|
private static long decodeChangeNumber(final AttributeValue assertionValue)
|
throws DirectoryException
|
{
|
try
|
{
|
return Long.decode(assertionValue.getNormalizedValue().toString());
|
}
|
catch (NumberFormatException e)
|
{
|
throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
|
Message.raw("Could not convert value '%s' to long", assertionValue.getNormalizedValue().toString()));
|
}
|
}
|
|
private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
|
{
|
return filter.getFilterType() == filterType
|
&& filter.getAttributeType() != null
|
&& filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
|
}
|
|
/**
|
* Runs the "initial search" phase (as opposed to a "persistent search" phase).
|
* The "initial search" phase is the only search run by normal searches,
|
* but it is also run by persistent searches with <code>changesOnly=false</code>.
|
* Persistent searches with <code>changesOnly=true</code> never execute this code.
|
* <p>
|
* Note: this method is executed only once per persistent search, single threaded.
|
*/
|
private void initialSearch(final SearchParams searchParams, final SearchOperation searchOperation)
|
throws DirectoryException, ChangelogException
|
{
|
if (searchParams.isCookieBasedSearch())
|
{
|
initialSearchFromCookie(searchParams, searchOperation);
|
}
|
else
|
{
|
initialSearchFromChangeNumber(searchParams, searchOperation);
|
}
|
}
|
|
/**
|
* Search the changelog when a cookie control is provided.
|
*/
|
private void initialSearchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
|
throws DirectoryException, ChangelogException
|
{
|
validateProvidedCookie(searchParams);
|
|
final MultiDomainServerState cookie = searchParams.cookie;
|
final CookieEntrySender entrySender;
|
if (isPersistentSearch(searchOperation))
|
{
|
// communicate the cookie between the "initial search" phase and the "persistent search" phase
|
searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
|
entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
|
}
|
else
|
{
|
entrySender = new CookieEntrySender(searchOperation);
|
}
|
|
if (!sendBaseChangelogEntry(searchOperation))
|
{ // only return the base entry: stop here
|
return;
|
}
|
|
ECLMultiDomainDBCursor replicaUpdatesCursor = null;
|
try
|
{
|
final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
|
final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
|
cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
|
replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
|
|
boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
|
if (continueSearch)
|
{
|
entrySender.transitioningToPersistentSearchPhase();
|
sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
|
}
|
}
|
finally
|
{
|
entrySender.finalizeInitialSearch();
|
StaticUtils.close(replicaUpdatesCursor);
|
}
|
}
|
|
private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
|
ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
|
DirectoryException
|
{
|
boolean continueSearch = true;
|
while (continueSearch && replicaUpdatesCursor.next())
|
{
|
final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
|
final DN domainBaseDN = replicaUpdatesCursor.getData();
|
continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
|
}
|
return continueSearch;
|
}
|
|
private boolean isPersistentSearch(SearchOperation op)
|
{
|
for (PersistentSearch pSearch : getPersistentSearches())
|
{
|
if (op == pSearch.getSearchOperation())
|
{
|
return true;
|
}
|
}
|
return false;
|
}
|
|
/** {@inheritDoc} */
|
@Override
|
public void registerPersistentSearch(PersistentSearch pSearch)
|
{
|
initializeAttachements(pSearch);
|
|
if (isCookieBased(pSearch.getSearchOperation()))
|
{
|
cookieBasedPersistentSearches.add(pSearch);
|
}
|
else
|
{
|
changeNumberBasedPersistentSearches.add(pSearch);
|
}
|
super.registerPersistentSearch(pSearch);
|
}
|
|
private void initializeAttachements(PersistentSearch pSearch)
|
{
|
final SearchOperation searchOp = pSearch.getSearchOperation();
|
if (isCookieBased(searchOp))
|
{
|
if (pSearch.isChangesOnly())
|
{
|
// this changesOnly persistent search will not go through #initialSearch()
|
// so we must initialize the cookie here
|
searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
|
}
|
searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp));
|
}
|
else
|
{
|
searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp));
|
}
|
}
|
|
private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
|
{
|
if (!isCookieBased(searchOp))
|
{
|
return null;
|
}
|
|
final MultiDomainServerState cookie = new MultiDomainServerState();
|
for (final Iterator<ReplicationServerDomain> it =
|
replicationServer.getDomainIterator(); it.hasNext();)
|
{
|
final DN baseDN = it.next().getBaseDN();
|
final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
|
cookie.update(baseDN, state);
|
}
|
return cookie;
|
}
|
|
/**
|
* Validates the cookie contained in search parameters by checking its content
|
* with the actual replication server state.
|
*
|
* @throws DirectoryException
|
* If the state is not valid
|
*/
|
private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
|
{
|
final MultiDomainServerState cookie = searchParams.cookie;
|
if (cookie != null && !cookie.isEmpty())
|
{
|
replicationServer.validateCookie(cookie, searchParams.getExcludedBaseDNs());
|
}
|
}
|
|
/**
|
* Search the changelog using change number(s).
|
*/
|
private void initialSearchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
|
throws ChangelogException, DirectoryException
|
{
|
// "initial search" phase must return the base entry immediately
|
sendBaseChangelogEntry(searchOperation);
|
|
final ChangeNumberEntrySender entrySender;
|
if (isPersistentSearch(searchOperation))
|
{
|
entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
|
}
|
else
|
{
|
entrySender = new ChangeNumberEntrySender(searchOperation);
|
}
|
|
DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
|
final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<MultiDomainDBCursor>();
|
try
|
{
|
cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
|
MultiDomainServerState cookie = new MultiDomainServerState();
|
final boolean continueSearch =
|
sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie);
|
if (continueSearch)
|
{
|
entrySender.transitioningToPersistentSearchPhase();
|
sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie);
|
}
|
}
|
finally
|
{
|
entrySender.finalizeInitialSearch();
|
StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get());
|
}
|
}
|
|
private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
|
final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor,
|
AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, MultiDomainServerState cookie)
|
throws ChangelogException, DirectoryException
|
{
|
boolean continueSearch = true;
|
while (continueSearch && cnIndexDBCursor.next())
|
{
|
// Handle the current cnIndex record
|
final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
|
if (replicaUpdatesCursor.get() == null)
|
{
|
replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
|
initializeCookieForChangeNumberMode(cookie, cnIndexRecord);
|
}
|
else
|
{
|
cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
|
}
|
continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
|
if (continueSearch)
|
{
|
final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
|
if (updateMsg != null)
|
{
|
continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
|
replicaUpdatesCursor.get().next();
|
}
|
}
|
}
|
return continueSearch;
|
}
|
|
/** Initialize the provided cookie from the provided change number index record. */
|
private void initializeCookieForChangeNumberMode(
|
MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
|
{
|
ECLMultiDomainDBCursor eclCursor = null;
|
try
|
{
|
cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
|
MultiDomainDBCursor cursor =
|
getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie,
|
LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
|
eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
|
eclCursor.next();
|
cookie.update(eclCursor.toCookie());
|
}
|
finally
|
{
|
close(eclCursor);
|
}
|
}
|
|
private MultiDomainDBCursor initializeReplicaUpdatesCursor(
|
final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
|
{
|
final MultiDomainServerState state = new MultiDomainServerState();
|
state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
|
|
// No need for ECLMultiDomainDBCursor in this case
|
// as updateMsg will be matched with cnIndexRecord
|
final MultiDomainDBCursor replicaUpdatesCursor =
|
getChangelogDB().getReplicationDomainDB().getCursorFrom(state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
|
replicaUpdatesCursor.next();
|
return replicaUpdatesCursor;
|
}
|
|
/**
|
* Returns the replica update message corresponding to the provided
|
* cnIndexRecord.
|
*
|
* @return the update message, which may be {@code null} if the update message
|
* could not be found because it was purged or because corresponding
|
* baseDN was removed from the changelog
|
* @throws DirectoryException
|
* If inconsistency is detected between the available update
|
* messages and the provided cnIndexRecord
|
*/
|
private UpdateMsg findReplicaUpdateMessage(
|
final ChangeNumberIndexRecord cnIndexRecord,
|
final MultiDomainDBCursor replicaUpdatesCursor)
|
throws DirectoryException, ChangelogException
|
{
|
while (true)
|
{
|
final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
|
final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
|
if (compareIndexWithUpdateMsg < 0) {
|
// Either update message has been purged or baseDN has been removed from changelogDB,
|
// ignore current index record and go to the next one
|
return null;
|
}
|
else if (compareIndexWithUpdateMsg == 0)
|
{
|
// Found the matching update message
|
return updateMsg;
|
}
|
// Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
|
if (!replicaUpdatesCursor.next())
|
{
|
// Should never happen, as it means some messages have disappeared
|
// TODO : put the correct I18N message
|
throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
|
Message.raw("Could not find replica update message matching index record. " +
|
"No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
|
}
|
}
|
}
|
|
/** Returns a cursor on CNIndexDB for the provided first change number. */
|
private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
|
final long firstChangeNumber) throws ChangelogException
|
{
|
final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
|
long changeNumberToUse = firstChangeNumber;
|
if (changeNumberToUse <= 1)
|
{
|
final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
|
changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
|
}
|
return cnIndexDB.getCursorFrom(changeNumberToUse);
|
}
|
|
/**
|
* Creates a changelog entry.
|
*/
|
private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
|
final UpdateMsg msg) throws DirectoryException
|
{
|
if (msg instanceof AddMsg)
|
{
|
return createAddMsg(baseDN, changeNumber, cookie, msg);
|
}
|
else if (msg instanceof ModifyCommonMsg)
|
{
|
return createModifyMsg(baseDN, changeNumber, cookie, msg);
|
}
|
else if (msg instanceof DeleteMsg)
|
{
|
final DeleteMsg delMsg = (DeleteMsg) msg;
|
return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
|
}
|
throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
|
Message.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN.toString(),
|
msg.getClass().toString()));
|
}
|
|
/**
|
* Creates an entry from an add message.
|
* <p>
|
* Map addMsg to an LDIF string for the 'changes' attribute, and pull out
|
* change initiators name if available which is contained in the creatorsName
|
* attribute.
|
*/
|
private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
|
throws DirectoryException
|
{
|
final AddMsg addMsg = (AddMsg) msg;
|
String changeInitiatorsName = null;
|
String ldifChanges = null;
|
try
|
{
|
final StringBuilder builder = new StringBuilder(256);
|
for (Attribute attr : addMsg.getAttributes())
|
{
|
if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty())
|
{
|
// This attribute is not multi-valued.
|
changeInitiatorsName = attr.iterator().next().toString();
|
}
|
final String attrName = attr.getNameWithOptions();
|
for (AttributeValue value : attr)
|
{
|
builder.append(attrName);
|
appendLDIFSeparatorAndValue(builder, value.getValue());
|
builder.append('\n');
|
}
|
}
|
ldifChanges = builder.toString();
|
}
|
catch (Exception e)
|
{
|
logEncodingMessageError("add", addMsg.getDN(), e);
|
}
|
|
return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
|
}
|
|
/**
|
* Creates an entry from a modify message.
|
* <p>
|
* Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
|
* out change initiators name if available which is contained in the
|
* modifiersName attribute.
|
*/
|
private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
|
final UpdateMsg msg) throws DirectoryException
|
{
|
final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
|
String changeInitiatorsName = null;
|
String ldifChanges = null;
|
try
|
{
|
final StringBuilder builder = new StringBuilder(128);
|
for (Modification mod : modifyMsg.getMods())
|
{
|
final Attribute attr = mod.getAttribute();
|
if (mod.getModificationType() == ModificationType.REPLACE
|
&& attr.getAttributeType().equals(MODIFIERS_NAME_TYPE)
|
&& !attr.isEmpty())
|
{
|
// This attribute is not multi-valued.
|
changeInitiatorsName = attr.iterator().next().toString();
|
}
|
final String attrName = attr.getNameWithOptions();
|
builder.append(mod.getModificationType().getLDIFName());
|
builder.append(": ");
|
builder.append(attrName);
|
builder.append('\n');
|
|
for (AttributeValue value : attr)
|
{
|
builder.append(attrName);
|
appendLDIFSeparatorAndValue(builder, value.getValue());
|
builder.append('\n');
|
}
|
builder.append("-\n");
|
}
|
ldifChanges = builder.toString();
|
}
|
catch (Exception e)
|
{
|
logEncodingMessageError("modify", modifyMsg.getDN(), e);
|
}
|
|
final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
|
final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
|
isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
|
|
if (isModifyDNMsg)
|
{
|
final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
|
addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
|
if (modDNMsg.getNewSuperior() != null)
|
{
|
addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
|
}
|
addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
|
}
|
return entry;
|
}
|
|
/**
|
* Log an encoding message error.
|
*
|
* @param messageType
|
* String identifying type of message. Should be "add" or "modify".
|
* @param entryDN
|
* DN of original entry
|
*/
|
private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
|
{
|
TRACER.debugCaught(DebugLogLevel.ERROR, exception);
|
logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
|
"An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
|
+ entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
|
}
|
|
private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException
|
{
|
if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp))
|
{
|
throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS,
|
NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
|
}
|
}
|
|
/**
|
* Create a changelog entry from a set of provided information. This is the part of
|
* entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
|
*/
|
private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
|
final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
|
final String changeInitiatorsName) throws DirectoryException
|
{
|
final CSN csn = msg.getCSN();
|
String dnString;
|
if (changeNumber > 0)
|
{
|
// change number mode
|
dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
|
}
|
else
|
{
|
// Cookie mode
|
dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
|
}
|
|
final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
|
final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
|
|
// Operational standard attributes
|
addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
|
ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
|
addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs);
|
addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs);
|
addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
|
|
// REQUIRED attributes
|
if (changeNumber > 0)
|
{
|
addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
|
}
|
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
|
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
|
final String format = dateFormat.format(new Date(csn.getTime()));
|
addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs);
|
addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs);
|
addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs);
|
|
// NON REQUESTED attributes
|
addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs);
|
addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()),
|
userAttrs, opAttrs);
|
|
if (ldifChanges != null)
|
{
|
addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs);
|
}
|
if (changeInitiatorsName != null)
|
{
|
addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
|
}
|
|
final String targetUUID = msg.getEntryUUID();
|
if (targetUUID != null)
|
{
|
addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
|
}
|
final String cookie2 = cookie != null ? cookie : "";
|
addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
|
|
final List<RawAttribute> includedAttributes = msg.getEclIncludes();
|
if (includedAttributes != null && !includedAttributes.isEmpty())
|
{
|
final StringBuilder builder = new StringBuilder(256);
|
for (final RawAttribute includedAttribute : includedAttributes)
|
{
|
final String name = includedAttribute.getAttributeType();
|
for (final ByteString value : includedAttribute.getValues())
|
{
|
builder.append(name);
|
appendLDIFSeparatorAndValue(builder, value);
|
builder.append('\n');
|
}
|
}
|
final String includedAttributesLDIF = builder.toString();
|
addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
|
}
|
|
return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
|
}
|
|
/**
|
* Sends the entry if it matches the base, scope and filter of the current search operation.
|
* It will also send the base changelog entry if it needs to be sent and was not sent before.
|
*
|
* @return {@code true} if search should continue, {@code false} otherwise
|
*/
|
private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
|
throws DirectoryException
|
{
|
if (matchBaseAndScopeAndFilter(searchOp, entry))
|
{
|
return searchOp.returnEntry(entry, getControls(cookie));
|
}
|
// maybe the next entry will match?
|
return true;
|
}
|
|
/** Indicates if the provided entry matches the filter, base and scope. */
|
private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
|
{
|
return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
|
&& searchOp.getFilter().matchesEntry(entry);
|
}
|
|
private static List<Control> getControls(String cookie)
|
{
|
if (cookie != null)
|
{
|
final Control c = new EntryChangelogNotificationControl(true, cookie);
|
return Collections.singletonList(c);
|
}
|
return Collections.emptyList();
|
}
|
|
/**
|
* Create and returns the base changelog entry to the underlying search operation.
|
*
|
* @return {@code true} if search should continue, {@code false} otherwise
|
*/
|
private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException
|
{
|
final DN baseDN = searchOp.getBaseDN();
|
final SearchFilter filter = searchOp.getFilter();
|
final SearchScope scope = searchOp.getScope();
|
|
if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
|
{
|
final Entry entry = buildBaseChangelogEntry();
|
if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
|
{
|
// Abandon, size limit reached.
|
return false;
|
}
|
}
|
return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
|
|| !scope.equals(SearchScope.BASE_OBJECT);
|
}
|
|
private Entry buildBaseChangelogEntry() throws DirectoryException
|
{
|
final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates());
|
|
final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
|
final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
|
|
// We never return the numSubordinates attribute for the base changelog entry
|
// and there is a very good reason for that:
|
// - Either we compute it before sending the entries,
|
// -- then we risk returning more entries if new entries come in after we computed numSubordinates
|
// -- or we risk returning less entries if purge kicks in after we computed numSubordinates
|
// - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError
|
|
addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
|
addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
|
ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
|
addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs);
|
addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
|
return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
|
}
|
|
private static void addAttribute(final Entry e, final String attrType, final String attrValue)
|
{
|
e.addAttribute(Attributes.create(attrType, attrValue), null);
|
}
|
|
private static void addAttributeByType(String attrNameLowercase,
|
String attrNameUppercase, String attrValue,
|
Map<AttributeType, List<Attribute>> userAttrs,
|
Map<AttributeType, List<Attribute>> operationalAttrs)
|
{
|
addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
|
}
|
|
private static void addAttributeByUppercaseName(String attrNameLowercase,
|
String attrNameUppercase, String attrValue,
|
Map<AttributeType, List<Attribute>> userAttrs,
|
Map<AttributeType, List<Attribute>> operationalAttrs)
|
{
|
addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false);
|
}
|
|
private static void addAttribute(final String attrNameLowercase,
|
final String attrNameUppercase, final String attrValue,
|
final Map<AttributeType, List<Attribute>> userAttrs,
|
final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
|
{
|
AttributeType attrType = DirectoryServer.getAttributeType(attrNameLowercase);
|
if (attrType == null)
|
{
|
attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
|
}
|
final Attribute a = addByType
|
? Attributes.create(attrType, attrValue)
|
: Attributes.create(attrNameUppercase, attrValue);
|
final List<Attribute> attrList = Collections.singletonList(a);
|
if (attrType.isOperational())
|
{
|
operationalAttrs.put(attrType, attrList);
|
}
|
else
|
{
|
userAttrs.put(attrType, attrList);
|
}
|
}
|
|
/**
|
* Describes the current search phase.
|
*/
|
private enum SearchPhase
|
{
|
/**
|
* "Initial search" phase. The "initial search" phase is running
|
* concurrently. All update notifications are ignored.
|
*/
|
INITIAL,
|
/**
|
* Transitioning from the "initial search" phase to the "persistent search"
|
* phase. "Initial search" phase has finished reading from the DB. It now
|
* verifies if any more updates have been persisted to the DB since stopping
|
* and send them. All update notifications are blocked.
|
*/
|
TRANSITIONING,
|
/**
|
* "Persistent search" phase. "Initial search" phase has completed. All
|
* update notifications are published.
|
*/
|
PERSISTENT;
|
}
|
|
/**
|
* Contains data to ensure that the same change is not sent twice to clients
|
* because of race conditions between the "initial search" phase and the
|
* "persistent search" phase.
|
*/
|
private static class SendEntryData<K extends Comparable<K>>
|
{
|
private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<SearchPhase>(SearchPhase.INITIAL);
|
private final Object transitioningLock = new Object();
|
private volatile K lastKeySentByInitialSearch;
|
|
private void finalizeInitialSearch()
|
{
|
searchPhase.set(SearchPhase.PERSISTENT);
|
synchronized (transitioningLock)
|
{ // initial search phase has completed, release all persistent searches
|
transitioningLock.notifyAll();
|
}
|
}
|
|
public void transitioningToPersistentSearchPhase()
|
{
|
searchPhase.set(SearchPhase.TRANSITIONING);
|
}
|
|
private void initialSearchSendsEntry(final K key)
|
{
|
lastKeySentByInitialSearch = key;
|
}
|
|
private boolean persistentSearchCanSendEntry(K key)
|
{
|
final SearchPhase stateValue = searchPhase.get();
|
switch (stateValue)
|
{
|
case INITIAL:
|
return false;
|
case TRANSITIONING:
|
synchronized (transitioningLock)
|
{
|
while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
|
{
|
// "initial search" phase is over, and is now verifying whether new
|
// changes have been published to the DB.
|
// Wait for this check to complete
|
try
|
{
|
transitioningLock.wait();
|
}
|
catch (InterruptedException e)
|
{
|
Thread.currentThread().interrupt();
|
// Shutdown must have been called. Stop sending entries.
|
return false;
|
}
|
}
|
}
|
return key.compareTo(lastKeySentByInitialSearch) > 0;
|
case PERSISTENT:
|
return true;
|
default:
|
throw new RuntimeException("Not implemented for " + stateValue);
|
}
|
}
|
}
|
|
/** Sends entries to clients for change number searches. */
|
private static class ChangeNumberEntrySender
|
{
|
private final SearchOperation searchOp;
|
private final SendEntryData<Long> sendEntryData = new SendEntryData<Long>();
|
|
private ChangeNumberEntrySender(SearchOperation searchOp)
|
{
|
this.searchOp = searchOp;
|
}
|
|
private void finalizeInitialSearch()
|
{
|
sendEntryData.finalizeInitialSearch();
|
}
|
|
public void transitioningToPersistentSearchPhase()
|
{
|
sendEntryData.transitioningToPersistentSearchPhase();
|
}
|
|
/**
|
* @return {@code true} if search should continue, {@code false} otherwise
|
*/
|
private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg,
|
MultiDomainServerState cookie) throws DirectoryException
|
{
|
final DN baseDN = cnIndexRecord.getBaseDN();
|
sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
|
final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
|
return sendEntryIfMatches(searchOp, entry, null);
|
}
|
|
private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
|
{
|
if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
|
{
|
sendEntryIfMatches(searchOp, entry, null);
|
}
|
}
|
}
|
|
/** Sends entries to clients for cookie-based searches. */
|
private static class CookieEntrySender {
|
private final SearchOperation searchOp;
|
private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
|
new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
|
|
private CookieEntrySender(SearchOperation searchOp)
|
{
|
this.searchOp = searchOp;
|
}
|
|
public void finalizeInitialSearch()
|
{
|
for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
|
{
|
sendEntryData.finalizeInitialSearch();
|
}
|
}
|
|
public void transitioningToPersistentSearchPhase()
|
{
|
for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
|
{
|
sendEntryData.transitioningToPersistentSearchPhase();
|
}
|
}
|
|
private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
|
{
|
final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId());
|
SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
|
if (data == null)
|
{
|
final SendEntryData<CSN> newData = new SendEntryData<CSN>();
|
data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
|
return data == null ? newData : data;
|
}
|
return data;
|
}
|
|
private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
|
final MultiDomainServerState cookie) throws DirectoryException
|
{
|
final CSN csn = updateMsg.getCSN();
|
final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
|
sendEntryData.initialSearchSendsEntry(csn);
|
final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
|
final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
|
return sendEntryIfMatches(searchOp, entry, cookieString);
|
}
|
|
private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
|
throws DirectoryException
|
{
|
final CSN csn = updateMsg.getCSN();
|
final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
|
if (sendEntryData.persistentSearchCanSendEntry(csn))
|
{
|
// multi threaded case: wait for the "initial search" phase to set the cookie
|
final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
|
|
final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
|
final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
|
// FIXME JNR use this instead of previous line:
|
// entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
|
sendEntryIfMatches(searchOp, cookieEntry, cookieString);
|
}
|
}
|
|
private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
|
{
|
synchronized (cookie)
|
{ // forbid concurrent updates to the cookie
|
cookie.update(baseDN, csn);
|
return cookie.toString();
|
}
|
}
|
}
|
}
|