/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2007-2009 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.*; import org.opends.messages.Message; import org.opends.server.admin.Configuration; import org.opends.server.admin.server.ServerManagementContext; import org.opends.server.admin.std.server.*; import org.opends.server.api.Backend; import org.opends.server.api.SynchronizationProvider; import org.opends.server.backends.jeb.BackupManager; import org.opends.server.config.ConfigException; import org.opends.server.core.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.plugin.ReplicationServerListener; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.types.*; import org.opends.server.util.*; import static java.util.Collections.*; import static org.opends.messages.BackendMessages.*; import static org.opends.messages.JebMessages.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.types.FilterType.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; /** * This class defines a backend that stores its information in an associated * replication server object. *

* This is primarily intended to take advantage of the backup/restore/ * import/export of the backend API, and to provide an LDAP access to the * replication server database. *

* Entries stored in this backend are held in the DB associated with the * replication server. *

* Currently are only implemented the create and restore backup features. */ public class ReplicationBackend extends Backend { private static final String CHANGE_NUMBER = "replicationChangeNumber"; /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private static final String BASE_DN = "dc=replicationchanges"; /** The base DNs for this backend. */ private DN[] baseDNs; /** The base DNs for this backend, in a hash set. */ private Set baseDNSet; /** The set of supported controls for this backend. */ private Set supportedControls; /** The set of supported features for this backend. */ private Set supportedFeatures; private ReplicationServer server; /** * The number of milliseconds between job progress reports. */ private long progressInterval = 10000; /** * The current number of entries exported. */ private long exportedCount = 0; /** * The current number of entries skipped. */ private long skippedCount = 0; /** Objectclass for getEntry root entries. */ private Map rootObjectclasses; /** Attributes used for getEntry root entries. */ private Map> attributes; /** Operational attributes used for getEntry root entries. */ private Map> operationalAttributes; /** * Creates a new backend with the provided information. All backend * implementations must implement a default constructor that use * super() to invoke this constructor. */ public ReplicationBackend() { super(); // Perform all initialization in initializeBackend. } /** * {@inheritDoc} */ @Override() public void configureBackend(Configuration config) throws ConfigException { if (config != null) { Validator.ensureTrue(config instanceof BackendCfg); BackendCfg cfg = (BackendCfg) config; DN[] newBaseDNs = new DN[cfg.getBaseDN().size()]; cfg.getBaseDN().toArray(newBaseDNs); this.baseDNs = newBaseDNs; } } /** * {@inheritDoc} */ @Override() public synchronized void initializeBackend() throws ConfigException, InitializationException { if ((baseDNs == null) || (baseDNs.length != 1)) { Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get(); throw new ConfigException(message); } baseDNSet = new HashSet(Arrays.asList(baseDNs)); supportedControls = new HashSet(); supportedFeatures = new HashSet(); for (DN dn : baseDNs) { try { DirectoryServer.registerBaseDN(dn, this, true); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get( dn.toString(), getExceptionMessage(e)); throw new InitializationException(message, e); } } rootObjectclasses = new LinkedHashMap(3); rootObjectclasses.put(DirectoryServer.getTopObjectClass(), OC_TOP); ObjectClass domainOC = DirectoryServer.getObjectClass("domain", true); rootObjectclasses.put(domainOC, "domain"); ObjectClass objectclassOC = DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true); rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC); attributes = new LinkedHashMap>(); Attribute a = Attributes.create("changetype", "add"); List attrList = new ArrayList(1); attrList.add(a); attributes.put(a.getAttributeType(), attrList); operationalAttributes = new LinkedHashMap>(); } /** * {@inheritDoc} */ @Override() public synchronized void finalizeBackend() { for (DN dn : baseDNs) { try { DirectoryServer.deregisterBaseDN(dn); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } } /** * {@inheritDoc} */ @Override() public DN[] getBaseDNs() { return baseDNs; } /** * {@inheritDoc} */ @Override() public synchronized long getEntryCount() { if (server==null) { try { server = getReplicationServer(); if (server == null) { return 0; } } catch(Exception e) { return 0; } } //This method only returns the number of actual change entries, the //domain and any baseDN entries are not counted. long retNum=0; for (ReplicationServerDomain rsd : toIterable(server.getDomainIterator())) { retNum += rsd.getChangesCount(); } return retNum; } /** * {@inheritDoc} */ @Override() public boolean isLocal() { return true; } /** * {@inheritDoc} */ @Override() public boolean isIndexed(AttributeType attributeType, IndexType indexType) { return true; } /** * {@inheritDoc} */ @Override() public synchronized Entry getEntry(DN entryDN) { try { if (baseDNSet.contains(entryDN)) { return new Entry(entryDN, rootObjectclasses, attributes, operationalAttributes); } InternalClientConnection conn = InternalClientConnection.getRootConnection(); SearchFilter filter = SearchFilter.createFilterFromString("(changetype=*)"); InternalSearchOperation searchOp = new InternalSearchOperation(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, entryDN, SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, filter, null, null); search(searchOp); List resultEntries = searchOp.getSearchEntries(); if (resultEntries.size() != 0) { return resultEntries.get(0); } } catch (DirectoryException ignored) { } return null; } /** * {@inheritDoc} */ @Override() public synchronized boolean entryExists(DN entryDN) { return getEntry(entryDN) != null; } /** * {@inheritDoc} */ @Override() public synchronized void addEntry(Entry entry, AddOperation addOperation) throws DirectoryException { Message message = ERR_BACKUP_ADD_NOT_SUPPORTED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); } /** * {@inheritDoc} */ @Override() public synchronized void deleteEntry(DN entryDN, DeleteOperation deleteOperation) throws DirectoryException { Message message = ERR_BACKUP_DELETE_NOT_SUPPORTED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); } /** * {@inheritDoc} */ @Override() public synchronized void replaceEntry(Entry oldEntry, Entry newEntry, ModifyOperation modifyOperation) throws DirectoryException { Message message = ERR_BACKUP_MODIFY_NOT_SUPPORTED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); } /** * {@inheritDoc} */ @Override() public synchronized void renameEntry(DN currentDN, Entry entry, ModifyDNOperation modifyDNOperation) throws DirectoryException { Message message = ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); } /** * {@inheritDoc} */ @Override() public Set getSupportedControls() { return supportedControls; } /** * {@inheritDoc} */ @Override() public Set getSupportedFeatures() { return supportedFeatures; } /** * {@inheritDoc} */ @Override() public boolean supportsLDIFExport() { return true; } /** * {@inheritDoc} */ @Override() public synchronized void exportLDIF(LDIFExportConfig exportConfig) throws DirectoryException { if(server == null) { Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message); } final List exportContainers = findExportContainers(exportConfig); // Make a note of the time we started. long startTime = System.currentTimeMillis(); // Start a timer for the progress report. Timer timer = new Timer(); TimerTask progressTask = new ProgressTask(); timer.scheduleAtFixedRate(progressTask, progressInterval, progressInterval); // Create the LDIF writer. LDIFWriter ldifWriter; try { ldifWriter = new LDIFWriter(exportConfig); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message message = ERR_BACKEND_CANNOT_CREATE_LDIF_WRITER.get(String.valueOf(e)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message, e); } exportRootChanges(exportContainers, exportConfig, ldifWriter); try { // Iterate through the containers. for (ReplicationServerDomain exportContainer : exportContainers) { if (exportConfig.isCancelled()) { break; } writeChangesAfterCSN(exportContainer, exportConfig, ldifWriter, null, null); } } finally { timer.cancel(); close(ldifWriter); } long finishTime = System.currentTimeMillis(); long totalTime = finishTime - startTime; float rate = 0; if (totalTime > 0) { rate = 1000f*exportedCount / totalTime; } Message message = NOTE_JEB_EXPORT_FINAL_STATUS.get( exportedCount, skippedCount, totalTime/1000, rate); logError(message); } private List findExportContainers( LDIFExportConfig exportConfig) throws DirectoryException { List includeBranches = exportConfig.getIncludeBranches(); List exportContainers = new ArrayList(); for (Iterator iter = server.getDomainIterator(); iter.hasNext();) { ReplicationServerDomain rsd = iter.next(); // Skip containers that are not covered by the include branches. if (includeBranches == null || includeBranches.isEmpty()) { exportContainers.add(rsd); } else { DN baseDN = DN.decode(rsd.getBaseDN() + "," + BASE_DN); for (DN includeBranch : includeBranches) { if (includeBranch.isDescendantOf(baseDN) || includeBranch.isAncestorOf(baseDN)) { exportContainers.add(rsd); } } } } return exportContainers; } /** * Exports the root changes of the export, and one entry by domain. */ private void exportRootChanges(List exportContainers, final LDIFExportConfig exportConfig, LDIFWriter ldifWriter) { AttributeType ocType = DirectoryServer.getObjectClassAttributeType(); AttributeBuilder builder = new AttributeBuilder(ocType); builder.add("top"); builder.add("domain"); Attribute ocAttr = builder.toAttribute(); Map> attrs = new HashMap>(); attrs.put(ocType, singletonList(ocAttr)); try { ChangeRecordEntry changeRecord = new AddChangeRecordEntry(DN.decode(BASE_DN), attrs); ldifWriter.writeChangeRecord(changeRecord); } catch (Exception e) { /* do nothing */ } if (exportConfig == null) { return; } for (ReplicationServerDomain exportContainer : exportContainers) { if (exportConfig.isCancelled()) { break; } final ServerState serverState = exportContainer.getLatestServerState(); TRACER.debugInfo("State=" + serverState); Attribute stateAttr = Attributes.create("state", serverState.toString()); Attribute genidAttr = Attributes.create("generation-id", "" + exportContainer.getGenerationId() + exportContainer.getBaseDN()); attrs.clear(); attrs.put(ocType, singletonList(ocAttr)); attrs.put(stateAttr.getAttributeType(), singletonList(stateAttr)); attrs.put(genidAttr.getAttributeType(), singletonList(genidAttr)); final String dnString = exportContainer.getBaseDN() + "," + BASE_DN; try { DN dn = DN.decode(dnString); ChangeRecordEntry changeRecord = new AddChangeRecordEntry(dn, attrs); ldifWriter.writeChangeRecord(changeRecord); } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } logError(ERR_BACKEND_EXPORT_ENTRY.get(dnString, String.valueOf(e))); } } } /** * Exports or returns all the changes from a ReplicationServerDomain coming * after the CSN specified in the searchOperation. */ private void writeChangesAfterCSN(ReplicationServerDomain rsDomain, final LDIFExportConfig exportConfig, LDIFWriter ldifWriter, SearchOperation searchOperation, final CSN previousCSN) throws DirectoryException { if (exportConfig != null && exportConfig.isCancelled()) { // Abort if cancelled return; } ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN); try { int lookthroughCount = 0; // Walk through the changes cursor.next(); // first try to advance the cursor while (cursor.getChange() != null) { if (exportConfig != null && exportConfig.isCancelled()) { // abort if cancelled return; } if (!canContinue(searchOperation, lookthroughCount)) { break; } lookthroughCount++; writeChange(cursor.getChange(), ldifWriter, searchOperation, rsDomain.getBaseDN(), exportConfig != null); cursor.next(); } } catch (ChangelogException e) { throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e); } finally { close(cursor); } } private boolean canContinue(SearchOperation searchOperation, int lookthroughCount) { if (searchOperation == null) { return true; } int limit = searchOperation.getClientConnection().getLookthroughLimit(); if (lookthroughCount > limit && limit > 0) { // lookthrough limit exceeded searchOperation.setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED); searchOperation.setErrorMessage(null); return false; } try { searchOperation.checkIfCanceled(false); return true; } catch (CanceledOperationException e) { searchOperation.setResultCode(ResultCode.CANCELED); searchOperation.setErrorMessage(null); return false; } } private CSN extractCSN(SearchOperation searchOperation) { if (searchOperation != null) { return extractCSN(searchOperation.getFilter()); } return null; } /** * Attempt to extract a CSN from searchFilter like * ReplicationChangeNumber=xxxx or ReplicationChangeNumber>=xxxx. * * @param filter * The filter to evaluate. * @return The extracted CSN or null if no CSN was found. */ private CSN extractCSN(SearchFilter filter) { // Try to optimize for filters like replicationChangeNumber>=xxxxx // or replicationChangeNumber=xxxxx : // If the search filter is one of these 2 filters, move directly to // ChangeNumber=xxxx before starting the iteration. final FilterType filterType = filter.getFilterType(); if (GREATER_OR_EQUAL.equals(filterType) || EQUALITY.equals(filterType)) { AttributeType changeNumberAttrType = DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER); if (filter.getAttributeType().equals(changeNumberAttrType)) { try { CSN startingCSN = new CSN(filter.getAssertionValue().getValue().toString()); return new CSN(startingCSN.getTime(), startingCSN.getSeqnum() - 1, startingCSN.getServerId()); } catch (Exception e) { // don't try to optimize the search if the ChangeNumber is // not a valid replication CSN. } } } else if (AND.equals(filterType)) { for (SearchFilter filterComponent : filter.getFilterComponents()) { // This code does not expect more than one CSN in the search filter. // It is ok, since it is only used by developers/testers for debugging. final CSN previousCSN = extractCSN(filterComponent); if (previousCSN != null) { return previousCSN; } } } return null; } /** * Exports one change. */ private void writeChange(UpdateMsg updateMsg, LDIFWriter ldifWriter, SearchOperation searchOperation, DN baseDN, boolean isExport) { InternalClientConnection conn = InternalClientConnection.getRootConnection(); Entry entry = null; DN dn = null; ObjectClass extensibleObjectOC = DirectoryServer.getDefaultObjectClass("extensibleObject"); try { if (updateMsg instanceof LDAPUpdateMsg) { LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg; if (msg instanceof AddMsg) { AddMsg addMsg = (AddMsg)msg; AddOperation addOperation = (AddOperation)msg.createOperation(conn); dn = DN.decode("puid=" + addMsg.getParentEntryUUID() + "+" + CHANGE_NUMBER + "=" + msg.getCSN() + "+" + msg.getDN() + "," + BASE_DN); Map> attrs = new HashMap>(); Map objectclasses = new HashMap(); for (RawAttribute a : addOperation.getRawAttributes()) { Attribute attr = a.toAttribute(); if (attr.getAttributeType().isObjectClassType()) { for (ByteString os : a.getValues()) { String ocName = os.toString(); ObjectClass oc = DirectoryServer.getObjectClass(toLowerCase(ocName)); if (oc == null) { oc = DirectoryServer.getDefaultObjectClass(ocName); } objectclasses.put(oc,ocName); } } else { addAttribute(attrs, attr); } } addAttribute(attrs, "changetype", "add"); if (isExport) { ChangeRecordEntry changeRecord = new AddChangeRecordEntry(dn, attrs); ldifWriter.writeChangeRecord(changeRecord); } else { entry = new Entry(dn, objectclasses, attrs, null); } } else if (msg instanceof DeleteMsg) { dn = computeDN(msg); ChangeRecordEntry changeRecord = new DeleteChangeRecordEntry(dn); entry = writeChangeRecord(ldifWriter, changeRecord, isExport); } else if (msg instanceof ModifyMsg) { ModifyOperation op = (ModifyOperation)msg.createOperation(conn); dn = computeDN(msg); ChangeRecordEntry changeRecord = new ModifyChangeRecordEntry(dn, op.getRawModifications()); entry = writeChangeRecord(ldifWriter, changeRecord, isExport); } else if (msg instanceof ModifyDNMsg) { ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn); dn = computeDN(msg); ChangeRecordEntry changeRecord = new ModifyDNChangeRecordEntry( dn, op.getNewRDN(), op.deleteOldRDN(), op.getNewSuperior()); entry = writeChangeRecord(ldifWriter, changeRecord, isExport); } if (isExport) { this.exportedCount++; } else { // Add extensibleObject objectclass and the ChangeNumber in the entry. if (!entry.getObjectClasses().containsKey(extensibleObjectOC)) entry.addObjectClass(extensibleObjectOC); addAttribute(entry.getUserAttributes(), CHANGE_NUMBER, msg.getCSN().toString()); addAttribute(entry.getUserAttributes(), "replicationDomain", baseDN.toNormalizedString()); // Get the base DN, scope, and filter for the search. DN searchBaseDN = searchOperation.getBaseDN(); SearchScope scope = searchOperation.getScope(); SearchFilter filter = searchOperation.getFilter(); if (entry.matchesBaseAndScope(searchBaseDN, scope) && filter.matchesEntry(entry)) { searchOperation.returnEntry(entry, new LinkedList()); } } } } catch (Exception e) { this.skippedCount++; if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } final String dnStr = (dn != null) ? dn.toNormalizedString() : "Unknown"; Message message; if (isExport) { message = ERR_BACKEND_EXPORT_ENTRY.get(dnStr, String.valueOf(e)); } else { message = ERR_BACKEND_SEARCH_ENTRY.get(dnStr, e.getLocalizedMessage()); } logError(message); } } private DN computeDN(LDAPUpdateMsg msg) throws DirectoryException { return DN.decode("uuid=" + msg.getEntryUUID() + "," + CHANGE_NUMBER + "=" + msg.getCSN() + "," + msg.getDN() + "," + BASE_DN); } private Entry writeChangeRecord(LDIFWriter ldifWriter, ChangeRecordEntry changeRecord, boolean isExport) throws IOException, LDIFException { if (isExport) { ldifWriter.writeChangeRecord(changeRecord); return null; } final Writer writer = new Writer(); writer.getLDIFWriter().writeChangeRecord(changeRecord); return writer.getLDIFReader().readEntry(); } private void addAttribute(Map> attributes, String attrName, String attrValue) { addAttribute(attributes, Attributes.create(attrName, attrValue)); } /** * Add an attribute to a provided Map of attribute. * * @param attributes The Map that should be updated. * @param attribute The attribute that should be added to the Map. */ private void addAttribute( Map> attributes, Attribute attribute) { AttributeType attrType = attribute.getAttributeType(); List attrs = attributes.get(attrType); if (attrs == null) { attrs = new ArrayList(1); attrs.add(attribute); attributes.put(attrType, attrs); } else { attrs.add(attribute); } } /** * {@inheritDoc} */ @Override() public boolean supportsLDIFImport() { return false; } /** * {@inheritDoc} */ @Override() public synchronized LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException { Message message = ERR_REPLICATONBACKEND_IMPORT_LDIF_NOT_SUPPORTED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); } /** * {@inheritDoc} */ @Override() public boolean supportsBackup() { // This backend does not provide a backup/restore mechanism. return true; } /** * {@inheritDoc} */ @Override() public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason) { return true; } /** {@inheritDoc} */ @Override() public void createBackup(BackupConfig backupConfig) throws DirectoryException { createBackupManager().createBackup(getBackendDir(), backupConfig); } /** {@inheritDoc} */ @Override() public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException { createBackupManager().restoreBackup(getBackendDir(), restoreConfig); } /** {@inheritDoc} */ @Override() public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException { createBackupManager().removeBackup(backupDirectory, backupID); } private BackupManager createBackupManager() { return new BackupManager(getBackendID()); } private File getBackendDir() throws DirectoryException { return getFileForPath(getReplicationServerCfg().getReplicationDBDirectory()); } /** * {@inheritDoc} */ @Override() public boolean supportsRestore() { return true; } /** * {@inheritDoc} */ @Override() public long numSubordinates(DN entryDN, boolean subtree) throws DirectoryException { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); } /** * {@inheritDoc} */ @Override() public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_HAS_SUBORDINATES_NOT_SUPPORTED.get()); } /** * Set the replication server associated with this backend. * @param server The replication server. */ public void setServer(ReplicationServer server) { this.server = server; } /** * This class reports progress of the export job at fixed intervals. */ private final class ProgressTask extends TimerTask { /** * The number of entries that had been exported at the time of the * previous progress report. */ private long previousCount = 0; /** * The time in milliseconds of the previous progress report. */ private long previousTime; /** * Create a new export progress task. */ public ProgressTask() { previousTime = System.currentTimeMillis(); } /** * The action to be performed by this timer task. */ @Override public void run() { long latestCount = exportedCount; long deltaCount = latestCount - previousCount; long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } float rate = 1000f*deltaCount / deltaTime; Message message = NOTE_JEB_EXPORT_PROGRESS_REPORT.get(latestCount, skippedCount, rate); logError(message); previousCount = latestCount; previousTime = latestTime; } } /** * {@inheritDoc} */ @Override() public synchronized void search(SearchOperation searchOperation) throws DirectoryException { //This check is for GroupManager initialization. It currently doesn't //come into play because the replication server variable is null in //the check above. But if the order of initialization of the server variable //is ever changed, the following check will keep replication change entries //from being added to the groupmanager cache erroneously. List requestControls = searchOperation.getRequestControls(); if (requestControls != null) { for (Control c : requestControls) { if (OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE.equals(c.getOID())) { return; } } } // don't do anything if the search is a base search on the backend suffix. try { DN backendBaseDN = DN.decode(BASE_DN); if ( searchOperation.getScope().equals(SearchScope.BASE_OBJECT) && backendBaseDN.equals(searchOperation.getBaseDN()) ) { return; } } catch (Exception e) { return; } // Make sure the base entry exists if it's supposed to be in this backend. final DN searchBaseDN = searchOperation.getBaseDN(); if (!handlesEntry(searchBaseDN)) { DN matchedDN = searchBaseDN.getParentDNInSuffix(); while (matchedDN != null) { if (handlesEntry(matchedDN)) { break; } matchedDN = matchedDN.getParentDNInSuffix(); } Message message = ERR_REPLICATIONBACKEND_ENTRY_DOESNT_EXIST. get(String.valueOf(searchBaseDN)); throw new DirectoryException( ResultCode.NO_SUCH_OBJECT, message, matchedDN, null); } if (server==null) { server = getReplicationServer(); if (server == null) { if (!baseDNSet.contains(searchBaseDN)) { Message message = ERR_REPLICATIONBACKEND_ENTRY_DOESNT_EXIST.get( String.valueOf(searchBaseDN)); throw new DirectoryException( ResultCode.NO_SUCH_OBJECT, message, null, null); } return; } } // Walk through all entries and send the ones that match. final List searchContainers = findSearchContainers(searchBaseDN); for (ReplicationServerDomain exportContainer : searchContainers) { final CSN previousCSN = extractCSN(searchOperation); writeChangesAfterCSN(exportContainer, null, null, searchOperation, previousCSN); } } private List findSearchContainers(DN searchBaseDN) throws DirectoryException { List searchContainers = new ArrayList(); for (Iterator iter = server.getDomainIterator(); iter.hasNext();) { ReplicationServerDomain rsd = iter.next(); // Skip containers that are not covered by the include branches. DN baseDN = DN.decode(rsd.getBaseDN() + "," + BASE_DN); if (searchBaseDN.isDescendantOf(baseDN) || searchBaseDN.isAncestorOf(baseDN)) { searchContainers.add(rsd); } } return searchContainers; } /** * Retrieves the replication server associated to this backend. * * @return The server retrieved * @throws DirectoryException When it occurs. */ private ReplicationServer getReplicationServer() throws DirectoryException { for (SynchronizationProvider provider : DirectoryServer.getSynchronizationProviders()) { if (provider instanceof MultimasterReplication) { MultimasterReplication mmp = (MultimasterReplication)provider; ReplicationServerListener list = mmp.getReplicationServerListener(); if (list != null) { return list.getReplicationServer(); } } } return null; } /** * Find the replication server configuration associated with this replication * backend. */ private ReplicationServerCfg getReplicationServerCfg() throws DirectoryException { RootCfg root = ServerManagementContext.getInstance().getRootConfiguration(); for (String name : root.listSynchronizationProviders()) { SynchronizationProviderCfg syncCfg; try { syncCfg = root.getSynchronizationProvider(name); } catch (ConfigException e) { throw new DirectoryException(ResultCode.OPERATIONS_ERROR, ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get(), e); } if (syncCfg instanceof ReplicationSynchronizationProviderCfg) { ReplicationSynchronizationProviderCfg scfg = (ReplicationSynchronizationProviderCfg) syncCfg; try { return scfg.getReplicationServer(); } catch (ConfigException e) { throw new DirectoryException(ResultCode.OPERATIONS_ERROR, ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get(), e); } } } // No replication server found. throw new DirectoryException(ResultCode.OPERATIONS_ERROR, ERR_REPLICATION_SERVER_CONFIG_NOT_FOUND.get()); } /** * Writer class to read/write from/to a bytearray. */ private static final class Writer { /** The underlying output stream. */ private final ByteArrayOutputStream stream; /** The underlying LDIF config. */ private final LDIFExportConfig config; /** The LDIF writer. */ private final LDIFWriter writer; /** * Create a new string writer. */ public Writer() { this.stream = new ByteArrayOutputStream(); this.config = new LDIFExportConfig(stream); try { this.writer = new LDIFWriter(config); } catch (IOException e) { // Should not happen. throw new RuntimeException(e); } } /** * Get the LDIF writer. * * @return Returns the LDIF writer. */ public LDIFWriter getLDIFWriter() { return writer; } /** * Close the writer and get an LDIF reader for the LDIF content. * * @return Returns an LDIF Reader. * @throws IOException * If an error occurred closing the writer. */ public LDIFReader getLDIFReader() throws IOException { writer.close(); String ldif = stream.toString("UTF-8"); ldif = ldif.replace("\n-\n", "\n"); ByteArrayInputStream istream = new ByteArrayInputStream(ldif.getBytes()); LDIFImportConfig newConfig = new LDIFImportConfig(istream); // ReplicationBackend may contain entries that are not schema // compliant. Let's ignore them for now. newConfig.setValidateSchema(false); return new LDIFReader(newConfig); } } /** * {@inheritDoc} */ @Override public void preloadEntryCache() throws UnsupportedOperationException { throw new UnsupportedOperationException("Operation not supported."); } }