opends/resource/config/config.ldif
@@ -182,7 +182,6 @@ ds-cfg-db-directory-permissions: 700 ds-cfg-index-entry-limit: 4000 ds-cfg-preload-time-limit: 0 seconds ds-cfg-import-queue-size: 100 ds-cfg-import-thread-count: 8 ds-cfg-entries-compressed: false ds-cfg-compact-encoding: true opends/resource/schema/02-config.ldif
@@ -1085,11 +1085,6 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.222 NAME 'ds-cfg-import-queue-size' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.223 NAME 'ds-cfg-import-thread-count' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 @@ -2464,7 +2459,6 @@ MUST ds-cfg-db-directory MAY ( ds-cfg-index-entry-limit $ ds-cfg-preload-time-limit $ ds-cfg-import-queue-size $ ds-cfg-import-thread-count $ ds-cfg-entries-compressed $ ds-cfg-db-directory-permissions $ @@ -4035,7 +4029,6 @@ ds-cfg-ndb-thread-count $ ds-cfg-ndb-num-connections $ ds-cfg-deadlock-retry-limit $ ds-cfg-import-queue-size $ ds-cfg-import-thread-count $ ds-cfg-index-entry-limit ) X-ORIGIN 'OpenDS Directory Server' ) opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
@@ -222,33 +222,6 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="import-queue-size" advanced="true"> <adm:synopsis> Specifies the size (in number of entries) of the queue that is used to hold the entries read during an LDIF import. </adm:synopsis> <adm:requires-admin-action> <adm:none> <adm:synopsis> Changes do not take effect for any import that may already be in progress. </adm:synopsis> </adm:none> </adm:requires-admin-action> <adm:default-behavior> <adm:defined> <adm:value>100</adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:integer lower-limit="1" upper-limit="2147483647" /> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-import-queue-size</ldap:name> </ldap:attribute> </adm:profile> </adm:property> <adm:property name="import-thread-count" advanced="true"> <adm:synopsis> Specifies the number of threads that is used for concurrent opends/src/messages/messages/jeb.properties
@@ -178,7 +178,10 @@ NOTICE_JEB_EXPORT_PROGRESS_REPORT_88=Exported %d records and skipped %d (recent \ rate %.1f/sec) NOTICE_JEB_IMPORT_THREAD_COUNT_89=Import Thread Count: %d threads INFO_JEB_IMPORT_BUFFER_SIZE_90=Buffer size per thread = %,d SEVERE_ERR_IMPORT_LDIF_LACK_MEM_90=Insufficient free memory to perform import. \ At least %dMB of free memory is required INFO_JEB_IMPORT_LDIF_PROCESSING_TIME_91=LDIF processing took %d seconds INFO_JEB_IMPORT_INDEX_PROCESSING_TIME_92=Index processing took %d seconds NOTICE_JEB_IMPORT_CLOSING_DATABASE_93=Flushing data to disk @@ -276,8 +279,8 @@ the import process can start SEVERE_ERR_JEB_IMPORT_THREAD_EXCEPTION_153=An error occurred in import thread \ %s: %s. The thread can not continue SEVERE_ERR_JEB_IMPORT_NO_WORKER_THREADS_154=There are no more import worker \ threads to process the imported entries NOTICE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT_154=Index %s: bytes left = %d, \ key processed rate = %.1f/sec SEVERE_ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR_155=Unable to create the temporary \ directory %s SEVERE_ERR_JEB_INVALID_LOGGING_LEVEL_156=The database logging level string \ @@ -326,27 +329,25 @@ NOTICE_JEB_IMPORT_STARTING_173=%s starting import (build %s, R%d) SEVERE_ERR_JEB_DIRECTORY_DOES_NOT_EXIST_174=The backend database directory \ '%s' does not exist SEVERE_ERR_JEB_IMPORT_LDIF_ABORT_175=The import was aborted because an \ uncaught exception was thrown during processing SEVERE_ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR_175=The following I/O \ error was received while processing the %s index scratch file: %s NOTICE_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE_176=Import LDIF environment close \ took %d seconds NOTICE_JEB_IMPORT_LDIF_BUFFER_FLUSH_177=Begin substring buffer flush of %d \ elements. 
Buffer total access: %d buffer hits: %d NOTICE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED_178=Substring buffer flush \ completed in %d seconds NOTICE_JEB_IMPORT_LDIF_FINAL_CLEAN_179=Begin final cleaner run NOTICE_JEB_IMPORT_LDIF_CLEAN_180=Begin cleaner run NOTICE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE_181=Cleaner run took %d seconds %d logs \ removed NOTICE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS_182=Cleaner will remove %d logs NOTICE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM_184=Available buffer memory %d bytes is \ below the minimum value of %d bytes. Setting available buffer memory to \ the minimum NOTICE_JEB_IMPORT_LDIF_MEMORY_INFO_185=Setting DB cache to %d bytes and \ internal buffer to %d bytes NOTICE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM_186=Available buffer memory %d \ bytes is below the minimum value of %d bytes allowed for a single import \ context. Setting context available buffer memory to the minimum NOTICE_JEB_IMPORT_LDIF_DN_NO_PARENT_177=The DN %s was found to be missing \ a parent during the phase two parent check NOTICE_JEB_IMPORT_LDIF_DN_CLOSE_178=DN phase two processing completed. \ Processed %d DNs NOTICE_JEB_IMPORT_LDIF_INDEX_CLOSE_179=Index %s phase two processing completed SEVERE_ERR_EXECUTION_ERROR_180=Execution error during backend operation: %s SEVERE_ERR_INTERRUPTED_ERROR_181=Interrupted error during backend operation: %s NOTICE_JEB_IMPORT_LDIF_TRUSTED_FAILED_182=Setting indexes trusted failed \ for the following reason: %s NOTICE_JEB_IMPORT_LDIF_LOG_BYTES_184=Setting DB log byte size to %d bytes NOTICE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO_185=Setting DB cache size to %d bytes \ and phase one buffer size to %d bytes NOTICE_JEB_IMPORT_LDIF_TOT_MEM_BUF_186=The amount of free memory available to \ the import task is %d bytes. 
The number of phase one buffers required is \ %d buffers NOTICE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS_187=Checkpoints performed: %d NOTICE_JEB_IMPORT_LDIF_CLEANER_STATS_188=Cleaner runs: %d files deleted: %d \ entries read: %d IN nodes cleaned: %d opends/src/messages/messages/tools.properties
@@ -2505,4 +2505,9 @@ Windows Service INFO_INSTALLDS_DO_NOT_ENABLE_WINDOWS_SERVICE_1682=Do not enable the server to \ run as a Windows Service INFO_LDIFIMPORT_DESCRIPTION_TEMP_DIRECTORY_1683=Path to temporary directory \ for index scratch files during LDIF import INFO_LDIFIMPORT_TEMP_DIR_PLACEHOLDER_1684={directory} INFO_LDIFIMPORT_DESCRIPTION_DN_CHECK_PHASE_2_1685=Perform DN validation \ during second phase of LDIF import opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
@@ -30,6 +30,7 @@ import java.io.IOException; import java.io.File; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ExecutionException; import java.io.FileInputStream; import java.io.FilenameFilter; @@ -41,6 +42,7 @@ import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.RunRecoveryException; import org.opends.server.backends.jeb.importLDIF.*; import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.Backend; @@ -71,7 +73,7 @@ import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.api.ExtensibleIndexer; import org.opends.server.types.DN; import org.opends.server.backends.jeb.importLDIF.Importer; import org.opends.server.api.ExtensibleMatchingRule; /** * This is an implementation of a Directory Server Backend which stores entries @@ -107,12 +109,12 @@ /** * A count of the total operation threads currently in the backend. */ private AtomicInteger threadTotalCount = new AtomicInteger(0); private final AtomicInteger threadTotalCount = new AtomicInteger(0); /** * A count of the write operation threads currently in the backend. */ private AtomicInteger threadWriteCount = new AtomicInteger(0); private final AtomicInteger threadWriteCount = new AtomicInteger(0); /** * A list of monitor providers created for this backend instance. @@ -281,6 +283,7 @@ /** * {@inheritDoc} */ @Override public void configureBackend(Configuration cfg) throws ConfigException { @@ -1128,10 +1131,11 @@ envConfig.setAllowCreate(true); envConfig.setTransactional(false); envConfig.setTxnNoSync(false); envConfig.setConfigParam("je.env.isLocking", "false"); envConfig.setConfigParam("je.env.isLocking", "true"); envConfig.setConfigParam("je.env.runCheckpointer", "false"); //Loop through local indexes and see if any are substring. 
boolean hasSubIndex = false; int indexCount = cfg.listLocalDBIndexes().length; subIndex: for (String idx : cfg.listLocalDBIndexes()) { final LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx); @@ -1164,11 +1168,33 @@ } } } Importer importer = new Importer(importConfig, hasSubIndex); envConfig.setConfigParam("je.maxMemory", importer.getDBCacheSize()); Importer importer = new Importer(importConfig, cfg); importer.init(envConfig); rootContainer = initializeRootContainer(envConfig); return importer.processImport(rootContainer); } catch (ExecutionException execEx) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, execEx); } Message message = ERR_EXECUTION_ERROR.get(execEx.getMessage()); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); } catch (InterruptedException intEx) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, intEx); } Message message = ERR_INTERRUPTED_ERROR.get(intEx.getMessage()); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); } catch (IOException ioe) { if (debugEnabled()) @@ -1188,14 +1214,6 @@ throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), je.getMessageObject()); } catch (DatabaseException de) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, de); } throw createDirectoryException(de); } catch (InitializationException ie) { if (debugEnabled()) @@ -1656,12 +1674,10 @@ * @param e The DatabaseException to be converted. * @return DirectoryException created from exception. 
*/ DirectoryException createDirectoryException(DatabaseException e) { DirectoryException createDirectoryException(DatabaseException e) { ResultCode resultCode = DirectoryServer.getServerErrorResultCode(); Message message = null; if(e instanceof RunRecoveryException) { if (e instanceof RunRecoveryException) { message = NOTE_BACKEND_ENVIRONMENT_UNUSABLE.get(getBackendID()); logError(message); DirectoryServer.sendAlertNotification(DirectoryServer.getInstance(), @@ -1669,8 +1685,7 @@ } String jeMessage = e.getMessage(); if (jeMessage == null) { if (jeMessage == null) { jeMessage = stackTraceToSingleLineString(e); } message = ERR_JEB_DATABASE_EXCEPTION.get(jeMessage); @@ -1680,45 +1695,38 @@ /** * {@inheritDoc} */ public String getClassName() { public String getClassName() { return CLASS_NAME; } /** * {@inheritDoc} */ public LinkedHashMap<String,String> getAlerts() { LinkedHashMap<String,String> alerts = new LinkedHashMap<String,String>(); public LinkedHashMap<String, String> getAlerts() { LinkedHashMap<String, String> alerts = new LinkedHashMap<String, String>(); alerts.put(ALERT_TYPE_BACKEND_ENVIRONMENT_UNUSABLE, ALERT_DESCRIPTION_BACKEND_ENVIRONMENT_UNUSABLE); ALERT_DESCRIPTION_BACKEND_ENVIRONMENT_UNUSABLE); return alerts; } /** * {@inheritDoc} */ public DN getComponentEntryDN() { public DN getComponentEntryDN() { return cfg.dn(); } private RootContainer initializeRootContainer(EnvironmentConfig envConfig) throws ConfigException, InitializationException { throws ConfigException, InitializationException { // Open the database environment try { try { RootContainer rc = new RootContainer(this, cfg); rc.open(envConfig); return rc; } catch (DatabaseException e) { if (debugEnabled()) { catch (DatabaseException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message message = ERR_JEB_OPEN_ENV_FAIL.get(e.getMessage()); @@ -1729,11 +1737,11 @@ /** * {@inheritDoc} */ @Override public void preloadEntryCache() throws UnsupportedOperationException { 
UnsupportedOperationException { EntryCachePreloader preloader = new EntryCachePreloader(this); new EntryCachePreloader(this); preloader.preload(); } } opends/src/server/org/opends/server/backends/jeb/Index.java
@@ -34,7 +34,6 @@ import org.opends.server.types.*; import org.opends.server.util.StaticUtils; import org.opends.server.backends.jeb.importLDIF.IntegerImportIDSet; import org.opends.server.backends.jeb.importLDIF.ImportIDSet; import static org.opends.messages.JebMessages.*; @@ -61,7 +60,7 @@ /** * The comparator for index keys. */ private Comparator<byte[]> comparator; private final Comparator<byte[]> comparator; /** * The limit on the number of entry IDs that may be indexed by one key. @@ -72,7 +71,7 @@ * Limit on the number of entry IDs that may be retrieved by cursoring * through an index. */ private int cursorEntryLimit; private final int cursorEntryLimit; /** * Number of keys that have exceeded the entry limit since this @@ -91,7 +90,7 @@ */ boolean maintainCount; private State state; private final State state; /** * A flag to indicate if this index should be trusted to be consistent @@ -115,8 +114,7 @@ private boolean rebuildRunning = false; //Thread local area to store per thread cursors. private ThreadLocal<Cursor> curLocal = new ThreadLocal<Cursor>(); private final ThreadLocal<Cursor> curLocal = new ThreadLocal<Cursor>(); /** * Create a new index object. @@ -310,43 +308,33 @@ } /** * Insert the specified import ID set into this index a the provided key. * * @param key The key to add the set to. * @param importIdSet The set of import IDs. * @param data Database entry to reuse for read. * @param cursor A database cursor to use. * @throws DatabaseException If an database error occurs. 
*/ private void insert(DatabaseEntry key, ImportIDSet importIdSet, DatabaseEntry data, Cursor cursor) throws DatabaseException { OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT); insertKey(DatabaseEntry key, ImportIDSet importIdSet, DatabaseEntry data) throws DatabaseException { OperationStatus status = read(null, key, data, LockMode.RMW); if(status == OperationStatus.SUCCESS) { ImportIDSet newImportIDSet = new IntegerImportIDSet(); if (newImportIDSet.merge(data.getData(), importIdSet, indexEntryLimit, maintainCount) && importIdSet.isDirty()) { ImportIDSet newImportIDSet = new ImportIDSet(); if (newImportIDSet.merge(data.getData(), importIdSet, indexEntryLimit, maintainCount)) { entryLimitExceededCount++; importIdSet.setDirty(false); } data.setData(newImportIDSet.toDatabase()); cursor.putCurrent(data); put(null, key, data); } else if(status == OperationStatus.NOTFOUND) { if(!importIdSet.isDefined() && importIdSet.isDirty()) { if(!importIdSet.isDefined()) { entryLimitExceededCount++; importIdSet.setDirty(false); } data.setData(importIdSet.toDatabase()); cursor.put(key,data); put(null, key, data); } else { //Should never happen during import. throw new DatabaseException(); } } /** * Insert the specified import ID set into this index. Creates a DB * cursor if needed. 
@@ -364,7 +352,7 @@ cursor = openCursor(null, null); curLocal.set(cursor); } insert(key, importIdSet, data, cursor); insertKey(key, importIdSet, data); } @@ -383,14 +371,9 @@ boolean insert(ImportIDSet importIDSet, Set<byte[]> keySet, DatabaseEntry keyData, DatabaseEntry data) throws DatabaseException { Cursor cursor = curLocal.get(); if(cursor == null) { cursor = openCursor(null, null); curLocal.set(cursor); } for(byte[] key : keySet) { keyData.setData(key); insert(keyData, importIDSet, data, cursor); insert(keyData, importIDSet, data); } keyData.setData(null); data.setData(null); @@ -1021,6 +1004,7 @@ } } /** * Reads a range of keys and collects all their entry IDs into a * single set. @@ -1168,14 +1152,7 @@ curLocal.remove(); } } /** * Increment the count of the number of keys that have exceeded the entry * limit since this object was created. */ public void incEntryLimitExceededCount() { entryLimitExceededCount++; } /** * Update the index buffer for a deleted entry. @@ -1440,4 +1417,14 @@ { return maintainCount; } /** * Return an indexes comparator. * * @return The comparator related to an index. */ public Comparator<byte[]> getComparator() { return this.comparator; } } opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
File was deleted opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java
File was deleted opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
@@ -22,115 +22,495 @@ * CDDL HEADER END * * * Copyright 2008 Sun Microsystems, Inc. * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.backends.jeb.importLDIF; import org.opends.server.backends.jeb.EntryID; import org.opends.server.backends.jeb.JebFormat; /** * Interface defining and import ID set. * An import ID set backed by an array of ints. */ public interface ImportIDSet { public class ImportIDSet { /** * Add an entry ID to the set. * The internal array where elements are stored. */ private long[] array = null; /** * The number of valid elements in the array. */ private int count = 0; //Boolean to keep track if the instance is defined or not. private boolean isDefined=true; //Size of the undefines. private long undefinedSize = 0; //Key related to an ID set. private byte[] key; /** * Create an empty import set. */ public ImportIDSet() { } /** * Create an import ID set of the specified size plus an extra 128 slots. * * @param entryID The entry ID to add. * @param entryLimit The entry limit. * @param maintainCount Maintain count of IDs if in undefined mode. * @param size The size of the the underlying array, plus some extra space. */ public ImportIDSet(int size) { this.array = new long[size + 128]; } /** * Create an import set and add the specified entry ID to it. * * @param id The entry ID. */ public ImportIDSet(EntryID id) { this.array = new long[1]; this.array[0] = id.longValue(); count=1; } /** * Return if an import ID set is defined or not. * * @return <CODE>True</CODE> if an import ID set is defined. */ public boolean isDefined() { return isDefined; } /** * Return the undefined size of an import ID set. * * @return The undefined size of an import ID set. */ long getUndefinedSize() { return undefinedSize; } /** * Set an import ID set to undefined. */ void setUndefined() { array = null; isDefined = false; } /** * Merge an instance of an import ID set with the import ID set specified * in the parameter. 
The specified limit and maintain count parameters define * if the newly merged set is defined or not. * * @param importIDSet The import ID set to merge with. * @param limit The index limit to use in the undefined calculation. * @param maintainCount <CODE>True</CODE> if a count of the IDs should be kept * after going undefined. */ public void addEntryID(EntryID entryID, int entryLimit, boolean maintainCount); merge(ImportIDSet importIDSet, int limit, boolean maintainCount) { if(!isDefined() && !importIDSet.isDefined()) //both undefined { if(maintainCount) { undefinedSize += importIDSet.getUndefinedSize(); } return; } else if(!isDefined()) //this undefined { if(maintainCount) { undefinedSize += importIDSet.size(); } return; } else if(!importIDSet.isDefined()) //other undefined { isDefined = false; if(maintainCount) { undefinedSize = size() + importIDSet.getUndefinedSize(); } else { undefinedSize = Long.MAX_VALUE; } array = null; count = 0; } else if ((count + importIDSet.size()) > limit) //add together => undefined { isDefined = false; if(maintainCount) { undefinedSize = size() + importIDSet.size(); } else { undefinedSize = Long.MAX_VALUE; } array = null; count = 0; } else { addAll(importIDSet); } } /** * Return if a set is defined or not. * Add the specified entry id to an import ID set. * * @return <CODE>True</CODE> if a set is defined. * @param entryID The entry ID to add to an import ID set. * @param limit The index limit to use in the undefined calculation. * @param maintainCount <CODE>True</CODE> if a count of the IDs should be kept * after going undefined. */ public boolean isDefined(); public void addEntryID(EntryID entryID, int limit, boolean maintainCount) { addEntryID(entryID.longValue(), limit, maintainCount); } /** * Return the memory size of a set. /** * Add the specified long value to an import ID set. * * @return The sets current memory size. * @param l The long value to add to an import ID set. 
* @param limit The index limit to use in the undefined calculation. * @param maintainCount <CODE>True</CODE> if a count of the IDs should be kept * after going undefined. */ public int getMemorySize(); public void addEntryID(long l, int limit, boolean maintainCount) { if(!isDefined()) { if(maintainCount) { undefinedSize++; } return; } if(isDefined() && ((count + 1) > limit)) { isDefined = false; array = null; if(maintainCount) { undefinedSize = count + 1; } else { undefinedSize = Long.MAX_VALUE; } count = 0; } else { add(l); } } private boolean mergeCount(byte[] dBbytes, ImportIDSet importIdSet, int limit) { boolean incrLimitCount=false; boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80); if(dbUndefined && (!importIdSet.isDefined())) { undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) + importIdSet.getUndefinedSize(); isDefined=false; } else if(dbUndefined && (importIdSet.isDefined())) { undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) + importIdSet.size(); importIdSet.setUndefined(); isDefined=false; } else if(!importIdSet.isDefined()) { int dbSize = JebFormat.entryIDListFromDatabase(dBbytes).length; undefinedSize= dbSize + importIdSet.getUndefinedSize(); isDefined=false; incrLimitCount = true; } else { array = JebFormat.entryIDListFromDatabase(dBbytes); if(array.length + importIdSet.size() > limit) { undefinedSize = array.length + importIdSet.size(); importIdSet.setUndefined(); isDefined=false; incrLimitCount=true; } else { count = array.length; addAll(importIdSet); } } return incrLimitCount; } /** * Convert a set to a byte array suitable for saving to DB. * Merge the specified byte array read from a DB, with the specified import * ID set. The specified limit and maintain count parameters define * if the newly merged set is defined or not. * * @return A byte array representing the set. * @param dBbytes The byte array of IDs read from a DB. * @param importIdSet The import ID set to merge the byte array with. 
* @param limit The index limit to use in the undefined calculation. * @param maintainCount <CODE>True</CODE> if the import ID set should * maintain a count of import IDs. * @return <CODE>True</CODE> if the import ID set started keeping a count as * a result of the merge. */ public byte[] toDatabase(); public boolean merge(byte[] dBbytes, ImportIDSet importIdSet, int limit, boolean maintainCount) { boolean incrLimitCount=false; if(maintainCount) { incrLimitCount = mergeCount(dBbytes, importIdSet, limit); } else { boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80); if(dbUndefined) { isDefined=false; importIdSet.setUndefined(); undefinedSize = Long.MAX_VALUE; } else if(!importIdSet.isDefined()) { isDefined=false; incrLimitCount=true; undefinedSize = Long.MAX_VALUE; } else { array = JebFormat.entryIDListFromDatabase(dBbytes); if(array.length + importIdSet.size() > limit) { isDefined=false; incrLimitCount=true; count = 0; importIdSet.setUndefined(); undefinedSize = Long.MAX_VALUE; } else { count = array.length; addAll(importIdSet); } } } return incrLimitCount; } private void addAll(ImportIDSet that) { resize(this.count+that.count); if (that.count == 0) { return; } // Optimize for the case where the two sets are sure to have no overlap. if (this.count == 0 || that.array[0] > this.array[this.count-1]) { System.arraycopy(that.array, 0, this.array, this.count, that.count); count += that.count; return; } if (this.array[0] > that.array[that.count-1]) { System.arraycopy(this.array, 0, this.array, that.count, this.count); System.arraycopy(that.array, 0, this.array, 0, that.count); count += that.count; return; } int destPos = binarySearch(this.array, this.count, that.array[0]); if (destPos < 0) { destPos = -(destPos+1); } // Make space for the copy. int aCount = this.count - destPos; int aPos = destPos + that.count; int aEnd = aPos + aCount; System.arraycopy(this.array, destPos, this.array, aPos, aCount); // Optimize for the case where there is no overlap. 
if (this.array[aPos] > that.array[that.count-1]) { System.arraycopy(that.array, 0, this.array, destPos, that.count); count += that.count; return; } int bPos; for ( bPos = 0; aPos < aEnd && bPos < that.count; ) { if ( this.array[aPos] < that.array[bPos] ) { this.array[destPos++] = this.array[aPos++]; } else if ( this.array[aPos] > that.array[bPos] ) { this.array[destPos++] = that.array[bPos++]; } else { this.array[destPos++] = this.array[aPos++]; bPos++; } } // Copy any remainder. int aRemain = aEnd - aPos; if (aRemain > 0) { System.arraycopy(this.array, aPos, this.array, destPos, aRemain); destPos += aRemain; } int bRemain = that.count - bPos; if (bRemain > 0) { System.arraycopy(that.array, bPos, this.array, destPos, bRemain); destPos += bRemain; } count = destPos; } /** * Return the size of the set. * Return the number of IDs in an import ID set. * * @return The size of the ID set. * @return The current size of an import ID set. */ public int size(); public int size() { return count; } private boolean add(long v) { resize(count+1); if (count == 0 || v > array[count-1]) { array[count++] = v; return true; } int pos = binarySearch(array, count, v); if (pos >=0) { return false; } // For a negative return value r, the index -(r+1) gives the array // index at which the specified value can be inserted to maintain // the sorted order of the array. pos = -(pos+1); System.arraycopy(array, pos, array, pos+1, count-pos); array[pos] = v; count++; return true; } private static int binarySearch(long[] a, int count, long key) { int low = 0; int high = count-1; while (low <= high) { int mid = (low + high) >> 1; long midVal = a[mid]; if (midVal < key) low = mid + 1; else if (midVal > key) high = mid - 1; else return mid; // key found } return -(low + 1); // key not found. } private void resize(int size) { if (array == null) { array = new long[size]; } else if (array.length < size) { // Expand the size of the array in powers of two. int newSize = array.length == 0 ? 
1 : array.length; do { newSize *= 2; } while (newSize < size); long[] newBytes = new long[newSize]; System.arraycopy(array, 0, newBytes, 0, count); array = newBytes; } } /** * Merge a byte array read from DB with a ID set. * Create a byte array suitable to write to a JEB DB from an import ID set. * * @param dbBytes The byte array read from DB. * @param bufImportIDSet The import ID set to merge. * @param entryLimit The entry limit. * @param maintainCount Maintain count of iDs if in undefined mode. * @return <CODE>True</CODE> if the merged set is undefined. * @return A byte array suitable for writing to a JEB DB. */ public boolean merge(byte[] dbBytes, ImportIDSet bufImportIDSet, int entryLimit, boolean maintainCount); public byte[] toDatabase() { if(isDefined) { return encode(null); } else { return JebFormat.entryIDUndefinedSizeToDatabase(undefinedSize); } } private byte[] encode(byte[] bytes) { int encodedSize = count * 8; if (bytes == null || bytes.length < encodedSize) { bytes = new byte[encodedSize]; } for (int pos = 0, i = 0; i < count; i++) { long v = array[i] & 0x00ffffffffL; bytes[pos++] = (byte) ((v >>> 56) & 0xFF); bytes[pos++] = (byte) ((v >>> 48) & 0xFF); bytes[pos++] = (byte) ((v >>> 40) & 0xFF); bytes[pos++] = (byte) ((v >>> 32) & 0xFF); bytes[pos++] = (byte) ((v >>> 24) & 0xFF); bytes[pos++] = (byte) ((v >>> 16) & 0xFF); bytes[pos++] = (byte) ((v >>> 8) & 0xFF); bytes[pos++] = (byte) (v & 0xFF); } return bytes; } /** * Merge the specified import ID set with the current import ID set using the * specified entry limit an maintain count values. * Set the DB key related to an import ID set. * * @param bufImportIDSet The import ID set to merge. * @param entryLimit The entry limit to use. * @param maintainCount <CODE>True</CODE> if maintain count is being kept. * @param key Byte array containing the key. 
*/ public void merge(ImportIDSet bufImportIDSet, int entryLimit, boolean maintainCount); public void setKey(byte[] key) { this.key = key; } /** * Set the import ID set to the undefined state. */ public void setUndefined(); /** * Return the undefined size. * Return the DB key related to an import ID set. * * @return The undefined count. * @return The byte array containing the key. */ public long getUndefinedSize(); /** * Reset set. */ public void reset(); /** * Set the first entry ID to the specified entry ID. * * @param entryID The entry ID to use. */ public void setEntryID(EntryID entryID); /** * Return if a undefined entry ID set has been written to the index DB. * * @return Return <CODE>True</CODE>if the undefined entry ID set has been * written to the index DB. */ public boolean isDirty(); /** * Set the dirty flag to the specifed value. * * @param dirty The value to set the flag to. */ public void setDirty(boolean dirty); public byte[] getKey() { return this.key; } } opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -27,954 +27,1756 @@ package org.opends.server.backends.jeb.importLDIF; import org.opends.server.types.*; import org.opends.server.loggers.debug.DebugTracer; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.JebMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.util.DynamicConstants.*; import static org.opends.server.util.ServerConstants.*; import java.io.*; import java.nio.*; import java.nio.channels.FileChannel; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.opends.server.util.StaticUtils.getFileForPath; import org.opends.messages.Message; import org.opends.messages.Category; import org.opends.messages.Severity; import org.opends.server.admin.std.server.LocalDBBackendCfg; import org.opends.server.util.LDIFReader; import org.opends.server.util.StaticUtils; import org.opends.server.util.LDIFException; import org.opends.server.util.RuntimeInformation; import static org.opends.server.util.DynamicConstants.BUILD_ID; import static org.opends.server.util.DynamicConstants.REVISION_NUMBER; import org.opends.server.backends.jeb.*; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.backends.jeb.*; import org.opends.messages.Message; import org.opends.messages.JebMessages; import static org.opends.messages.JebMessages.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.*; import java.io.IOException; import org.opends.server.types.*; import org.opends.server.util.*; import com.sleepycat.je.*; /** * Performs a LDIF import. 
*/ public class Importer { private final int DRAIN_TO = 3; private final int TIMER_INTERVAL = 10000; private final int MB = (1024 * 1024); private final int LDIF_READER_BUF_SIZE = 2 * MB; private final int MIN_IMPORT_MEM_REQUIRED = 16 * MB; private final int MAX_BUFFER_SIZE = 48 * MB; private final int MIN_BUFFER_SIZE = 1024 * 100; private final int MIN_READ_AHEAD_CACHE_SIZE = 4096; private final int MAX_DB_CACHE_SIZE = 128 * MB; private final int MIN_DB_CACHE_SIZE = 16 * MB; private final int MAX_DB_LOG_BUF_BYTES = 100 * MB; private final int MEM_PCT_PHASE_1 = 60; private final int MEM_PCT_PHASE_2 = 50; public class Importer implements Thread.UncaughtExceptionHandler { private final String DIRECT_PROPERTY = "import.directphase2"; private final AtomicInteger bufferCount = new AtomicInteger(0); private final File tempDir; private final int indexCount, threadCount; private final boolean dn2idPhase2; private final LDIFImportConfig config; private final ByteBuffer directBuffer; /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * The JE backend configuration. */ private LocalDBBackendCfg config; /** * The root container used for this import job. */ private RootContainer rootContainer; /** * The LDIF import configuration. */ private LDIFImportConfig ldifImportConfig; /** * The LDIF reader. */ private LDIFReader reader; /** * Map of base DNs to their import context. */ private LinkedHashMap<DN, DNContext> importMap = new LinkedHashMap<DN, DNContext>(); private int bufferSize; private long dbCacheSize = 0, dbLogBufSize = 0; /** * The number of entries migrated. */ private int migratedCount; //The executor service used for the sort tasks. private ExecutorService sortService; /** * The number of entries imported. */ private int importedCount; //The executor service used for the index processing tasks. private ExecutorService indexProcessService; /** * The number of milliseconds between job progress reports. 
*/ private long progressInterval = 10000; //Queue of free index buffers -- used to re-cycle index buffers; private final BlockingQueue<IndexBuffer> freeBufQue = new LinkedBlockingQueue<IndexBuffer>(); /** * The progress report timer. */ private Timer timer; //Map of DB containers to que of index buffers. Used to allocate sorted //index buffers to a index writer thread. private final Map<DatabaseContainer, BlockingQueue<IndexBuffer>> containerQueMap = new LinkedHashMap<DatabaseContainer, BlockingQueue<IndexBuffer>>(); //Thread array. private CopyOnWriteArrayList<WorkThread> threads; //Map of DB containers to index managers. Used to start phase 2. private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap = new LinkedHashMap<DatabaseContainer, IndexManager>(); //Progress task. private ProgressTask pTask; //Futures used to indicate when the index file writers are done flushing //their work queues and have exited. End of phase one. private final List<Future<?>> indexWriterFutures; //Number of entries import before checking if cleaning is needed after //eviction has been detected. private static final int entryCleanInterval = 250000; //List of index file writer tasks. Used to signal stopIndexWriterTasks to the //index file writer tasks when the LDIF file has been done. private final List<IndexFileWriterTask> indexWriterList; //Minimum buffer amount to give to a buffer manager. private static final long minBuffer = 1024 * 1024; //Total available memory for the buffer managers. private long totalAvailBufferMemory = 0; //Memory size to be used for the DB cache in string format. private String dbCacheSizeStr; //Used to do an initial clean after eviction has been detected. private boolean firstClean=false; //A thread threw an Runtime exception stop the import. private boolean unCaughtExceptionThrown = false; //Set to true if substring indexes are defined. 
private boolean hasSubIndexes = false; //Work thread 0, used to add the first 20 or so entries single threaded. private WorkThread workThread0; //Counter for thread 0; private int worker0Proc=0; //Max thread 0 adds. private static final int maxWorker0 = 20; //Map of DNs to Suffix objects. Placeholder for when multiple suffixes are //supported. private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>(); /** * Create a new import job with the specified ldif import config. * * @param ldifImportConfig The LDIF import config. * @param hasSubIndexes <CODE>True</CODE> If substring indexes are defined. * @param config The LDIF import config. * @param cfg The local DB backend config. * @throws IOException If a problem occurs while opening the LDIF file for * reading. */ public Importer(LDIFImportConfig ldifImportConfig, boolean hasSubIndexes) public Importer(LDIFImportConfig config, LocalDBBackendCfg cfg ) throws IOException { this.ldifImportConfig = ldifImportConfig; this.threads = new CopyOnWriteArrayList<WorkThread>(); this.hasSubIndexes = hasSubIndexes; calcMemoryLimits(); this.config = config; threadCount = cfg.getImportThreadCount(); indexCount = cfg.listLocalDBIndexes().length + 2; indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount); indexWriterFutures = new CopyOnWriteArrayList<Future<?>>(); File parentDir; if(config.getTmpDirectory() == null) { parentDir = getFileForPath("import-tmp"); } else { parentDir = getFileForPath(config.getTmpDirectory()); } tempDir = new File(parentDir, cfg.getBackendId()); if(!tempDir.exists() && !tempDir.mkdirs()) { Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get( String.valueOf(tempDir)); throw new IOException(msg.toString()); } if (tempDir.listFiles() != null) { for (File f : tempDir.listFiles()) { f.delete(); } } dn2idPhase2 = config.getDNCheckPhase2(); String propString = System.getProperty(DIRECT_PROPERTY); if(propString != null) { int directSize = Integer.valueOf(propString); directBuffer = 
ByteBuffer.allocateDirect(directSize); } else { directBuffer = null; } } private void getBufferSizes(long availMem, int buffers) { long mem = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_BUF_BYTES); bufferSize = (int) (mem/buffers); if(bufferSize >= MIN_BUFFER_SIZE) { dbCacheSize = MAX_DB_CACHE_SIZE; dbLogBufSize = MAX_DB_LOG_BUF_BYTES; if(bufferSize > MAX_BUFFER_SIZE) { bufferSize = MAX_BUFFER_SIZE; } } else { mem = availMem - MIN_DB_CACHE_SIZE - (MIN_DB_CACHE_SIZE * 7) / 100; bufferSize = (int) (mem/buffers); dbCacheSize = MIN_DB_CACHE_SIZE; if(bufferSize < MIN_BUFFER_SIZE) { System.out.println("Log size less than default -- give it a try"); bufferSize = MIN_BUFFER_SIZE; } else { long constrainedMem = mem - (buffers * MIN_BUFFER_SIZE); bufferSize = (int) ((buffers * MIN_BUFFER_SIZE) + (constrainedMem * 50/100)); bufferSize /= buffers; dbCacheSize = MIN_DB_CACHE_SIZE + (constrainedMem * 50/100); } } } /** * Return the suffix instance in the specified map that matches the specified * DN. * * @param dn The DN to search for. * @param map The map to search. * @return The suffix instance that matches the DN, or null if no match is * found. */ public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map) { Suffix suffix = null; DN nodeDN = dn; while (suffix == null && nodeDN != null) { suffix = map.get(nodeDN); if (suffix == null) { nodeDN = nodeDN.getParentDNInSuffix(); } } return suffix; } /** * Start the worker threads. * Calculate buffer sizes and initialize JEB properties based on memory. * * @throws DatabaseException If a DB problem occurs. * @param envConfig The environment config to use in the calculations. * * @throws InitializationException If a problem occurs during calculation. */ private void startWorkerThreads() throws DatabaseException { int importThreadCount = config.getImportThreadCount(); //Figure out how much buffer memory to give to each context. 
int contextCount = importMap.size(); long memoryPerContext = totalAvailBufferMemory / contextCount; //Below min, use the min value. if(memoryPerContext < minBuffer) { Message msg = NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext, minBuffer); public void init(EnvironmentConfig envConfig) throws InitializationException { Message msg; Runtime runtime = Runtime.getRuntime(); long freeMemory = runtime.freeMemory(); long availMemImport = (freeMemory * MEM_PCT_PHASE_1) / 100; int phaseOneBuffers = 2 * (indexCount * threadCount); msg = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availMemImport, phaseOneBuffers); logError(msg); if (availMemImport < MIN_IMPORT_MEM_REQUIRED) { msg = ERR_IMPORT_LDIF_LACK_MEM.get(16); throw new InitializationException(msg); } getBufferSizes(availMemImport, phaseOneBuffers); envConfig.setConfigParam("je.maxMemory", Long.toString(dbCacheSize)); msg = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize); logError(msg); if(dbLogBufSize != 0) { envConfig.setConfigParam("je.log.totalBufferBytes", Long.toString(dbLogBufSize)); msg = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(dbLogBufSize); logError(msg); memoryPerContext = minBuffer; } // Create one set of worker threads/buffer managers for each base DN. for (DNContext context : importMap.values()) { BufferManager bufferManager = new BufferManager(memoryPerContext); context.setBufferManager(bufferManager); for (int i = 0; i < importThreadCount; i++) { WorkThread t = new WorkThread(context.getWorkQueue(), i, bufferManager, rootContainer, importMap); t.setUncaughtExceptionHandler(this); threads.add(t); if(i == 0) { workThread0 = t; } t.start(); } } // Start a timer for the progress report. timer = new Timer(); TimerTask progressTask = new ProgressTask(); //Used to get at extra functionality such as eviction detected. 
pTask = (ProgressTask) progressTask; timer.scheduleAtFixedRate(progressTask, progressInterval, progressInterval); return; } private void initIndexBuffers(int threadCount) { int bufferCount = 2 * (indexCount * threadCount); for(int i = 0; i < bufferCount; i++) { IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize); freeBufQue.add(b); } } private void initSuffixes() throws ConfigException, InitializationException { Iterator<EntryContainer> i = rootContainer.getEntryContainers().iterator(); EntryContainer ec = i.next(); Suffix suffix = Suffix.createSuffixContext(ec, config, rootContainer); dnSuffixMap.put(ec.getBaseDN(), suffix); } /** * Import a ldif using the specified root container. * * @param rootContainer The root container. * @param rootContainer The root container to use during the import. * * @return A LDIF result. * @throws DatabaseException If a DB error occurs. * @throws IOException If a IO error occurs. * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs. * @throws DirectoryException If a directory error occurs. * @throws ConfigException If a configuration has an error. * @throws ConfigException If the import failed because of an configuration * error. * @throws IOException If the import failed because of an IO error. * @throws InitializationException If the import failed because of an * initialization error. * @throws JebException If the import failed due to a database error. * @throws InterruptedException If the import failed due to an interrupted * error. * @throws ExecutionException If the import failed due to an execution error. */ public LDIFImportResult processImport(RootContainer rootContainer) throws DatabaseException, IOException, JebException, DirectoryException, ConfigException { // Create an LDIF reader. Throws an exception if the file does not exist. 
reader = new LDIFReader(ldifImportConfig); public LDIFImportResult processImport(RootContainer rootContainer) throws ConfigException, InitializationException, IOException, JebException, InterruptedException, ExecutionException { this.rootContainer = rootContainer; this.config = rootContainer.getConfiguration(); Message message; long startTime; try { int importThreadCount = config.getImportThreadCount(); message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER); logError(message); message = NOTE_JEB_IMPORT_THREAD_COUNT.get(importThreadCount); logError(message); RuntimeInformation.logInfo(); for (EntryContainer entryContainer : rootContainer.getEntryContainers()) { DNContext DNContext = getImportContext(entryContainer); if(DNContext != null) { importMap.put(entryContainer.getBaseDN(), DNContext); } } // Make a note of the time we started. startTime = System.currentTimeMillis(); startWorkerThreads(); try { importedCount = 0; migratedCount = 0; migrateExistingEntries(); processLDIF(); migrateExcludedEntries(); } finally { if(!unCaughtExceptionThrown) { cleanUp(); switchContainers(); } } } finally { reader.close(); } importProlog(startTime); return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); } /** * Switch containers if the migrated entries were written to the temporary * container. * * @throws DatabaseException If a DB problem occurs. * @throws JebException If a JEB problem occurs. 
*/ private void switchContainers() throws DatabaseException, JebException { for(DNContext importContext : importMap.values()) { DN baseDN = importContext.getBaseDN(); EntryContainer srcEntryContainer = importContext.getSrcEntryContainer(); if(srcEntryContainer != null) { if (debugEnabled()) { TRACER.debugInfo("Deleteing old entry container for base DN " + "%s and renaming temp entry container", baseDN); } EntryContainer unregEC = rootContainer.unregisterEntryContainer(baseDN); //Make sure the unregistered EC for the base DN is the same as //the one in the import context. if(unregEC != srcEntryContainer) { if(debugEnabled()) { TRACER.debugInfo("Current entry container used for base DN " + "%s is not the same as the source entry container used " + "during the migration process.", baseDN); } rootContainer.registerEntryContainer(baseDN, unregEC); continue; } srcEntryContainer.lock(); srcEntryContainer.close(); srcEntryContainer.delete(); srcEntryContainer.unlock(); EntryContainer newEC = importContext.getEntryContainer(); newEC.lock(); newEC.setDatabasePrefix(baseDN.toNormalizedString()); newEC.unlock(); rootContainer.registerEntryContainer(baseDN, newEC); } } } /** * Create and log messages at the end of the successful import. * * @param startTime The time the import started. 
*/ private void importProlog(long startTime) { Message message; this.reader = new LDIFReader(config, rootContainer, LDIF_READER_BUF_SIZE); Message message = NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER); logError(message); message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount); logError(message); RuntimeInformation.logInfo(); initSuffixes(); long startTime = System.currentTimeMillis(); processPhaseOne(); processPhaseTwo(); setIndexesTrusted(); tempDir.delete(); long finishTime = System.currentTimeMillis(); long importTime = (finishTime - startTime); float rate = 0; if (importTime > 0) rate = 1000f * reader.getEntriesRead() / importTime; message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(), reader.getEntriesRead(), reader.getEntriesIgnored(), reader .getEntriesRejected(), 0, importTime / 1000, rate); logError(message); return new LDIFImportResult(reader.getEntriesRead(), reader .getEntriesRejected(), reader.getEntriesIgnored()); } private void setIndexesTrusted() throws JebException { try { for(Suffix s : dnSuffixMap.values()) { s.setIndexesTrusted(); } } catch (DatabaseException ex) { rate = 1000f*importedCount / importTime; Message msg = NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()); throw new JebException(msg); } } message = NOTE_JEB_IMPORT_FINAL_STATUS. 
get(reader.getEntriesRead(), importedCount, reader.getEntriesIgnored(), reader.getEntriesRejected(), migratedCount, importTime/1000, rate); logError(message); message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get( getEntryLimitExceededCount()); logError(message); private void processPhaseOne() throws InterruptedException, ExecutionException { initIndexBuffers(threadCount); FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask(); Timer timer = new Timer(); timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL); indexProcessService = Executors.newFixedThreadPool(2 * indexCount); sortService = Executors.newFixedThreadPool(threadCount); //Import tasks are collective tasks. List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); for (int i = 0; i < threadCount; i++) { tasks.add(new ImportTask()); } ExecutorService execService = Executors.newFixedThreadPool(threadCount); List<Future<Void>> results = execService.invokeAll(tasks); for (Future<Void> result : results) assert result.isDone(); stopIndexWriterTasks(); for (Future<?> result : indexWriterFutures) { result.get(); } execService.shutdown(); freeBufQue.clear(); sortService.shutdown(); timer.cancel(); } private void processPhaseTwo() throws InterruptedException { SecondPhaseProgressTask progress2Task = new SecondPhaseProgressTask(containerIndexMgrMap); Timer timer2 = new Timer(); timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL); processIndexFiles(); timer2.cancel(); } private void processIndexFiles() throws InterruptedException { List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(indexCount); if(bufferCount.get() == 0) { return; } int cacheSize = cacheSizeFromFreeMemory(); int p = 0; int offSet = 0; if(directBuffer != null) { cacheSize = cacheSizeFromDirectMemory(); } for(Map.Entry<DatabaseContainer, IndexManager> e : containerIndexMgrMap.entrySet()) { DatabaseContainer container = e.getKey(); IndexManager indexMgr = e.getValue(); boolean 
isDN2ID = false; if(container instanceof DN2ID) { isDN2ID = true; } if(directBuffer != null) { int cacheSizes = cacheSize * indexMgr.getBufferList().size(); offSet += cacheSizes; directBuffer.limit(offSet); directBuffer.position(p); ByteBuffer b = directBuffer.slice(); tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, b, cacheSize)); p += cacheSizes; } else { tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, cacheSize)); } } List<Future<Void>> results = indexProcessService.invokeAll(tasks); for (Future<Void> result : results) assert result.isDone(); indexProcessService.shutdown(); } private int cacheSizeFromDirectMemory() { int cap = directBuffer.capacity(); int cacheSize = cap/bufferCount.get(); if(cacheSize > bufferSize) { cacheSize = bufferSize; } System.out.println("Direct indexes begin Total bufferCount: " + bufferCount.get() + " cacheSize: " + cacheSize); return cacheSize; } private int cacheSizeFromFreeMemory() { Runtime runtime = Runtime.getRuntime(); long availMemory = runtime.freeMemory() * MEM_PCT_PHASE_2 / 100; int avgBufSize = (int)(availMemory / bufferCount.get()); int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, avgBufSize); if(cacheSize > bufferSize) { cacheSize = bufferSize; } System.out.println("Indirect indexes begin Total bufferCount: " + bufferCount.get() + " avgBufSize: " + avgBufSize + " cacheSize: " + cacheSize); return cacheSize; } private void stopIndexWriterTasks() { IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0); for(IndexFileWriterTask task : indexWriterList) { task.que.add(idxBuffer); } } /** * Run the cleaner if it is needed. * * @param entriesRead The number of entries read so far. * @param evictEntryNumber The number of entries to run the cleaner after * being read. * @throws DatabaseException If a DB problem occurs. * This task processes the LDIF file during phase 1. 
*/ private void runCleanerIfNeeded(long entriesRead, long evictEntryNumber) throws DatabaseException { if(!firstClean || (entriesRead % evictEntryNumber) == 0) { //Make sure work queue is empty before starting. drainWorkQueue(); Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get(); runCleaner(msg); if(!firstClean) { firstClean=true; } } } private final class ImportTask implements Callable<Void> { private final Map<Suffix, Map<DatabaseContainer, IndexBuffer>> suffixMap = new HashMap<Suffix, Map<DatabaseContainer, IndexBuffer>>(); private final Set<byte[]> insertKeySet = new HashSet<byte[]>(); private final IndexBuffer.DNComparator dnComparator = new IndexBuffer.DNComparator(); private final IndexBuffer.IndexComparator indexComparator = new IndexBuffer.IndexComparator(); /** * Run the cleaner, pausing the task thread output. * * @param header Message to be printed before cleaning. * @throws DatabaseException If a DB problem occurs. */ private void runCleaner(Message header) throws DatabaseException { Message msg; long startTime = System.currentTimeMillis(); //Need to force a checkpoint. rootContainer.importForceCheckPoint(); logError(header); pTask.setPause(true); //Actually clean the files. int cleaned = rootContainer.cleanedLogFiles(); //This checkpoint removes the files if any were cleaned. if(cleaned > 0) { msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned); logError(msg); rootContainer.importForceCheckPoint(); } pTask.setPause(false); long finishTime = System.currentTimeMillis(); long cleanTime = (finishTime - startTime) / 1000; msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned); logError(msg); } /** * Process a LDIF reader. * * @throws JebException If a JEB problem occurs. * @throws DatabaseException If a DB problem occurs. * @throws IOException If an IO exception occurs. 
*/ private void processLDIF() throws JebException, DatabaseException, IOException { Message message = NOTE_JEB_IMPORT_LDIF_START.get(); logError(message); do { if (ldifImportConfig.isCancelled()) { break; } if(threads.size() <= 0) { message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get(); throw new JebException(message); } if(unCaughtExceptionThrown) { abortImport(); } try { // Read the next entry. Entry entry = reader.readEntry(); // Check for end of file. if (entry == null) { message = NOTE_JEB_IMPORT_LDIF_END.get(); logError(message); /** * {@inheritDoc} */ public Void call() throws Exception { Suffix suffix = null; while (true) { if (config.isCancelled()) { IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0); freeBufQue.add(idxBuffer); return null; } Entry entry = reader.readEntry(dnSuffixMap); if (entry == null) { break; } // Route it according to base DN. DNContext DNContext = getImportConfig(entry.getDN()); processEntry(DNContext, entry); //If the progress task has noticed eviction proceeding, start running //the cleaner. if(pTask.isEvicting()) { runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval); } } catch (LDIFException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } catch (DirectoryException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } catch (DatabaseException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } while (true); } /** * Process an entry using the specified import context. * * @param DNContext The import context. * @param entry The entry to process. */ private void processEntry(DNContext DNContext, Entry entry) throws DirectoryException, DatabaseException, JebException { if(worker0Proc < maxWorker0) { DNContext.addPending(entry.getDN()); WorkElement element = WorkElement.decode(entry, DNContext); workThread0.process(element); worker0Proc++; } else { //Add this DN to the pending map. 
DNContext.addPending(entry.getDN()); addEntryQueue(DNContext, entry); } } /** * Add work item to specified import context's queue. * @param context The import context. * @param item The work item to add. * @return <CODE>True</CODE> if the the work item was added to the queue. */ private boolean addQueue(DNContext context, WorkElement item) { try { while(!context.getWorkQueue().offer(item, 1000, TimeUnit.MILLISECONDS)) { if(threads.size() <= 0) { // All worker threads died. We must stop now. return false; } } } catch (InterruptedException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } return true; } /** * Wait until the work queue is empty. */ private void drainWorkQueue() { if(threads.size() > 0) { for (DNContext context : importMap.values()) { while (context.getWorkQueue().size() > 0) { try { Thread.sleep(100); } catch (Exception e) { // No action needed. } } } } } private void abortImport() throws JebException { //Stop work threads telling them to skip substring flush. stopWorkThreads(false); timer.cancel(); Message message = ERR_JEB_IMPORT_LDIF_ABORT.get(); throw new JebException(message); } /** * Stop work threads. * * @param abort <CODE>True</CODE> if stop work threads was called from an * abort. * @throws JebException if a Jeb error occurs. */ private void stopWorkThreads(boolean abort) throws JebException { for (WorkThread t : threads) { t.stopProcessing(); } // Wait for each thread to stop. for (WorkThread t : threads) { try { if(!abort && unCaughtExceptionThrown) { timer.cancel(); Message message = ERR_JEB_IMPORT_LDIF_ABORT.get(); throw new JebException(message); } t.join(); importedCount += t.getImportedCount(); } catch (InterruptedException ie) { // No action needed? } } } /** * Clean up after a successful import. * * @throws DatabaseException If a DB error occurs. * @throws JebException If a Jeb error occurs. */ private void cleanUp() throws DatabaseException, JebException { Message msg; //Drain the work queue. 
drainWorkQueue(); pTask.setPause(true); long startTime = System.currentTimeMillis(); stopWorkThreads(true); //Flush the buffer managers. for(DNContext context : importMap.values()) { context.getBufferManager().prepareFlush(); context.getBufferManager().flushAll(); } long finishTime = System.currentTimeMillis(); long flushTime = (finishTime - startTime) / 1000; msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime); logError(msg); timer.cancel(); for(DNContext context : importMap.values()) { context.setIndexesTrusted(); } msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get(); //Run the cleaner. runCleaner(msg); closeIndexCursors(); } private void closeIndexCursors() throws DatabaseException { for (DNContext ic : importMap.values()) { ic.getEntryContainer().closeIndexCursors(); } } /** * Uncaught exception handler. * * @param t The thread working when the exception was thrown. * @param e The exception. */ public void uncaughtException(Thread t, Throwable e) { unCaughtExceptionThrown = true; threads.remove(t); Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get( t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause())); logError(msg); } /** * Get the entry limit exceeded counts from the indexes. * * @return Count of the index with entry limit exceeded values. */ private int getEntryLimitExceededCount() { int count = 0; for (DNContext ic : importMap.values()) { count += ic.getEntryContainer().getEntryLimitExceededCount(); } return count; } /** * Return an import context related to the specified DN. * @param dn The dn. * @return An import context. * @throws DirectoryException If an directory error occurs. */ private DNContext getImportConfig(DN dn) throws DirectoryException { DNContext DNContext = null; DN nodeDN = dn; while (DNContext == null && nodeDN != null) { DNContext = importMap.get(nodeDN); if (DNContext == null) { nodeDN = nodeDN.getParentDNInSuffix(); } } if (nodeDN == null) { // The entry should not have been given to this backend. 
Message message = JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn)); throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); } return DNContext; } /** * Creates an import context for the specified entry container. * * @param entryContainer The entry container. * @return Import context to use during import. * @throws DatabaseException If a database error occurs. * @throws JebException If a JEB error occurs. * @throws ConfigException If a configuration contains error. */ private DNContext getImportContext(EntryContainer entryContainer) throws DatabaseException, JebException, ConfigException { DN baseDN = entryContainer.getBaseDN(); EntryContainer srcEntryContainer = null; List<DN> includeBranches = new ArrayList<DN>(); List<DN> excludeBranches = new ArrayList<DN>(); if(!ldifImportConfig.appendToExistingData() && !ldifImportConfig.clearBackend()) { for(DN dn : ldifImportConfig.getExcludeBranches()) { if(baseDN.equals(dn)) DN entryDN = entry.getDN(); EntryID entryID = (EntryID) entry.getAttachment(); suffix = getMatchSuffix(entryDN, dnSuffixMap); if(!suffixMap.containsKey(suffix)) { // This entire base DN was explicitly excluded. Skip. return null; suffixMap.put(suffix, new HashMap<DatabaseContainer, IndexBuffer>()); } if(baseDN.isAncestorOf(dn)) if(!dn2idPhase2) { excludeBranches.add(dn); } } if(!ldifImportConfig.getIncludeBranches().isEmpty()) { for(DN dn : ldifImportConfig.getIncludeBranches()) { if(baseDN.isAncestorOf(dn)) if(!processParent(entryDN, entryID, entry, suffix)) { includeBranches.add(dn); suffix.removePending(entryDN); continue; } } if(includeBranches.isEmpty()) { // There are no branches in the explicitly defined include list under // this base DN. Skip this base DN alltogether. return null; } // Remove any overlapping include branches. 
Iterator<DN> includeBranchIterator = includeBranches.iterator(); while(includeBranchIterator.hasNext()) { DN includeDN = includeBranchIterator.next(); boolean keep = true; for(DN dn : includeBranches) if(!suffix.getDN2ID().insert(null, entryDN, entryID)) { if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN)) { keep = false; break; } suffix.removePending(entryDN); Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); reader.rejectEntry(entry, msg); continue; } if(!keep) { includeBranchIterator.remove(); } } // Remvoe any exclude branches that are not are not under a include // branch since they will be migrated as part of the existing entries // outside of the include branches anyways. Iterator<DN> excludeBranchIterator = excludeBranches.iterator(); while(excludeBranchIterator.hasNext()) { DN excludeDN = excludeBranchIterator.next(); boolean keep = false; for(DN includeDN : includeBranches) { if(includeDN.isAncestorOf(excludeDN)) { keep = true; break; } } if(!keep) { excludeBranchIterator.remove(); } } if(includeBranches.size() == 1 && excludeBranches.size() == 0 && includeBranches.get(0).equals(baseDN)) { // This entire base DN is explicitly included in the import with // no exclude branches that we need to migrate. Just clear the entry // container. 
entryContainer.lock(); entryContainer.clear(); entryContainer.unlock(); suffix.removePending(entryDN); processID2SC(entryID, entry, suffix); } else { // Create a temp entry container srcEntryContainer = entryContainer; entryContainer = rootContainer.openEntryContainer(baseDN, baseDN.toNormalizedString() + "_importTmp"); processDN2ID(suffix, entryDN, entryID); suffix.removePending(entryDN); } suffix.getID2Entry().put(null, entryID, entry); processIndexes(suffix, entry, entryID); } flushIndexBuffers(); if(!dn2idPhase2) { suffix.getEntryContainer().getID2Children().closeCursor(); suffix.getEntryContainer().getID2Subtree().closeCursor(); } return null; } private boolean processParent(DN entryDN, EntryID entryID, Entry entry, Suffix suffix) throws DatabaseException { EntryID parentID = null; DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN); DN2ID dn2id = suffix.getDN2ID(); if(dn2id.get(null, entryDN, LockMode.DEFAULT) != null) { Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); reader.rejectEntry(entry, msg); return false; } if (parentDN != null) { parentID = suffix.getParentID(parentDN); if (parentID == null) { dn2id.remove(null, entryDN); Message msg = ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString()); reader.rejectEntry(entry, msg); return false; } } } // Create an import context. DNContext DNContext = new DNContext(); DNContext.setConfig(config); DNContext.setLDIFImportConfig(this.ldifImportConfig); DNContext.setLDIFReader(reader); DNContext.setBaseDN(baseDN); DNContext.setEntryContainer(entryContainer); DNContext.setSrcEntryContainer(srcEntryContainer); //Create queue. LinkedBlockingQueue<WorkElement> works = new LinkedBlockingQueue<WorkElement> (config.getImportQueueSize()); DNContext.setWorkQueue(works); // Set the include and exclude branches DNContext.setIncludeBranches(includeBranches); DNContext.setExcludeBranches(excludeBranches); return DNContext; } /** * Add specified context and entry to the work queue. 
* * @param context The context related to the entry DN. * @param entry The entry to work on. * @return <CODE>True</CODE> if the element was added to the work queue. */ private boolean addEntryQueue(DNContext context, Entry entry) { WorkElement element = WorkElement.decode(entry, context); return addQueue(context, element); } /** * Calculate the memory usage for the substring buffer and the DB cache. */ private void calcMemoryLimits() { Message msg; Runtime runtime = Runtime.getRuntime(); long freeMemory = runtime.freeMemory(); long maxMemory = runtime.maxMemory(); long totMemory = runtime.totalMemory(); long totFreeMemory = (freeMemory + (maxMemory - totMemory)); long dbCacheLimit = (totFreeMemory * 60) / 100; //If there are no substring indexes defined, set the DB cache //size to 75% and take a minimal substring buffer. if(!hasSubIndexes) { dbCacheLimit = (totFreeMemory * 75) / 100; } dbCacheSizeStr = Long.toString(dbCacheLimit); totalAvailBufferMemory = (totFreeMemory * 10) / 100; if(totalAvailBufferMemory < (10 * minBuffer)) { msg = NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory, (10 * minBuffer)); logError(msg); totalAvailBufferMemory = (10 * minBuffer); } else if(!hasSubIndexes) { totalAvailBufferMemory = (10 * minBuffer); } msg=NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit, totalAvailBufferMemory); logError(msg); } /** * Return the string representation of the DB cache size. * * @return DB cache size string. */ public String getDBCacheSize() { return dbCacheSizeStr; } /** * Migrate any existing entries. * * @throws JebException If a JEB error occurs. * @throws DatabaseException If a DB error occurs. * @throws DirectoryException If a directory error occurs. 
*/ private void migrateExistingEntries() throws JebException, DatabaseException, DirectoryException { for(DNContext context : importMap.values()) { EntryContainer srcEntryContainer = context.getSrcEntryContainer(); if(srcEntryContainer != null && !context.getIncludeBranches().isEmpty()) { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); LockMode lockMode = LockMode.DEFAULT; OperationStatus status; Message message = NOTE_JEB_IMPORT_MIGRATION_START.get( "existing", String.valueOf(context.getBaseDN())); logError(message); Cursor cursor = srcEntryContainer.getDN2ID().openCursor(null, CursorConfig.READ_COMMITTED); try { status = cursor.getFirst(key, data, lockMode); while(status == OperationStatus.SUCCESS && !ldifImportConfig.isCancelled()) { if(threads.size() <= 0) { message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get(); throw new JebException(message); } DN dn = DN.decode(ByteString.wrap(key.getData())); if(!context.getIncludeBranches().contains(dn)) { EntryID id = new EntryID(data); Entry entry = srcEntryContainer.getID2Entry().get(null, id, LockMode.DEFAULT); processEntry(context, entry); migratedCount++; status = cursor.getNext(key, data, lockMode); } else { // This is the base entry for a branch that will be included // in the import so we don't want to copy the branch to the new // entry container. /** * Advance the cursor to next entry at the same level in the DIT * skipping all the entries in this branch. * Set the next starting value to a value of equal length but * slightly greater than the previous DN. Since keys are compared * in reverse order we must set the first byte (the comma). * No possibility of overflow here. 
*/ byte[] begin = StaticUtils.getBytes("," + dn.toNormalizedString()); begin[0] = (byte) (begin[0] + 1); key.setData(begin); status = cursor.getSearchKeyRange(key, data, lockMode); ArrayList<EntryID> IDs; if (parentDN != null && suffix.getParentDN() != null && parentDN.equals(suffix.getParentDN())) { IDs = new ArrayList<EntryID>(suffix.getIDs()); IDs.set(0, entryID); } else { EntryID nodeID; IDs = new ArrayList<EntryID>(entryDN.getNumComponents()); IDs.add(entryID); if (parentID != null) { IDs.add(parentID); EntryContainer ec = suffix.getEntryContainer(); for (DN dn = ec.getParentWithinBase(parentDN); dn != null; dn = ec.getParentWithinBase(dn)) { if((nodeID = getAncestorID(dn2id, dn)) == null) { return false; } else { IDs.add(nodeID); } } } finally { cursor.close(); } } suffix.setParentDN(parentDN); suffix.setIDs(IDs); entry.setAttachment(IDs); return true; } } private void processID2SC(EntryID entryID, Entry entry, Suffix suffix) throws DatabaseException { Set<byte[]> childKeySet = new HashSet<byte[]>(); Set<byte[]> subtreeKeySet = new HashSet<byte[]>(); Index id2children = suffix.getEntryContainer().getID2Children(); Index id2subtree = suffix.getEntryContainer().getID2Subtree(); id2children.indexer.indexEntry(entry, childKeySet); id2subtree.indexer.indexEntry(entry, subtreeKeySet); DatabaseEntry dbKey = new DatabaseEntry(); DatabaseEntry dbVal = new DatabaseEntry(); ImportIDSet idSet = new ImportIDSet(); idSet.addEntryID(entryID, id2children.getIndexEntryLimit(), id2children.getMaintainCount()); id2children.insert(idSet, childKeySet, dbKey, dbVal); DatabaseEntry dbSubKey = new DatabaseEntry(); DatabaseEntry dbSubVal = new DatabaseEntry(); ImportIDSet idSubSet = new ImportIDSet(); idSubSet.addEntryID(entryID, id2subtree.getIndexEntryLimit(), id2subtree.getMaintainCount()); id2subtree.insert(idSubSet, subtreeKeySet, dbSubKey, dbSubVal); } private EntryID getAncestorID(DN2ID dn2id, DN dn) throws DatabaseException { int i=0; EntryID nodeID = dn2id.get(null, dn, 
LockMode.DEFAULT); if(nodeID == null) { while((nodeID = dn2id.get(null, dn, LockMode.DEFAULT)) == null) { try { Thread.sleep(50); if(i == 3) { return null; } i++; } catch (Exception e) { return null; } } } return nodeID; } /** * Migrate excluded entries. * * @throws JebException If a JEB error occurs. * @throws DatabaseException If a DB error occurs. * @throws DirectoryException If a directory error occurs. */ private void migrateExcludedEntries() throws JebException, DatabaseException, DirectoryException { for(DNContext importContext : importMap.values()) { EntryContainer srcEntryContainer = importContext.getSrcEntryContainer(); if(srcEntryContainer != null && !importContext.getExcludeBranches().isEmpty()) { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); LockMode lockMode = LockMode.DEFAULT; OperationStatus status; Message message = NOTE_JEB_IMPORT_MIGRATION_START.get( "excluded", String.valueOf(importContext.getBaseDN())); logError(message); Cursor cursor = srcEntryContainer.getDN2ID().openCursor(null, CursorConfig.READ_COMMITTED); Comparator<byte[]> dn2idComparator = srcEntryContainer.getDN2ID().getComparator(); try { for(DN excludedDN : importContext.getExcludeBranches()) { byte[] suffix = StaticUtils.getBytes(excludedDN.toNormalizedString()); key.setData(suffix); status = cursor.getSearchKeyRange(key, data, lockMode); if(status == OperationStatus.SUCCESS && Arrays.equals(key.getData(), suffix)) { // This is the base entry for a branch that was excluded in the // import so we must migrate all entries in this branch over to // the new entry container. 
byte[] end = StaticUtils.getBytes("," + excludedDN.toNormalizedString()); end[0] = (byte) (end[0] + 1); while(status == OperationStatus.SUCCESS && dn2idComparator.compare(key.getData(), end) < 0 && !ldifImportConfig.isCancelled()) { if(threads.size() <= 0) { message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get(); throw new JebException(message); } EntryID id = new EntryID(data); Entry entry = srcEntryContainer.getID2Entry().get(null, id, LockMode.DEFAULT); processEntry(importContext, entry); migratedCount++; status = cursor.getNext(key, data, lockMode); private void processIndexes(Suffix ctx, Entry entry, EntryID entryID) throws DatabaseException, DirectoryException, JebException, ConfigException { Transaction txn = null; Map<AttributeType, AttributeIndex> attrMap = ctx.getAttrIndexMap(); for(Map.Entry<AttributeType, AttributeIndex> mapEntry : attrMap.entrySet()) { AttributeType attrType = mapEntry.getKey(); if(entry.hasAttribute(attrType)) { AttributeIndex attributeIndex = mapEntry.getValue(); Index index; if((index=attributeIndex.getEqualityIndex()) != null) { indexAttr(ctx, index, entry, entryID); } if((index=attributeIndex.getPresenceIndex()) != null) { indexAttr(ctx, index, entry, entryID); } if((index=attributeIndex.getSubstringIndex()) != null) { indexAttr(ctx, index, entry, entryID); } if((index=attributeIndex.getOrderingIndex()) != null) { indexAttr(ctx, index, entry, entryID); } if((index=attributeIndex.getApproximateIndex()) != null) { indexAttr(ctx, index, entry, entryID); } for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) { vlvIdx.addEntry(txn, entryID, entry); } Map<String,Collection<Index>> extensibleMap = attributeIndex.getExtensibleIndexes(); if(!extensibleMap.isEmpty()) { Collection<Index> subIndexes = attributeIndex.getExtensibleIndexes().get( EXTENSIBLE_INDEXER_ID_SUBSTRING); if(subIndexes != null) { for(Index subIndex: subIndexes) { indexAttr(ctx, subIndex, entry, entryID); } } Collection<Index> sharedIndexes = 
attributeIndex.getExtensibleIndexes().get( EXTENSIBLE_INDEXER_ID_SHARED); if(sharedIndexes !=null) { for(Index sharedIndex:sharedIndexes) { indexAttr(ctx, sharedIndex, entry, entryID); } } } } finally } } private void indexAttr(Suffix ctx, Index index, Entry entry, EntryID entryID) throws DatabaseException, ConfigException { insertKeySet.clear(); index.indexer.indexEntry(entry, insertKeySet); for(byte[] key : insertKeySet) { processKey(ctx, index, key, entryID, indexComparator, null); } } private void flushIndexBuffers() throws InterruptedException, ExecutionException { Iterator<Suffix> i = dnSuffixMap.values().iterator(); Suffix suffix = i.next(); for(Map<DatabaseContainer, IndexBuffer> map : suffixMap.values()) { for(Map.Entry<DatabaseContainer, IndexBuffer> e : map.entrySet()) { cursor.close(); DatabaseContainer container = e.getKey(); IndexBuffer indexBuffer = e.getValue(); if(container instanceof DN2ID) { indexBuffer.setComparator(dnComparator); } else { indexBuffer.setComparator(indexComparator); } indexBuffer.setContainer(container); indexBuffer.setEntryContainer(suffix.getEntryContainer()); Future<Void> future = sortService.submit(new SortTask(indexBuffer)); future.get(); } } } private void processKey(Suffix ctx, DatabaseContainer container, byte[] key, EntryID entryID,IndexBuffer.ComparatorBuffer<byte[]> comparator, EntryContainer entryContainer) throws ConfigException { IndexBuffer indexBuffer; Map<DatabaseContainer, IndexBuffer> conMap = suffixMap.get(ctx); if(!conMap.containsKey(container)) { indexBuffer = getNewIndexBuffer(); conMap.put(container, indexBuffer); } else { indexBuffer = conMap.get(container); } if(!indexBuffer.isSpaceAvailable(key)) { indexBuffer.setContainer(container); indexBuffer.setComparator(comparator); indexBuffer.setEntryContainer(entryContainer); sortService.submit(new SortTask(indexBuffer)); indexBuffer = getNewIndexBuffer(); conMap.remove(container); conMap.put(container, indexBuffer); } indexBuffer.add(key, entryID); } private 
IndexBuffer getNewIndexBuffer() throws ConfigException { IndexBuffer indexBuffer = freeBufQue.poll(); if(indexBuffer.isPoison()) { Message msg = Message.raw(Category.JEB, Severity.SEVERE_ERROR, "Abort import - MPD"); throw new ConfigException(msg); } return indexBuffer; } private void processDN2ID(Suffix suffix, DN dn, EntryID entryID) throws ConfigException { DatabaseContainer dn2id = suffix.getDN2ID(); byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString()); processKey(suffix, dn2id, dnBytes, entryID, dnComparator, suffix.getEntryContainer()); } } /** * The task reads the temporary index files and writes their results to the * index database. */ private final class IndexWriteDBTask implements Callable<Void> { private final IndexManager indexMgr; private final boolean isDN2ID; private final DatabaseEntry dbKey, dbValue; private final DN2ID dn2id; private final Index index; private final EntryContainer entryContainer; private final int id2ChildLimit; private final boolean id2ChildMCount; private TreeMap<DN,EntryID> parentIDMap = new TreeMap<DN,EntryID>(); private DN parentDN, lastDN; private EntryID parentID, lastID; private final Map<byte[], ImportIDSet> id2childTree; private final Map<byte[], ImportIDSet> id2subtreeTree; private final int cacheSize; private ByteBuffer directBuffer = null; public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID, ByteBuffer b, int cacheSize) { this(indexMgr, isDN2ID, cacheSize); directBuffer = b; } public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID, int cacheSize) { this.indexMgr = indexMgr; this.entryContainer = indexMgr.entryContainer; this.isDN2ID = isDN2ID; this.dbKey = new DatabaseEntry(); this.dbValue = new DatabaseEntry(); this.cacheSize = cacheSize; if(isDN2ID) { this.dn2id = indexMgr.dn2id; this.index = null; id2ChildLimit = entryContainer.getID2Children().getIndexEntryLimit(); id2ChildMCount = entryContainer.getID2Subtree().getMaintainCount(); Comparator<byte[]> id2ChildComparator = 
entryContainer.getID2Children().getComparator(); Comparator<byte[]> id2SubtreeComparator = entryContainer.getID2Subtree().getComparator(); id2childTree = new TreeMap<byte[], ImportIDSet>(id2ChildComparator); id2subtreeTree = new TreeMap<byte[], ImportIDSet>(id2SubtreeComparator); } else { this.dn2id = null; this.index = indexMgr.getIndex(); id2subtreeTree = null; id2childTree = null; id2ChildLimit = 0; id2ChildMCount = false; } } public Void call() throws Exception { Comparator<byte[]> comparator = indexMgr.getComparator(); int limit = indexMgr.getLimit(); boolean maintainCount = indexMgr.getMaintainCount(); byte[] cKey = null; ImportIDSet cIDSet = null; indexMgr.init(); List<Buffer> bufferList = indexMgr.getBufferList(); SortedSet<Buffer> bufferSet = new TreeSet<Buffer>(); int p = 0; int offSet = cacheSize; for(Buffer b : bufferList) { if(directBuffer != null) { directBuffer.position(p); directBuffer.limit(offSet); ByteBuffer slice = directBuffer.slice(); b.init(indexMgr, slice, cacheSize); p += cacheSize; offSet += cacheSize; } else { b.init(indexMgr, null, cacheSize); } bufferSet.add(b); } while(!bufferSet.isEmpty()) { Buffer b; b = bufferSet.first(); if(b == null) { System.out.println("null b"); } bufferSet.remove(b); byte[] key = b.getKey(); ImportIDSet idSet = b.getIDSet(); if(cKey == null) { cKey = key; cIDSet = idSet; } else { if(comparator.compare(key, cKey) != 0) { addToDB(cKey, cIDSet); indexMgr.incrKeyCount(); cKey = key; cIDSet = idSet; } else { cIDSet.setKey(cKey); cIDSet.merge(idSet, limit, maintainCount); } } if(b.hasMoreData()) { b.getNextRecord(); bufferSet.add(b); } } if(cKey != null) { addToDB(cKey, cIDSet); } cleanUP(); return null; } private void cleanUP() throws DatabaseException, DirectoryException, IOException { if(!isDN2ID) { index.closeCursor(); Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(index.getName()); logError(msg); } else { if(dn2idPhase2) { flushSubTreeChildIndexes(); } } indexMgr.setDone(); indexMgr.close(); 
indexMgr.deleteIndexFile(); } private void flushSubTreeChildIndexes() throws DatabaseException, DirectoryException { Index id2child = entryContainer.getID2Children(); Set<Map.Entry<byte[], ImportIDSet>> id2childSet = id2childTree.entrySet(); for(Map.Entry<byte[], ImportIDSet> e : id2childSet) { byte[] key = e.getKey(); ImportIDSet idSet = e.getValue(); dbKey.setData(key); id2child.insert(dbKey, idSet, dbValue); } id2child.closeCursor(); Index id2subtree = entryContainer.getID2Subtree(); Set<Map.Entry<byte[], ImportIDSet>> subtreeSet = id2subtreeTree.entrySet(); for(Map.Entry<byte[], ImportIDSet> e : subtreeSet) { byte[] key = e.getKey(); ImportIDSet idSet = e.getValue(); dbKey.setData(key); id2subtree.insert(dbKey, idSet, dbValue); } id2subtree.closeCursor(); Message msg = NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getTotDNCount()); logError(msg); } private void addToDB(byte[] key, ImportIDSet record) throws InterruptedException, DatabaseException, DirectoryException { record.setKey(key); if(!this.isDN2ID) { addIndex(record); } else { if(dn2idPhase2) { addDN2ID(record); } } } private void id2Subtree(EntryContainer ec, EntryID childID, int limit, boolean mCount) throws DatabaseException { ImportIDSet idSet; if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData())) { idSet = new ImportIDSet(); id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet); } else { idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData()); } idSet.addEntryID(childID, limit, mCount); for (DN dn = ec.getParentWithinBase(parentDN); dn != null; dn = ec.getParentWithinBase(dn)) { EntryID nodeID = parentIDMap.get(dn); if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData())) { idSet = new ImportIDSet(); id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet); } else { idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData()); } idSet.addEntryID(childID, limit, mCount); } } private void id2child(EntryID childID, int limit, boolean mCount) { 
ImportIDSet idSet; if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData())) { idSet = new ImportIDSet(); id2childTree.put(parentID.getDatabaseEntry().getData(), idSet); } else { idSet = id2childTree.get(parentID.getDatabaseEntry().getData()); } idSet.addEntryID(childID, limit, mCount); } private boolean checkParent(DN dn, EntryID id, EntryContainer ec) { if(parentIDMap.isEmpty()) { parentIDMap.put(dn, id); return true; } else if(lastDN != null && lastDN.isAncestorOf(dn)) { parentIDMap.put(lastDN, lastID); parentDN = lastDN; parentID = lastID; lastDN = dn; lastID = id; return true; } else if(parentIDMap.lastKey().isAncestorOf(dn)) { parentDN = parentIDMap.lastKey(); parentID = parentIDMap.get(parentDN); lastDN = dn; lastID = id; return true; } else { DN pDN = ec.getParentWithinBase(dn); if(parentIDMap.containsKey(pDN)) { DN lastKey = parentIDMap.lastKey(); Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey); for(Map.Entry<DN, EntryID> e : subMap.entrySet()) { subMap.remove(e.getKey()); } parentDN = pDN; parentID = parentIDMap.get(pDN); lastDN = dn; lastID = id; } else { Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString()); Entry e = new Entry(dn, null, null, null); reader.rejectEntry(e, msg); return false; } } return true; } private void addDN2ID(ImportIDSet record) throws DatabaseException, DirectoryException { DatabaseEntry idVal = new DatabaseEntry(); dbKey.setData(record.getKey()); idVal.setData(record.toDatabase()); DN dn = DN.decode(ByteString.wrap(dbKey.getData())); EntryID entryID = new EntryID(idVal); if(!checkParent(dn, entryID, entryContainer)) { return; } dn2id.putRaw(null, dbKey, idVal); indexMgr.addTotDNCount(1); if(parentDN != null) { id2child(entryID, id2ChildLimit, id2ChildMCount); id2Subtree(entryContainer, entryID, id2ChildLimit, id2ChildMCount); } } private void addIndex(ImportIDSet record) throws DatabaseException { dbKey.setData(record.getKey()); index.insert(dbKey, record, dbValue); } } /** * This task writes 
the temporary index files using the sorted buffers read
   * from a blocking queue.
   * <p>
   * Each write, whether of one buffer or of a merged batch, is registered
   * with the index manager as a {@link Buffer} covering the byte range it
   * occupies in the temporary file. The task ends when a poison buffer is
   * taken from the queue.
   */
  private final class IndexFileWriterTask implements Runnable
  {
    private final IndexManager indexMgr;
    private final BlockingQueue<IndexBuffer> que;

    //Collects the entry IDs of the key currently being written.
    private final ByteArrayOutputStream byteStream =
            new ByteArrayOutputStream(2 * bufferSize);
    private final DataOutputStream dataStream;
    private long bufCount = 0;
    private final File file;

    //Used to merge several index buffers in key order.
    private final SortedSet<IndexBuffer> indexSortedSet;
    private boolean poisonSeen = false;


    /**
     * Create a writer task for the specified queue and index manager.
     *
     * @param que The queue the sorted index buffers are read from.
     * @param indexMgr The index manager supplying the temporary file.
     * @throws FileNotFoundException If the temporary file cannot be opened.
     */
    public IndexFileWriterTask(BlockingQueue<IndexBuffer> que,
                               IndexManager indexMgr)
            throws FileNotFoundException
    {
      this.que = que;
      file = indexMgr.getFile();
      this.indexMgr = indexMgr;
      BufferedOutputStream bufferedStream =
              new BufferedOutputStream(new FileOutputStream(file), 2 * MB);
      dataStream = new DataOutputStream(bufferedStream);
      indexSortedSet = new TreeSet<IndexBuffer>();
    }


    /**
     * Write index buffers to the temporary file until a poison buffer is
     * seen.
     */
    public void run()
    {
      long offset = 0;
      List<IndexBuffer> batch = new LinkedList<IndexBuffer>();
      try
      {
        while(true)
        {
          //Block until work arrives instead of spinning on poll().
          IndexBuffer indexBuffer = que.take();
          long beginOffset = offset;
          long bufLen;
          if(!que.isEmpty())
          {
            que.drainTo(batch, DRAIN_TO);
            batch.add(indexBuffer);
            bufLen = writeIndexBuffers(batch);
            // NOTE(review): a poison buffer in the batch is reset and
            // recycled into freeBufQue like any other buffer -- confirm
            // that is intended.
            for(IndexBuffer id : batch)
            {
              id.reset();
            }
            freeBufQue.addAll(batch);
            batch.clear();
          }
          else
          {
            if(indexBuffer.isPoison())
            {
              break;
            }
            bufLen = writeIndexBuffer(indexBuffer);
            indexBuffer.reset();
            freeBufQue.add(indexBuffer);
          }
          offset += bufLen;
          indexMgr.addBuffer(new Buffer(beginOffset, offset, bufCount));
          bufCount++;
          bufferCount.incrementAndGet();
          //Stop only after the batch that carried the poison buffer has
          //been registered; otherwise the data just written would be
          //missing from the index manager's buffer list.
          if(poisonSeen)
          {
            break;
          }
        }
        dataStream.close();
        indexMgr.setFileLength();
      }
      catch (IOException e)
      {
        Message msg =
                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
                        e.getMessage());
        logError(msg);
        closeAfterFailure();
      }
      catch (InterruptedException e)
      {
        //Preserve the interrupt for the executor running this task.
        Thread.currentThread().interrupt();
        Message msg =
                ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
                        String.valueOf(e));
        logError(msg);
        closeAfterFailure();
      }
    }


    /**
     * Release the temporary file's stream after a failure has already been
     * logged.
     */
    private void closeAfterFailure()
    {
      try
      {
        dataStream.close();
      }
      catch (IOException ignored)
      {
        //The primary failure has been reported; nothing more can be done.
      }
    }


    /**
     * Write the record for the key at the buffer's current position: the
     * key, followed by the length of the collected IDs and the IDs.
     *
     * @return The record length added to the buffer total.
     */
    private int writeCurrentKey(IndexBuffer indexBuffer) throws IOException
    {
      int recLen = indexBuffer.getKeySize();
      recLen += byteStream.size();
      recLen += 8;
      indexBuffer.writeKey(dataStream);
      dataStream.writeInt(byteStream.size());
      byteStream.writeTo(dataStream);
      return recLen;
    }


    /**
     * Write the record for the specified key: key length, key, length of
     * the collected IDs and the IDs.
     *
     * @return The record length added to the buffer total.
     */
    private int writeSavedKey(byte[] saveKey) throws IOException
    {
      int recLen = saveKey.length;
      recLen += byteStream.size();
      recLen += 8;
      dataStream.writeInt(saveKey.length);
      dataStream.write(saveKey);
      dataStream.writeInt(byteStream.size());
      byteStream.writeTo(dataStream);
      return recLen;
    }


    /**
     * Write a single sorted index buffer, emitting one record for each run
     * of keys that compare equal.
     *
     * @param indexBuffer The buffer to write.
     * @return The length added up over the records written.
     * @throws IOException If the temporary file cannot be written.
     */
    private long writeIndexBuffer(IndexBuffer indexBuffer) throws IOException
    {
      int numKeys = indexBuffer.getNumberKeys();
      indexBuffer.setPos(-1);
      long bufLen = 0;
      byteStream.reset();
      for(int i = 0; i < numKeys; i++)
      {
        if(indexBuffer.getPos() == -1)
        {
          //First key: start collecting its IDs.
          indexBuffer.setPos(i);
          byteStream.write(indexBuffer.getID(i));
          continue;
        }
        if(!indexBuffer.compare(i))
        {
          //Key changed: flush the previous key and start a new run.
          bufLen += writeCurrentKey(indexBuffer);
          indexBuffer.setPos(i);
          byteStream.reset();
        }
        byteStream.write(indexBuffer.getID(i));
      }
      if(indexBuffer.getPos() != -1)
      {
        bufLen += writeCurrentKey(indexBuffer);
      }
      return bufLen;
    }


    /**
     * Merge several sorted index buffers and write them as one run of
     * records. Poison buffers are not written; seeing one sets poisonSeen.
     *
     * @param buffers The buffers to merge and write.
     * @return The length added up over the records written.
     * @throws IOException If the temporary file cannot be written.
     */
    private long writeIndexBuffers(List<IndexBuffer> buffers)
            throws IOException
    {
      long id = 0;
      long bufLen = 0;
      byteStream.reset();
      for(IndexBuffer b : buffers)
      {
        if(b.isPoison())
        {
          poisonSeen = true;
        }
        else
        {
          b.setPos(0);
          b.setID(id++);
          indexSortedSet.add(b);
        }
      }
      byte[] saveKey = null;
      while(!indexSortedSet.isEmpty())
      {
        IndexBuffer b = indexSortedSet.first();
        indexSortedSet.remove(b);
        byte[] key = b.getKeyBytes(b.getPos());
        if(saveKey == null)
        {
          saveKey = key;
        }
        else if(!b.compare(saveKey))
        {
          //Key changed: flush the previous key and start a new run.
          bufLen += writeSavedKey(saveKey);
          byteStream.reset();
          saveKey = key;
        }
        byteStream.write(b.getID(b.getPos()));
        if(b.hasMoreData())
        {
          //Re-insert so the buffer is ordered by its next key.
          b.getNextRecord();
          indexSortedSet.add(b);
        }
      }
      if(saveKey != null)
      {
        bufLen += writeSavedKey(saveKey);
      }
      return bufLen;
    }
  }


  /**
   * This task main function is to sort the index buffers given to it from
   * the import tasks reading the LDIF file.
It will also create an index
   * file writer task and corresponding queue if needed. The sorted index
   * buffers are put on the index file writer queues for writing to a temporary
   * file.
   */
  private final class SortTask implements Callable<Void>
  {
    private final IndexBuffer indexBuffer;


    /**
     * Create a sort task for the specified index buffer.
     *
     * @param indexBuffer The index buffer to sort and queue.
     */
    public SortTask(IndexBuffer indexBuffer)
    {
      this.indexBuffer = indexBuffer;
    }


    /**
     * Sort the buffer and add it to the writer queue of its container,
     * creating the writer task and queue first if none exists yet. Nothing
     * is done when the import has been cancelled.
     *
     * {@inheritDoc}
     */
    public Void call() throws Exception
    {
      if (config.isCancelled())
      {
        return null;
      }
      indexBuffer.sort();
      DatabaseContainer target = indexBuffer.getContainer();
      if(!containerQueMap.containsKey(target))
      {
        createIndexWriterTask(target, indexBuffer.getEntryContainer());
      }
      BlockingQueue<IndexBuffer> writerQue = containerQueMap.get(target);
      writerQue.add(indexBuffer);
      return null;
    }


    /**
     * Create the index manager, queue and file writer task for a container
     * and publish the queue in containerQueMap. The work is done while
     * holding the container's monitor and is skipped if the queue already
     * exists.
     *
     * @param container The database container the buffers belong to.
     * @param entryContainer The entry container passed to the index manager
     *                       when the container is not an Index.
     * @throws FileNotFoundException If the writer's temporary file cannot be
     *                               opened.
     */
    private void createIndexWriterTask(DatabaseContainer container,
                                       EntryContainer entryContainer)
            throws FileNotFoundException
    {
      synchronized(container)
      {
        if(!containerQueMap.containsKey(container))
        {
          IndexManager manager = (container instanceof Index)
                  ? new IndexManager((Index) container)
                  : new IndexManager((DN2ID) container, entryContainer);
          containerIndexMgrMap.put(container, manager);
          BlockingQueue<IndexBuffer> writerQue =
                  new ArrayBlockingQueue<IndexBuffer>(threadCount + 5);
          IndexFileWriterTask writer =
                  new IndexFileWriterTask(writerQue, manager);
          indexWriterList.add(writer);
          indexWriterFutures.add(indexProcessService.submit(writer));
          //Publish the queue last, once the writer has been submitted.
          containerQueMap.put(container, writerQue);
        }
      }
    }
  }


  /**
   * The buffer class is used to process a buffer from the temporary index files
   * during phase 2 processing.
*/
  private final class Buffer implements Comparable<Buffer>
  {
    private IndexManager indexMgr;

    //begin/end delimit this buffer's byte range in the temporary file;
    //offset counts the bytes of that range already loaded into the cache.
    private final long begin, end, id;
    private long offset;
    private ByteBuffer cache;
    private int keyLen, idLen;

    //The record most recently read by getNextRecord.
    private byte[] key;
    private ImportIDSet idSet;


    /**
     * Create a buffer covering the specified byte range.
     *
     * @param begin The file offset where the buffer's data begins.
     * @param end The file offset where the buffer's data ends.
     * @param id The ID of the buffer, used to break ties when comparing.
     */
    public Buffer(long begin, long end, long id)
    {
      this.begin = begin;
      this.end = end;
      this.offset = 0;
      this.id = id;
    }


    /**
     * Attach the buffer to an index manager and cache, fill the cache and
     * read the first record.
     *
     * @param indexMgr The index manager supplying the file channel.
     * @param b The cache to use, or null to allocate one of cacheSize bytes.
     * @param cacheSize The size of the cache to allocate when b is null.
     * @throws IOException If the temporary file cannot be read.
     */
    private void init(IndexManager indexMgr, ByteBuffer b,
                      long cacheSize) throws IOException
    {
      this.indexMgr = indexMgr;
      if(b == null)
      {
        cache = ByteBuffer.allocate((int)cacheSize);
      }
      else
      {
        cache = b;
      }
      loadCache();
      cache.flip();
      getNextRecord();
    }


    /**
     * Fill the free space of the cache from the temporary file, reading no
     * further than the end of this buffer's byte range.
     *
     * @throws IOException If the read fails or the file ends before the
     *                     expected number of bytes has been read.
     */
    private void loadCache() throws IOException
    {
      FileChannel fileChannel = indexMgr.getChannel();
      fileChannel.position(begin + offset);
      long leftToRead = end - (begin + offset);
      long bytesToRead = Math.min(leftToRead, cache.remaining());
      if(leftToRead < cache.remaining())
      {
        //Less data is left than the cache can hold; lower the limit so the
        //read stops at the end of this buffer's range.
        int pos = cache.position();
        cache.limit((int) (pos + leftToRead));
      }
      int bytesRead = 0;
      while(bytesRead < bytesToRead)
      {
        int count = fileChannel.read(cache);
        if(count < 0)
        {
          //read() reports end-of-stream as -1; adding that to the total
          //would make this loop run forever on a truncated file.
          throw new IOException("Unexpected end of file " +
                  indexMgr.getFile().getName() + " reading buffer " + id);
        }
        bytesRead += count;
      }
      offset += bytesRead;
      indexMgr.addBytesRead(bytesRead);
    }


    /**
     * Determine whether another record can be read from this buffer.
     *
     * @return <CODE>False</CODE> only when the cache is drained and the
     *         whole byte range has been loaded.
     * @throws IOException Declared for callers; not thrown by this method.
     */
    public boolean hasMoreData() throws IOException
    {
      return cache.remaining() != 0 || (begin + offset) < end;
    }


    /**
     * Return the key of the current record.
     *
     * @return The key bytes of the current record.
     */
    public byte[] getKey()
    {
      return key;
    }


    /**
     * Return the ID set of the current record.
     *
     * @return The ID set of the current record.
     */
    public ImportIDSet getIDSet()
    {
      return idSet;
    }


    /**
     * Return the ID of this buffer.
     *
     * @return The buffer ID.
     */
    public long getBufID()
    {
      return id;
    }


    /**
     * Read the next key and ID set from the cache, replacing the current
     * record.
     *
     * @throws IOException If the temporary file cannot be read.
     */
    public void getNextRecord() throws IOException
    {
      getNextKey();
      getNextIDSet();
    }


    private int getInt() throws IOException
    {
      ensureData(4);
      return cache.getInt();
    }


    private long getLong() throws IOException
    {
      ensureData(8);
      return cache.getLong();
    }


    private void getBytes(byte[] b) throws IOException
    {
      ensureData(b.length);
      cache.get(b);
    }


    //A key is stored as a 4-byte length followed by the key bytes.
    private void getNextKey() throws IOException, BufferUnderflowException
    {
      keyLen = getInt();
      key = new byte[keyLen];
      getBytes(key);
    }


    //An ID set is stored as a 4-byte length followed by 8-byte entry IDs.
    private void getNextIDSet() throws IOException, BufferUnderflowException
    {
      idLen = getInt();
      int idCount = idLen/8;
      idSet = new ImportIDSet(idCount);
      for(int i = 0; i < idCount; i++)
      {
        long l = getLong();
        idSet.addEntryID(l, indexMgr.getLimit(), indexMgr.getMaintainCount());
      }
    }


    /**
     * Reload the cache when it holds fewer than len unread bytes, keeping
     * any unread bytes at the front of the cache.
     */
    private void ensureData(int len) throws IOException
    {
      if(cache.remaining() == 0)
      {
        cache.clear();
        loadCache();
        cache.flip();
      }
      else if(cache.remaining() < len)
      {
        cache.compact();
        loadCache();
        cache.flip();
      }
    }


    /**
     * Order buffers by current key, breaking ties by ID set and buffer ID.
     * Buffers without a key yet are ordered by buffer ID alone.
     *
     * @param o The buffer to compare to.
     * @return A negative value, zero or a positive value as this buffer
     *         sorts before, equal to or after the specified buffer.
     */
    public int compareTo(Buffer o)
    {
      if(key == null)
      {
        if(id == o.getBufID())
        {
          return 0;
        }
        else
        {
          return id > o.getBufID() ? 1 : -1;
        }
      }
      if(this.equals(o))
      {
        return 0;
      }
      int rc = indexMgr.getComparator().compare(key, o.getKey());
      if(rc == 0)
      {
        // NOTE(review): when both ID sets are defined, a.compareTo(b) and
        // b.compareTo(a) both return -1, so this tie-break is not
        // antisymmetric -- confirm the sorted set tolerates it.
        if(idSet.isDefined())
        {
          return -1;
        }
        else if(o.getIDSet().isDefined())
        {
          return 1;
        }
        else if(idSet.size() == o.getIDSet().size())
        {
          rc = id > o.getBufID() ? 1 : -1;
        }
        else
        {
          rc = idSet.size() - o.getIDSet().size();
        }
      }
      return rc;
    }
  }


  /**
   * The index manager class is used to carry information about index processing
   * from phase 1 to phase 2.
*/ private final class IndexManager { private final Index index; private final DN2ID dn2id; private final EntryContainer entryContainer; private final File file; private RandomAccessFile raf = null; private final List<Buffer> bufferList = new LinkedList<Buffer>(); private final int limit; private long fileLength, bytesRead = 0; private final boolean maintainCount; private final Comparator<byte[]> comparator; private boolean done = false; private long totalDNS; private AtomicInteger keyCount = new AtomicInteger(0); private final String name; public IndexManager(Index index) { this.index = index; dn2id = null; file = new File(tempDir, index.getName()); name = index.getName(); limit = index.getIndexEntryLimit(); maintainCount = index.getMaintainCount(); comparator = index.getComparator(); entryContainer = null; } public IndexManager(DN2ID dn2id, EntryContainer entryContainer) { index = null; this.dn2id = dn2id; file = new File(tempDir, dn2id.getName()); limit = 1; maintainCount = false; comparator = dn2id.getComparator(); this.entryContainer = entryContainer; name = dn2id.getName(); } public void init() throws FileNotFoundException { raf = new RandomAccessFile(file, "r"); } public FileChannel getChannel() { return raf.getChannel(); } public void addBuffer(Buffer o) { this.bufferList.add(o); } public List<Buffer> getBufferList() { return bufferList; } public File getFile() { return file; } public void deleteIndexFile() { file.delete(); } public void close() throws IOException { raf.close(); } public int getLimit() { return limit; } public boolean getMaintainCount() { return maintainCount; } public Comparator<byte[]> getComparator() { return comparator; } public Index getIndex() { return index; } public void setFileLength() { this.fileLength = file.length(); } public void addBytesRead(int bytesRead) { this.bytesRead += bytesRead; } public void setDone() { this.done = true; } public void addTotDNCount(int delta) { this.totalDNS += delta; } public long getTotDNCount() { 
return totalDNS; } public void printStats(long deltaTime) { if(!done) { float rate = 1000f * keyCount.getAndSet(0) / deltaTime; Message msg = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(name, (fileLength - bytesRead), rate); logError(msg); } } public void incrKeyCount() { keyCount.incrementAndGet(); } } /** * This class reports progress of the import job at fixed intervals. */ private final class ProgressTask extends TimerTask private final class FirstPhaseProgressTask extends TimerTask { /** * The number of entries that had been read at the time of the @@ -993,89 +1795,72 @@ private EnvironmentStats prevEnvStats; /** * The number of bytes in a megabyte. * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB). * The number of bytes in a megabyte. Note that 1024*1024 bytes may * eventually become known as a mebibyte(MiB). */ public static final int bytesPerMegabyte = 1024*1024; public static final int bytesPerMegabyte = 1024 * 1024; //Determines if the ldif is being read. // Determines if the ldif is being read. private boolean ldifRead = false; //Determines if eviction has been detected. // Determines if eviction has been detected. private boolean evicting = false; //Entry count when eviction was detected. // Entry count when eviction was detected. private long evictionEntryCount = 0; //Suspend output. // Suspend output. private boolean pause = false; /** * Create a new import progress task. * @throws DatabaseException If an error occurs in the JE database. */ public ProgressTask() throws DatabaseException public FirstPhaseProgressTask() { previousTime = System.currentTimeMillis(); prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig()); try { prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig()); } catch (DatabaseException e) { throw new RuntimeException(e); } } /** * Return if reading the LDIF file. */ public void ldifRead() { ldifRead=true; } /** * Return value of evicting flag. 
* * @return <CODE>True</CODE> if eviction is detected. */ public boolean isEvicting() { return evicting; } /** * Return count of entries when eviction was detected. * * @return The entry count when eviction was detected. */ public long getEvictionEntryCount() { return evictionEntryCount; } /** * Suspend output if true. * * @param v The value to set the suspend value to. */ public void setPause(boolean v) { pause=v; } /** * The action to be performed by this timer task. */ public void run() { @Override public void run() { long latestCount = reader.getEntriesRead() + 0; long deltaCount = (latestCount - previousCount); long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; Message message; if (deltaTime == 0) { if (deltaTime == 0) { return; } if(pause) { if (pause) { return; } if(!ldifRead) { long numRead = reader.getEntriesRead(); long numIgnored = reader.getEntriesIgnored(); if (!ldifRead) { long numRead = reader.getEntriesRead(); long numIgnored = reader.getEntriesIgnored(); long numRejected = reader.getEntriesRejected(); float rate = 1000f*deltaCount / deltaTime; message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get( numRead, numIgnored, numRejected, 0, rate); float rate = 1000f * deltaCount / deltaTime; message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(numRead, numIgnored, numRejected, 0, rate); logError(message); } try @@ -1083,16 +1868,18 @@ Runtime runtime = Runtime.getRuntime(); long freeMemory = runtime.freeMemory() / bytesPerMegabyte; EnvironmentStats envStats = rootContainer.getEnvironmentStats(new StatsConfig()); rootContainer.getEnvironmentStats(new StatsConfig()); long nCacheMiss = envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); float cacheMissRate = 0; if (deltaCount > 0) { cacheMissRate = nCacheMiss/(float)deltaCount; if (deltaCount > 0) { cacheMissRate = nCacheMiss / (float) deltaCount; } message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get( freeMemory, 
cacheMissRate); message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory, cacheMissRate); logError(message); long evictPasses = envStats.getNEvictPasses(); long evictNodes = envStats.getNNodesExplicitlyEvicted(); @@ -1102,37 +1889,196 @@ long cleanerEntriesRead = envStats.getNCleanerEntriesRead(); long cleanerINCleaned = envStats.getNINsCleaned(); long checkPoints = envStats.getNCheckpoints(); if(evictPasses != 0) { if(!evicting) { evicting=true; if(!ldifRead) { evictionEntryCount=reader.getEntriesRead(); if (evictPasses != 0) { if (!evicting) { evicting = true; if (!ldifRead) { evictionEntryCount = reader.getEntriesRead(); message = NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount); NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED .get(evictionEntryCount); logError(message); } } message = NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses, evictNodes, evictBinsStrip); NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get( evictPasses, evictNodes, evictBinsStrip); logError(message); } if(cleanerRuns != 0) { message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns, cleanerDeletions, cleanerEntriesRead, cleanerINCleaned); if (cleanerRuns != 0) { message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns, cleanerDeletions, cleanerEntriesRead, cleanerINCleaned); logError(message); } if(checkPoints > 1) { message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints); if (checkPoints > 1) { message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints); logError(message); } prevEnvStats = envStats; } catch (DatabaseException e) { } catch (DatabaseException e) { // Unlikely to happen and not critical. } previousCount = latestCount; previousTime = latestTime; } } } /** * This class reports progress of the import job at fixed intervals. */ private final class SecondPhaseProgressTask extends TimerTask { /** * The number of entries that had been read at the time of the * previous progress report. 
*/ private long previousCount = 0; /** * The time in milliseconds of the previous progress report. */ private long previousTime; /** * The environment statistics at the time of the previous report. */ private EnvironmentStats prevEnvStats; /** * The number of bytes in a megabyte. Note that 1024*1024 bytes may * eventually become known as a mebibyte(MiB). */ public static final int bytesPerMegabyte = 1024 * 1024; // Determines if eviction has been detected. private boolean evicting = false; // Suspend output. private boolean pause = false; private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap; /** * Create a new import progress task. * @param containerIndexMgrMap Map of database container objects to * index manager objects. */ public SecondPhaseProgressTask(Map<DatabaseContainer, IndexManager> containerIndexMgrMap) { previousTime = System.currentTimeMillis(); this.containerIndexMgrMap = containerIndexMgrMap; try { prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig()); } catch (DatabaseException e) { throw new RuntimeException(e); } } /** * The action to be performed by this timer task. 
*/ @Override public void run() { long latestCount = reader.getEntriesRead() + 0; long deltaCount = (latestCount - previousCount); long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; Message message; if (deltaTime == 0) { return; } if (pause) { return; } try { Runtime runtime = Runtime.getRuntime(); long freeMemory = runtime.freeMemory() / bytesPerMegabyte; EnvironmentStats envStats = rootContainer.getEnvironmentStats(new StatsConfig()); long nCacheMiss = envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); float cacheMissRate = 0; if (deltaCount > 0) { cacheMissRate = nCacheMiss / (float) deltaCount; } message = NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory, cacheMissRate); logError(message); long evictPasses = envStats.getNEvictPasses(); long evictNodes = envStats.getNNodesExplicitlyEvicted(); long evictBinsStrip = envStats.getNBINsStripped(); long cleanerRuns = envStats.getNCleanerRuns(); long cleanerDeletions = envStats.getNCleanerDeletions(); long cleanerEntriesRead = envStats.getNCleanerEntriesRead(); long cleanerINCleaned = envStats.getNINsCleaned(); long checkPoints = envStats.getNCheckpoints(); if (evictPasses != 0) { if (!evicting) { evicting = true; } message = NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get( evictPasses, evictNodes, evictBinsStrip); logError(message); } if (cleanerRuns != 0) { message = NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns, cleanerDeletions, cleanerEntriesRead, cleanerINCleaned); logError(message); } if (checkPoints > 1) { message = NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints); logError(message); } prevEnvStats = envStats; } catch (DatabaseException e) { // Unlikely to happen and not critical. 
} previousCount = latestCount; previousTime = latestTime; for(Map.Entry<DatabaseContainer, IndexManager> e : containerIndexMgrMap.entrySet()) { IndexManager indexMgr = e.getValue(); indexMgr.printStats(deltaTime); } } } } opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
New file @@ -0,0 +1,753 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */

package org.opends.server.backends.jeb.importLDIF;

import java.io.DataOutputStream;
import java.io.IOException;
import org.opends.server.backends.jeb.*;


/**
 * This class is used to hold the keys read from the LDIF file during
 * phase 1. The keys are sorted and written to a temporary index file.
 * <p>
 * The backing byte array is filled from both ends. A table of 4-byte record
 * offsets (one per key) grows upward from index 0, while the records
 * themselves grow downward from the end of the array. Each record is a
 * 4-byte big-endian key length, the key bytes, and an 8-byte entry ID.
 * Sorting only rearranges the offset table; records never move.
 * <p>
 * Instances keep mutable scratch arrays and perform no synchronization.
 */
public class IndexBuffer implements Comparable<IndexBuffer> {

  /**
   * Enumeration used when sorting a buffer.
   */
  private enum CompareOp {
    LT, GT, LE, GE, EQ
  }

  //Number of bytes used to store an offset or a key length.
  private static final int INT_SIZE = 4;

  //Number of bytes used to store an entry ID.
  private static final int ID_SIZE = 8;

  //The size of a buffer.
  private final int size;

  //Byte array holding the actual buffer data.
  private final byte[] buffer;

  //id is used to break a tie (keys equal) when the buffers are being merged
  //when writing.
  private long id;

  //Temporary buffers. Both are overwritten by every call that uses them.
  private final byte[] intBytes = new byte[INT_SIZE];
  private final byte[] idBytes = new byte[ID_SIZE];

  //keyPtr - offset where the next key offset is written
  //recPtr - offset where the next value record is written
  //bytesLeft - amount of bytes left in the buffer
  private int keyPtr = 0, recPtr = 0, bytesLeft = 0;

  //keys - number of keys in the buffer
  //pos - used to iterate over the buffer when writing to a file.
  private int keys = 0, pos = 0;

  //Various things needed to process a buffer.
  private ComparatorBuffer<byte[]> comparator;
  private DatabaseContainer container;
  private EntryContainer entryContainer;


  private IndexBuffer(int size) {
    this.size = size;
    this.buffer = new byte[size];
    this.bytesLeft = size;
    this.recPtr = size - 1;
  }

  /**
   * Create an instance of an IndexBuffer using the specified size.
   *
   * @param size The size of the underlying byte array.
   * @return A newly created instance of an IndexBuffer.
   */
  public static IndexBuffer createIndexBuffer(int size) {
    return new IndexBuffer(size);
  }

  /**
   * Reset an IndexBuffer so it can be re-used. The buffer ID is left
   * unchanged.
   */
  public void reset() {
    bytesLeft = size;
    keyPtr = 0;
    recPtr = size - 1;
    keys = 0;
    pos = 0;
    container = null;
    entryContainer = null;
    comparator = null;
  }

  /**
   * Compare current IndexBuffer to the one in the specified argument. The key
   * at the value of pos in both buffers are used in the comparison. When the
   * keys are equal the buffer IDs break the tie.
   *
   * @param b The IndexBuffer to compare to.
   * @return  0 if the buffers are equal, -1 if the current buffer is less
   *          than the specified buffer, or 1 if it is greater.
   */
  public int compareTo(IndexBuffer b) {
    byte[] otherKey = b.getKeyBytes(b.getPos());
    int recOffset = recordOffset(pos);
    int keyLen = getValue(recOffset);
    int rc = comparator.compare(buffer, recOffset + INT_SIZE, keyLen,
                                otherKey);
    if (rc != 0) {
      return rc;
    }
    long otherID = b.getBufID();
    if (this.id == otherID) {
      return 0;
    }
    return (this.id < otherID) ? -1 : 1;
  }

  /**
   * Set the ID of a buffer to the specified value.
   *
   * @param id The value to set the ID to.
   */
  public void setID(long id) {
    this.id = id;
  }

  /**
   * Determines if a buffer is a poison buffer. A poison buffer is used to
   * shutdown work queues when the LDIF reader is completed. A poison buffer
   * has a 0 size.
   *
   * @return <CODE>True</CODE> if a buffer is a poison buffer.
   */
  public boolean isPoison() {
    return (size == 0);
  }

  /**
   * Return the ID of a buffer.
   *
   * @return The value of a buffer's ID.
   */
  public long getBufID() {
    return this.id;
  }

  /**
   * Set the DB container to be used in the buffer processing to the specified
   * value.
   *
   * @param container The DB container to set a buffer's container to.
   */
  public void setContainer(DatabaseContainer container) {
    this.container = container;
  }

  /**
   * Return the DB container value of a buffer.
   *
   * @return The DB container value of a buffer.
   */
  public DatabaseContainer getContainer() {
    return this.container;
  }

  /**
   * Determine if there is enough space available to write the specified byte
   * array in the buffer. The space required is the record itself plus one
   * slot in the offset table.
   *
   * @param keyBytes The byte array to check space against.
   * @return <CODE>True</CODE> if there is space to write the byte array in a
   *         buffer.
   */
  public boolean isSpaceAvailable(byte[] keyBytes) {
    int recLen = INT_SIZE + keyBytes.length + ID_SIZE;
    return (recLen + INT_SIZE) < bytesLeft;
  }

  /**
   * Set the comparator to be used in the buffer processing to the specified
   * value.
   *
   * @param comparator The comparator to set the buffer's comparator to.
   */
  public void setComparator(ComparatorBuffer<byte[]> comparator) {
    this.comparator = comparator;
  }

  /**
   * Set a buffer's entry container to the specified parameter.
   *
   * @param entryContainer The entry container to set the buffer's container
   *                       to.
   */
  public void setEntryContainer(EntryContainer entryContainer) {
    this.entryContainer = entryContainer;
  }

  /**
   * Return a buffer's entry container value.
   *
   * @return The buffer's entry container value.
   */
  public EntryContainer getEntryContainer() {
    return entryContainer;
  }

  /**
   * Return a buffer's current pos value.
   *
   * @return The buffer's current pos value.
   */
  public int getPos() {
    return pos;
  }

  /**
   * Set a buffer's pos value to the specified value.
   *
   * @param mark The value to set the pos to.
   */
  public void setPos(int mark) {
    this.pos = mark;
  }

  /**
   * Sort the buffer. A comparator must have been set.
   */
  public void sort() {
    sort(0, keys);
  }

  /**
   * Add the specified key byte array and EntryID to the buffer. No capacity
   * check is made here; callers are expected to consult
   * {@link #isSpaceAvailable(byte[])} first.
   *
   * @param keyBytes The key byte array.
   * @param IDEntry The EntryID.
   */
  public void add(byte[] keyBytes, EntryID IDEntry) {
    // Deliberately not named idBytes so the scratch field is not shadowed.
    byte[] entryIDBytes = JebFormat.entryIDToDatabase(IDEntry.longValue());
    int recLen = INT_SIZE + keyBytes.length + ID_SIZE;
    recPtr -= recLen;
    // Publish the record offset in the next slot of the offset table.
    System.arraycopy(getBytes(recPtr), 0, buffer, keyPtr, INT_SIZE);
    keyPtr += INT_SIZE;
    // Record layout: key length, key bytes, entry ID.
    System.arraycopy(getBytes(keyBytes.length), 0, buffer, recPtr, INT_SIZE);
    System.arraycopy(keyBytes, 0, buffer, recPtr + INT_SIZE, keyBytes.length);
    System.arraycopy(entryIDBytes, 0, buffer,
                     recPtr + INT_SIZE + keyBytes.length, ID_SIZE);
    bytesLeft = recPtr - keyPtr;
    keys++;
  }

  /**
   * Return the byte array representing the entry ID
   * at the specified index value.
   * <p>
   * The returned array is a scratch array owned by this buffer and is
   * overwritten by the next call to this method; copy it if it must be kept.
   *
   * @param index The index value to retrieve.
   * @return The byte array at the index value.
   */
  public byte[] getID(int index) {
    int recOffset = recordOffset(index);
    int keyLen = getValue(recOffset);
    System.arraycopy(buffer, recOffset + INT_SIZE + keyLen, idBytes, 0,
                     ID_SIZE);
    return idBytes;
  }

  /**
   * Compare the byte array at the current pos with the specified one.
   *
   * @param b The byte array to compare.
   * @return <CODE>True</CODE> if the byte arrays are equal.
   */
  public boolean compare(byte[] b) {
    return is(pos, b, CompareOp.EQ);
  }

  /**
   * Compare the byte array at the current pos with the byte array at the
   * specified index.
   *
   * @param i The index pointing to the byte array to compare.
   * @return <CODE>True</CODE> if the byte arrays are equal.
   */
  public boolean compare(int i) {
    return is(i, pos, CompareOp.EQ);
  }

  /**
   * Return the number of keys in an index buffer.
   *
   * @return The number of keys currently in an index buffer.
   */
  public int getNumberKeys() {
    return keys;
  }

  /**
   * Write the key at the current pos to an output stream, as a 4-byte length
   * followed by the key bytes.
   *
   * @param out The stream to write the key to.
   *
   * @throws IOException If there was an error writing the key.
   */
  public void writeKey(DataOutputStream out) throws IOException {
    int recOffset = recordOffset(pos);
    int len = getValue(recOffset);
    out.writeInt(len);
    out.write(buffer, recOffset + INT_SIZE, len);
  }

  /**
   * Return the size of the key part of the record at the current pos.
   *
   * @return The size of the key part of the record.
   */
  public int getKeySize() {
    return getValue(recordOffset(pos));
  }

  /**
   * Return the key value part of a record specified by the index.
   *
   * @param index The index to return the key value of.
   * @return A newly allocated byte array containing the key value.
   */
  public byte[] getKeyBytes(int index) {
    int recOffset = recordOffset(index);
    int keyLen = getValue(recOffset);
    byte[] b = new byte[keyLen];
    System.arraycopy(buffer, recOffset + INT_SIZE, b, 0, keyLen);
    return b;
  }

  /**
   * Return if the buffer has more data. Used when iterating over the
   * buffer examining keys.
   *
   * @return <CODE>True</CODE> if the buffer has more data to process.
   */
  public boolean hasMoreData() {
    return (pos + 1) < keys;
  }

  /**
   * Move to the next record in the buffer. Used when iterating over the
   * buffer examining keys.
   */
  public void getNextRecord() {
    pos++;
  }

  //Return the offset of the record referenced by the specified slot of the
  //offset table.
  private int recordOffset(int index) {
    return getValue(index * INT_SIZE);
  }

  //Encode the value as 4 big-endian bytes into the shared scratch array.
  private byte[] getBytes(int val) {
    for (int i = INT_SIZE - 1; i >= 0; i--) {
      intBytes[i] = (byte) (val & 0xff);
      val >>>= 8;
    }
    return intBytes;
  }

  //Decode the 4 big-endian bytes stored at the specified buffer offset.
  private int getValue(int offset) {
    int answer = 0;
    for (int i = 0; i < INT_SIZE; i++) {
      answer <<= 8;
      answer |= (buffer[offset + i] & 0xff);
    }
    return answer;
  }

  //Evaluate "key at slot x <op> key at slot y".
  private boolean is(int x, int y, CompareOp op) {
    int xOffset = recordOffset(x);
    int xLen = getValue(xOffset);
    int yOffset = recordOffset(y);
    int yLen = getValue(yOffset);
    return eval(comparator.compare(buffer, xOffset + INT_SIZE, xLen,
                                   yOffset + INT_SIZE, yLen), op);
  }

  //Evaluate "key at slot x <op> key m".
  private boolean is(int x, byte[] m, CompareOp op) {
    int xOffset = recordOffset(x);
    int xLen = getValue(xOffset);
    return eval(comparator.compare(buffer, xOffset + INT_SIZE, xLen, m), op);
  }

  //Return whichever of the slots a, b, c holds the median key.
  private int med3(int a, int b, int c) {
    return (is(a, b, CompareOp.LT) ?
        (is(b, c, CompareOp.LT) ? b : is(a, c, CompareOp.LT) ? c : a) :
        (is(b, c, CompareOp.GT) ? b : is(a, c, CompareOp.GT) ? c : a));
  }

  //Quicksort with three-way partitioning over the offset table slots
  //[off, off + len). Ranges shorter than 7 use insertion sort; the pivot is a
  //median of three, or a median of three medians when len > 40.
  private void sort(int off, int len) {
    if (len < 7) {
      for (int i = off; i < len + off; i++) {
        for (int j = i; j > off && is(j - 1, j, CompareOp.GT); j--) {
          swap(j, j - 1);
        }
      }
      return;
    }

    // Choose a partition element.
    int m = off + (len >> 1);
    if (len > 7) {
      int l = off;
      int n = off + len - 1;
      if (len > 40) {
        int s = len / 8;
        l = med3(l, l + s, l + 2 * s);
        m = med3(m - s, m, m + s);
        n = med3(n - 2 * s, n - s, n);
      }
      m = med3(l, m, n);
    }

    // Copy the pivot key: slot m is moved around by the swaps below.
    byte[] mKey = getKeyBytes(m);
    int a = off, b = a, c = off + len - 1, d = c;
    while (true) {
      while (b <= c && is(b, mKey, CompareOp.LE)) {
        if (is(b, mKey, CompareOp.EQ)) {
          swap(a++, b);
        }
        b++;
      }
      while (c >= b && is(c, mKey, CompareOp.GE)) {
        if (is(c, mKey, CompareOp.EQ)) {
          swap(c, d--);
        }
        c--;
      }
      if (b > c) {
        break;
      }
      swap(b++, c--);
    }

    // Swap partition elements back to middle.
    int s, n = off + len;
    s = Math.min(a - off, b - a);
    vecswap(off, b - s, s);
    s = Math.min(d - c, n - d - 1);
    vecswap(b, n - s, s);

    // Recursively sort non-partition-elements.
    if ((s = b - a) > 1) {
      sort(off, s);
    }
    if ((s = d - c) > 1) {
      sort(n - s, s);
    }
  }

  //Exchange the record offsets stored in slots a and b.
  private void swap(int a, int b) {
    int aOffset = a * INT_SIZE;
    int bOffset = b * INT_SIZE;
    int bVal = getValue(bOffset);
    System.arraycopy(buffer, aOffset, buffer, bOffset, INT_SIZE);
    System.arraycopy(getBytes(bVal), 0, buffer, aOffset, INT_SIZE);
  }

  //Exchange n consecutive slots starting at a with those starting at b.
  private void vecswap(int a, int b, int n) {
    for (int i = 0; i < n; i++, a++, b++) {
      swap(a, b);
    }
  }

  //Translate a comparator result into the truth value of the operator.
  private boolean eval(int rc, CompareOp op) {
    switch (op) {
      case LT:
        return rc < 0;
      case GT:
        return rc > 0;
      case LE:
        return rc <= 0;
      case GE:
        return rc >= 0;
      case EQ:
        return rc == 0;
      default:
        return false;
    }
  }

  //Order two keys that agree on every compared byte: the longer one is
  //greater.
  private static int compareLengths(int len, int len1) {
    if (len == len1) {
      return 0;
    }
    return (len > len1) ? 1 : -1;
  }

  //Compare two byte ranges from their last byte backward, using signed byte
  //values.
  private static int compareFromEnd(byte[] a, int aOffset, int aLen,
                                    byte[] b, int bOffset, int bLen) {
    for (int ai = aLen - 1, bi = bLen - 1; ai >= 0 && bi >= 0; ai--, bi--) {
      if (a[aOffset + ai] > b[bOffset + bi]) {
        return 1;
      } else if (a[aOffset + ai] < b[bOffset + bi]) {
        return -1;
      }
    }
    return compareLengths(aLen, bLen);
  }

  //Compare two byte ranges from their first byte forward, using signed byte
  //values.
  private static int compareFromStart(byte[] a, int aOffset, int aLen,
                                      byte[] b, int bOffset, int bLen) {
    for (int i = 0; i < aLen && i < bLen; i++) {
      if (a[aOffset + i] > b[bOffset + i]) {
        return 1;
      } else if (a[aOffset + i] < b[bOffset + i]) {
        return -1;
      }
    }
    return compareLengths(aLen, bLen);
  }


  /**
   * Interface that defines two methods used to compare keys used in this
   * class. The Comparator interface cannot be used in this class, so this
   * special one is used that knows about the special properties of this
   * class.
   *
   * @param <T> object to use in the comparisons
   */
  public interface ComparatorBuffer<T> {

    /**
     * Compare two offsets in an object, usually a byte array.
     *
     * @param o The object.
     * @param offset The first offset.
     * @param len The first length.
     * @param offset1 The second offset.
     * @param len1 The second length.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the
     *         second.
     */
    int compare(T o, int offset, int len, int offset1, int len1);

    /**
     * Compare an offset in an object with the specified object.
     *
     * @param o The first object.
     * @param offset The first offset.
     * @param len The first length.
     * @param o2 The second object.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the second
     *         object.
     */
    int compare(T o, int offset, int len, T o2);
  }


  /**
   * Implementation of ComparatorBuffer interface. Used to compare keys when
   * they are DNs. Bytes are compared starting from the end of each key.
   */
  public static class DNComparator
      implements IndexBuffer.ComparatorBuffer<byte[]> {

    /**
     * Compare two offsets in a byte array using the DN comparison algorithm.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param len The first length.
     * @param offset1 The second offset.
     * @param len1 The second length.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the
     *         second.
     */
    public int compare(byte[] b, int offset, int len, int offset1, int len1) {
      return compareFromEnd(b, offset, len, b, offset1, len1);
    }

    /**
     * Compare an offset in a byte array with the specified byte array,
     * using the DN comparison algorithm.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param len The first length.
     * @param m The second byte array to compare to.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the
     *         second byte array.
     */
    public int compare(byte[] b, int offset, int len, byte[] m) {
      return compareFromEnd(b, offset, len, m, 0, m.length);
    }
  }


  /**
   * Implementation of ComparatorBuffer interface. Used to compare keys when
   * they are regular indexes. Bytes are compared starting from the beginning
   * of each key.
   */
  public static class IndexComparator
      implements IndexBuffer.ComparatorBuffer<byte[]> {

    /**
     * Compare two offsets in a byte array using the index comparison
     * algorithm.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param len The first length.
     * @param offset1 The second offset.
     * @param len1 The second length.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the
     *         second.
     */
    public int compare(byte[] b, int offset, int len, int offset1, int len1) {
      return compareFromStart(b, offset, len, b, offset1, len1);
    }

    /**
     * Compare an offset in a byte array with the specified byte array,
     * using the index comparison algorithm.
     *
     * @param b The byte array.
     * @param offset The first offset.
     * @param len The first length.
     * @param m The second byte array to compare to.
     * @return a negative integer, zero, or a positive integer as the first
     *         offset value is less than, equal to, or greater than the
     *         second byte array.
     */
    public int compare(byte[] b, int offset, int len, byte[] m) {
      return compareFromStart(b, offset, len, m, 0, m.length);
    }
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java
File was deleted opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java
File was deleted opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
New file @@ -0,0 +1,397 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */

package org.opends.server.backends.jeb.importLDIF;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.types.*;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;


/**
 * The class represents a suffix. OpenDS backends can have multiple suffixes.
 * <p>
 * A suffix wraps the entry container that an import writes into, the include
 * and exclude branches of the import configuration that fall under its base
 * DN, a bounded cache of parent DN to entry ID mappings, and a map of DNs
 * that are currently being processed by another thread.
 */
public class Suffix
{
  //Maximum number of entries kept in the parent ID cache.
  private static final int PARENT_ID_MAP_SIZE = 4096;

  //How long to sleep between two checks of the pending map, in milliseconds.
  private static final long PENDING_POLL_MILLIS = 50;

  //Value of the poll counter at which waiting for a pending parent is
  //abandoned. The counter starts at 0 and is tested after each sleep, so up
  //to PENDING_POLL_LIMIT + 1 sleeps take place.
  private static final int PENDING_POLL_LIMIT = 10;

  private final RootContainer rootContainer;
  private final LDIFImportConfig config;
  private final List<DN> includeBranches = new ArrayList<DN>();
  private final List<DN> excludeBranches = new ArrayList<DN>();
  private final DN baseDN;
  private EntryContainer srcEntryContainer = null;
  private EntryContainer entryContainer;
  private boolean exclude = false;

  //Guards parentIDMap, which is a plain HashMap.
  private final Object synchObject = new Object();

  private final ConcurrentHashMap<DN,DN> pendingMap =
       new ConcurrentHashMap<DN, DN>();
  private final HashMap<DN,EntryID> parentIDMap =
       new HashMap<DN,EntryID>(PARENT_ID_MAP_SIZE);

  private DN parentDN;
  private ArrayList<EntryID> IDs;

  private Suffix(EntryContainer entryContainer, LDIFImportConfig config,
                 RootContainer rootContainer)
       throws InitializationException, ConfigException
  {
    this.rootContainer = rootContainer;
    this.entryContainer = entryContainer;
    this.config = config;
    this.baseDN = entryContainer.getBaseDN();
    init();
  }

  /**
   * Creates a suffix instance using the specified parameters.
   *
   * @param entryContainer The entry container pertaining to the suffix.
   * @param config The import config instance.
   * @param rootContainer The root container.
   *
   * @return A suffix instance.
   * @throws InitializationException If the suffix cannot be initialized.
   * @throws ConfigException If an error occurred reading the configuration.
   */
  public static Suffix
  createSuffixContext(EntryContainer entryContainer, LDIFImportConfig config,
                      RootContainer rootContainer)
       throws InitializationException, ConfigException
  {
    return new Suffix(entryContainer, config, rootContainer);
  }

  /**
   * Returns the DN2ID instance pertaining to a suffix instance.
   *
   * @return A DN2ID instance that can be used to manipulate the DN2ID
   *         database.
   */
  public DN2ID getDN2ID()
  {
    return entryContainer.getDN2ID();
  }

  /**
   * Returns the ID2Entry instance pertaining to a suffix instance.
   *
   * @return A ID2Entry instance that can be used to manipulate the ID2Entry
   *         database.
   */
  public ID2Entry getID2Entry()
  {
    return entryContainer.getID2Entry();
  }

  /**
   * Returns the DN2URI instance pertaining to a suffix instance.
   *
   * @return A DN2URI instance that can be used to manipulate the DN2URI
   *         database.
   */
  public DN2URI getDN2URI()
  {
    return entryContainer.getDN2URI();
  }

  /**
   * Returns the entry container pertaining to a suffix instance. This is the
   * temporary container when initialization created one.
   *
   * @return The entry container the suffix is currently using.
   */
  public EntryContainer getEntryContainer()
  {
    return entryContainer;
  }

  /**
   * Works out which include and exclude branches apply to this suffix and
   * prepares the entry container accordingly. Nothing is done when the import
   * appends to existing data or clears the backend, and the container is left
   * alone when no include branches are configured.
   */
  private void init() throws InitializationException, ConfigException
  {
    if (config.appendToExistingData() || config.clearBackend())
    {
      return;
    }

    for (DN dn : config.getExcludeBranches())
    {
      if (baseDN.equals(dn))
      {
        exclude = true;
      }
      if (baseDN.isAncestorOf(dn))
      {
        excludeBranches.add(dn);
      }
    }

    if (config.getIncludeBranches().isEmpty())
    {
      return;
    }

    for (DN dn : config.getIncludeBranches())
    {
      if (baseDN.isAncestorOf(dn))
      {
        includeBranches.add(dn);
      }
    }
    if (includeBranches.isEmpty())
    {
      exclude = true;
    }

    removeOverlappingIncludeBranches();
    removeExcludeBranchesOutsideIncludes();
    prepareEntryContainer();
  }

  //Drop every include branch that lies below another include branch.
  private void removeOverlappingIncludeBranches()
  {
    Iterator<DN> includeBranchIterator = includeBranches.iterator();
    while (includeBranchIterator.hasNext())
    {
      DN includeDN = includeBranchIterator.next();
      boolean keep = true;
      // The inner scan finishes before remove() is called, so the outer
      // iterator is the only one that modifies the list.
      for (DN dn : includeBranches)
      {
        if (!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
        {
          keep = false;
          break;
        }
      }
      if (!keep)
      {
        includeBranchIterator.remove();
      }
    }
  }

  //Drop exclude branches that are not under an include branch, since they
  //will be migrated as part of the existing entries outside of the include
  //branches anyway.
  private void removeExcludeBranchesOutsideIncludes()
  {
    Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
    while (excludeBranchIterator.hasNext())
    {
      DN excludeDN = excludeBranchIterator.next();
      boolean keep = false;
      for (DN includeDN : includeBranches)
      {
        if (includeDN.isAncestorOf(excludeDN))
        {
          keep = true;
          break;
        }
      }
      if (!keep)
      {
        excludeBranchIterator.remove();
      }
    }
  }

  //Either clear the existing entry container (the whole base DN is being
  //replaced) or switch to a temporary one and remember the original.
  private void prepareEntryContainer()
       throws InitializationException, ConfigException
  {
    try
    {
      if (includeBranches.size() == 1 && excludeBranches.isEmpty() &&
          includeBranches.get(0).equals(baseDN))
      {
        // This entire base DN is explicitly included in the import with
        // no exclude branches that we need to migrate. Just clear the entry
        // container.
        entryContainer.lock();
        try
        {
          entryContainer.clear();
        }
        finally
        {
          // Release the lock even when clear() fails; otherwise the container
          // would stay locked while the failure is swallowed below.
          entryContainer.unlock();
        }
      }
      else
      {
        // Create a temporary entry container.
        srcEntryContainer = entryContainer;
        String tmpName = baseDN.toNormalizedString() + "_importTmp";
        entryContainer = rootContainer.openEntryContainer(baseDN, tmpName);
      }
    }
    catch (DatabaseException e)
    {
      // NOTE(review): the failure is silently ignored, as in the original
      // code, which leaves the suffix in an unprepared state. The intended
      // handling below refers to a message that could not be confirmed from
      // this file; reinstate it once that message exists.
      //  Message msg = ERR_CONFIG_IMPORT_SUFFIX_ERROR.get(e.getMessage());
      //  throw new InitializationException(msg);
    }
  }

  /**
   * Return the Attribute Type - Index map used to map an attribute type to an
   * index instance.
   *
   * @return A suffixes Attribute Type - Index map.
   */
  public Map<AttributeType, AttributeIndex> getAttrIndexMap()
  {
    return entryContainer.getAttributeIndexMap();
  }

  /**
   * Check if the parent DN is in the pending map.
   *
   * @param parentDN The DN of the parent.
   * @return <CODE>True</CODE> if the parent is in the pending map.
   */
  private boolean isPending(DN parentDN)
  {
    return pendingMap.containsKey(parentDN);
  }

  /**
   * Add specified DN to the pending map.
   *
   * @param dn The DN to add to the map.
   */
  public void addPending(DN dn)
  {
    pendingMap.putIfAbsent(dn, dn);
  }

  /**
   * Remove the specified DN from the pending map.
   *
   * @param dn The DN to remove from the map.
   */
  public void removePending(DN dn)
  {
    pendingMap.remove(dn);
  }

  /**
   * Return the entry ID related to the specified entry DN. First the
   * instance's cache of parent IDs is checked, if it isn't found then the
   * DN2ID is searched.
   * <p>
   * While the parent is in the pending map this method polls every 50 ms and
   * gives up, returning null, after the eleventh sleep (roughly 550 ms). It
   * also returns null if the calling thread is interrupted while waiting; the
   * interrupt status is restored in that case.
   *
   * @param parentDN The DN to get the id for.
   * @return The entry ID related to the parent DN, or null if the id wasn't
   *         found in the cache or dn2id database, or if waiting for a pending
   *         parent timed out or was interrupted.
   *
   * @throws DatabaseException If an error occurred searching the dn2id
   *                           database.
   */
  public EntryID getParentID(DN parentDN) throws DatabaseException
  {
    EntryID parentID;
    synchronized (synchObject)
    {
      parentID = parentIDMap.get(parentDN);
      if (parentID != null)
      {
        return parentID;
      }
    }

    //If the parent is in the pending map, another thread is working on the
    //parent entry; poll until that thread is done with the parent.
    //NOTE(review): diagnostics go to System.out as in the original code; the
    //server's logging facility would be more appropriate.
    int i = 0;
    while (isPending(parentDN))
    {
      try
      {
        Thread.sleep(PENDING_POLL_MILLIS);
        if (i == PENDING_POLL_LIMIT)
        {
          System.out.println("Timed out waiting for: " + parentDN.toString());
          return null;
        }
        i++;
      }
      catch (InterruptedException e)
      {
        // Keep the interrupt visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        System.out.println("Exception: " + parentDN.toString());
        return null;
      }
    }

    parentID = entryContainer.getDN2ID().get(null, parentDN, LockMode.DEFAULT);
    if (parentID == null)
    {
      System.out.println("parent not found: " + parentDN.toString());
      return null;
    }

    //The parent is in dn2id, add it to the cache.
    synchronized (synchObject)
    {
      if (parentIDMap.size() >= PARENT_ID_MAP_SIZE)
      {
        // Make room by evicting whichever entry the map iterates first.
        Iterator<DN> iterator = parentIDMap.keySet().iterator();
        iterator.next();
        iterator.remove();
      }
      parentIDMap.put(parentDN, parentID);
    }
    return parentID;
  }

  /**
   * Sets all of the indexes, vlvIndexes, id2children and id2subtree indexes to
   * trusted.
   *
   * @throws DatabaseException If an error occurred setting the indexes to
   *                           trusted.
   */
  public void setIndexesTrusted() throws DatabaseException
  {
    entryContainer.getID2Children().setTrusted(null, true);
    entryContainer.getID2Subtree().setTrusted(null, true);
    for (AttributeIndex attributeIndex : entryContainer.getAttributeIndexes())
    {
      setTrustedIfPresent(attributeIndex.getEqualityIndex());
      setTrustedIfPresent(attributeIndex.getPresenceIndex());
      setTrustedIfPresent(attributeIndex.getSubstringIndex());
      setTrustedIfPresent(attributeIndex.getOrderingIndex());
      setTrustedIfPresent(attributeIndex.getApproximateIndex());
    }
    for (VLVIndex vlvIdx : entryContainer.getVLVIndexes())
    {
      vlvIdx.setTrusted(null, true);
    }
  }

  //Mark the index as trusted; an attribute index returns null for index
  //types it does not maintain.
  private static void setTrustedIfPresent(Index index) throws DatabaseException
  {
    if (index != null)
    {
      index.setTrusted(null, true);
    }
  }

  /**
   * Get the parent DN of the last entry added to a suffix.
   *
   * @return The parent DN of the last entry added.
   */
  public DN getParentDN()
  {
    return parentDN;
  }

  /**
   * Set the parent DN of the last entry added to a suffix.
   *
   * @param parentDN The parent DN to save.
   */
  public void setParentDN(DN parentDN)
  {
    this.parentDN = parentDN;
  }

  /**
   * Get the entry ID list of the last entry added to a suffix.
   *
   * @return Return the entry ID list of the last entry added to a suffix.
   */
  public ArrayList<EntryID> getIDs()
  {
    return IDs;
  }

  /**
   * Set the entry ID list of the last entry added to a suffix.
   *
   * @param IDs The entry ID list to save.
   */
  public void setIDs(ArrayList<EntryID> IDs)
  {
    this.IDs = IDs;
  }
}
opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java
File was deleted opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
File was deleted opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.config; @@ -3971,6 +3971,21 @@ public static final String ATTR_IMPORT_IS_ENCRYPTED = NAME_PREFIX_TASK + "import-is-encrypted"; /** * The name of the attribute in an import task definition that specifies * the temp directory path. */ public static final String ATTR_IMPORT_TMP_DIRECTORY = NAME_PREFIX_TASK + "import-tmp-directory"; /** * The name of the attribute in an import task definition that specifies * that minimal DN validation should be done during phase 2. */ public static final String ATTR_IMPORT_DN_CHECK_PHASE2 = NAME_PREFIX_TASK + "import-dn-check-phase2"; /** * The name of the objectclass that will be used for a Directory Server opends/src/server/org/opends/server/tasks/ImportTask.java
@@ -170,6 +170,8 @@ private boolean replaceExisting = false; private boolean skipSchemaValidation = false; private boolean clearBackend = false; private boolean dnCheckPhase2 = false; private String tmpDirectory = null; private String backendID = null; private String rejectFile = null; private String skipFile = null; @@ -241,6 +243,8 @@ AttributeType typeIsEncrypted; AttributeType typeClearBackend; AttributeType typeRandomSeed; AttributeType typeTmpDirectory; AttributeType typeDNCheckPhase2; typeLdifFile = getAttributeType(ATTR_IMPORT_LDIF_FILE, true); @@ -280,6 +284,10 @@ getAttributeType(ATTR_IMPORT_CLEAR_BACKEND, true); typeRandomSeed = getAttributeType(ATTR_IMPORT_RANDOM_SEED, true); typeTmpDirectory = getAttributeType(ATTR_IMPORT_TMP_DIRECTORY, true); typeDNCheckPhase2 = getAttributeType(ATTR_IMPORT_DN_CHECK_PHASE2, true); List<Attribute> attrList; @@ -323,6 +331,12 @@ attrList = taskEntry.getAttribute(typeAppend); append = TaskUtils.getBoolean(attrList, false); attrList = taskEntry.getAttribute(typeDNCheckPhase2); dnCheckPhase2 = TaskUtils.getBoolean(attrList, true); attrList = taskEntry.getAttribute(typeTmpDirectory); tmpDirectory = TaskUtils.getSingleValueString(attrList); attrList = taskEntry.getAttribute(typeReplaceExisting); replaceExisting = TaskUtils.getBoolean(attrList, false); @@ -861,6 +875,10 @@ ArrayList<String> fileList = new ArrayList<String>(ldifFiles); importConfig = new LDIFImportConfig(fileList); } if(tmpDirectory == null) { tmpDirectory = "import-tmp"; } importConfig.setAppendToExistingData(append); importConfig.setReplaceExistingEntries(replaceExisting); importConfig.setCompressed(isCompressed); @@ -873,6 +891,8 @@ importConfig.setIncludeBranches(includeBranches); importConfig.setIncludeFilters(includeFilters); importConfig.setValidateSchema(!skipSchemaValidation); importConfig.setDNCheckPhase2(dnCheckPhase2); importConfig.setTmpDirectory(tmpDirectory); // FIXME -- Should this be conditional? 
importConfig.setInvokeImportPlugins(true); opends/src/server/org/opends/server/tools/ImportLDIF.java
@@ -145,14 +145,15 @@ } // Define the command-line arguments that may be used with this program. private BooleanArgument append = null; //Append and replace removed for new import. // private BooleanArgument append = null; private BooleanArgument countRejects = null; private BooleanArgument displayUsage = null; private BooleanArgument isCompressed = null; private BooleanArgument isEncrypted = null; private BooleanArgument overwrite = null; private BooleanArgument quietMode = null; private BooleanArgument replaceExisting = null; // private BooleanArgument replaceExisting = null; private BooleanArgument skipSchemaValidation = null; private BooleanArgument clearBackend = null; private IntegerArgument randomSeed = null; @@ -169,6 +170,8 @@ private StringArgument rejectFile = null; private StringArgument skipFile = null; private StringArgument templateFile = null; private BooleanArgument dnCheckPhase2 = null; private StringArgument tmpDirectory = null; private int process(String[] args, boolean initializeServer, OutputStream outStream, OutputStream errStream) { @@ -239,6 +242,8 @@ INFO_LDIFIMPORT_DESCRIPTION_TEMPLATE_FILE.get()); argParser.addArgument(templateFile); /* Append and replace removed for new import. 
append = new BooleanArgument("append", 'a', "append", @@ -251,7 +256,7 @@ "replaceexisting", 'r', "replaceExisting", INFO_LDIFIMPORT_DESCRIPTION_REPLACE_EXISTING.get()); argParser.addArgument(replaceExisting); */ backendID = new StringArgument("backendid", 'n', "backendID", false, false, true, @@ -352,6 +357,20 @@ argParser.addArgument(skipSchemaValidation); dnCheckPhase2 = new BooleanArgument("dnPhase2", null, "dnCheckPhase2", INFO_LDIFIMPORT_DESCRIPTION_DN_CHECK_PHASE_2.get()); argParser.addArgument(dnCheckPhase2); tmpDirectory = new StringArgument("tmpdirectory", null, "tmpdirectory", false, false, true, INFO_LDIFIMPORT_TEMP_DIR_PLACEHOLDER.get(), "import-tmp", null, INFO_LDIFIMPORT_DESCRIPTION_TEMP_DIRECTORY.get()); argParser.addArgument(tmpDirectory); countRejects = new BooleanArgument("countrejects", null, "countRejects", INFO_LDIFIMPORT_DESCRIPTION_COUNT_REJECTS.get()); @@ -527,6 +546,9 @@ // // Optional attributes // /* Append and replace removed for new import. if (append.getValue() != null && !append.getValue().equals(append.getDefaultValue())) { values = new ArrayList<ByteString>(1); @@ -541,7 +563,7 @@ values.add(ByteString.valueOf(replaceExisting.getValue())); attributes.add(new LDAPAttribute(ATTR_IMPORT_REPLACE_EXISTING, values)); } */ if (backendID.getValue() != null && !backendID.getValue().equals( backendID.getDefaultValue())) { @@ -637,6 +659,25 @@ new LDAPAttribute(ATTR_IMPORT_SKIP_SCHEMA_VALIDATION, values)); } if (tmpDirectory.getValue() != null && !tmpDirectory.getValue().equals( tmpDirectory.getDefaultValue())) { values = new ArrayList<ByteString>(1); values.add(ByteString.valueOf(tmpDirectory.getValue())); attributes.add(new LDAPAttribute(ATTR_IMPORT_TMP_DIRECTORY, values)); } if (dnCheckPhase2.getValue() != null && !dnCheckPhase2.getValue().equals( dnCheckPhase2.getDefaultValue())) { values = new ArrayList<ByteString>(1); values.add(ByteString.valueOf(dnCheckPhase2.getValue())); attributes.add( new LDAPAttribute(ATTR_IMPORT_DN_CHECK_PHASE2, 
values)); } if (isCompressed.getValue() != null && !isCompressed.getValue().equals( isCompressed.getDefaultValue())) { @@ -1153,11 +1194,12 @@ } // Make sure that if the "backendID" argument was provided, no include base // was included, and the "append" option was not provided, the // was included, the // "clearBackend" argument was also provided if there are more then one // baseDNs for the backend being imported. if(backendID.isPresent() && !includeBranchStrings.isPresent() && !append.isPresent() && defaultIncludeBranches.size() > 1 && defaultIncludeBranches.size() > 1 && !clearBackend.isPresent()) { StringBuilder builder = new StringBuilder(); @@ -1283,8 +1325,8 @@ // Create the LDIF import configuration to use when reading the LDIF. importConfig.setAppendToExistingData(append.isPresent()); importConfig.setReplaceExistingEntries(replaceExisting.isPresent()); // importConfig.setAppendToExistingData(append.isPresent()); // importConfig.setReplaceExistingEntries(replaceExisting.isPresent()); importConfig.setCompressed(isCompressed.isPresent()); importConfig.setClearBackend(clearBackend.isPresent()); importConfig.setEncrypted(isEncrypted.isPresent()); @@ -1295,6 +1337,9 @@ importConfig.setIncludeBranches(includeBranches); importConfig.setIncludeFilters(includeFilters); importConfig.setValidateSchema(!skipSchemaValidation.isPresent()); importConfig.setDNCheckPhase2(dnCheckPhase2.isPresent()); importConfig.setTmpDirectory(tmpDirectory.getValue()); importConfig.setBufferSize(LDIF_BUFFER_SIZE); importConfig.setExcludeAllUserAttributes( excludeAllUserAttributes); opends/src/server/org/opends/server/types/LDIFImportConfig.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.types; @@ -164,6 +164,8 @@ // excluded. private boolean excludeAllOpAttrs; private String tmpDirectory; private boolean dnCheckPhase2 = false; /** @@ -1383,5 +1385,45 @@ } } } /** * Set the temporary directory to the specified path. * * @param path The path to set the temporary directory to. */ public void setTmpDirectory(String path) { tmpDirectory = path; } /** * Return the temporary directory path. * * @return The temporary directory string. */ public String getTmpDirectory() { return tmpDirectory; } /** * Set the dn check in phase two boolean to the specified value. * * @param v The value to set the dn check in phase two boolean to. */ public void setDNCheckPhase2(boolean v) { dnCheckPhase2 = v; } /** * Return the dn check in phase two boolean. * * @return Return the dn check in phase two boolean value. */ public boolean getDNCheckPhase2() { return dnCheckPhase2; } } opends/src/server/org/opends/server/util/LDIFException.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.util; import org.opends.messages.Message; @@ -107,13 +107,12 @@ * @param canContinueReading Indicates whether it is possible to continue * reading from the LDIF input source. */ public LDIFException(Message message, long lineNumber, public LDIFException(Message message, Number lineNumber, boolean canContinueReading) { super(message); this.lineNumber = lineNumber; this.lineNumber = lineNumber.longValue(); this.canContinueReading = canContinueReading; } @@ -131,13 +130,12 @@ * @param cause The underlying cause that triggered this LDIF * exception. */ public LDIFException(Message message, long lineNumber, public LDIFException(Message message, Number lineNumber, boolean canContinueReading, Throwable cause) { super(message, cause); this.lineNumber = lineNumber; this.lineNumber = lineNumber.longValue(); this.canContinueReading = canContinueReading; } opends/src/server/org/opends/server/util/LDIFReader.java
@@ -46,6 +46,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.opends.server.core.DirectoryServer; import org.opends.server.core.PluginConfigManager; @@ -55,6 +56,10 @@ import org.opends.server.types.*; import org.opends.server.api.plugin.PluginResult; import org.opends.server.backends.jeb.RootContainer; import org.opends.server.backends.jeb.EntryID; import org.opends.server.backends.jeb.importLDIF.Suffix; import org.opends.server.backends.jeb.importLDIF.Importer; /** @@ -90,17 +95,18 @@ // read. private LinkedList<StringBuilder> lastEntryHeaderLines; // The number of entries that have been ignored by this LDIF reader because // they didn't match the criteria. private long entriesIgnored; private final AtomicLong entriesIgnored = new AtomicLong(); // The number of entries that have been read by this LDIF reader, including // those that were ignored because they didn't match the criteria, and // including those that were rejected because they were invalid in some way. private long entriesRead; private final AtomicLong entriesRead = new AtomicLong(); // The number of entries that have been rejected by this LDIF reader. private long entriesRejected; private final AtomicLong entriesRejected = new AtomicLong(); // The line number on which the last entry started. private long lastEntryLineNumber; @@ -112,6 +118,10 @@ // on the entries as they are read. private PluginConfigManager pluginConfigManager; private RootContainer rootContainer; //Temporary until multiple suffixes are supported. private volatile Suffix suffix = null; /** @@ -132,9 +142,6 @@ reader = importConfig.getReader(); buffer = new byte[4096]; entriesRead = 0; entriesIgnored = 0; entriesRejected = 0; lineNumber = 0; lastEntryLineNumber = -1; lastEntryBodyLines = new LinkedList<StringBuilder>(); @@ -143,6 +150,38 @@ } /** * Creates a new LDIF reader that will read information from the * specified file. 
* * @param importConfig * The import configuration for this LDIF reader. It must not * be <CODE>null</CODE>. * @param rootContainer The root container needed to get the next entry ID. * @param size The size of the buffer to read the LDIF bytes into. * * @throws IOException * If a problem occurs while opening the LDIF file for * reading. */ public LDIFReader(LDIFImportConfig importConfig, RootContainer rootContainer, int size) throws IOException { ensureNotNull(importConfig); this.importConfig = importConfig; this.reader = importConfig.getReader(); this.lineNumber = 0; this.lastEntryLineNumber = -1; this.lastEntryBodyLines = new LinkedList<StringBuilder>(); this.lastEntryHeaderLines = new LinkedList<StringBuilder>(); this.pluginConfigManager = DirectoryServer.getPluginConfigManager(); this.buffer = new byte[size]; this.rootContainer = rootContainer; } /** * Reads the next entry from the LDIF source. @@ -164,6 +203,211 @@ /** * Reads the next entry from the LDIF source. This method will need * to be changed when multiple suffixes is supported. * * @return The next entry read from the LDIF source, or <CODE>null</CODE> if * the end of the LDIF data is reached. * * @param map A * * @throws IOException If an I/O problem occurs while reading from the file. * * @throws LDIFException If the information read cannot be parsed as an LDIF * entry. */ public final Entry readEntry(Map<DN, Suffix> map) throws IOException, LDIFException { return readEntry(importConfig.validateSchema(), map); } private final Entry readEntry(boolean checkSchema, Map<DN, Suffix> map) throws IOException, LDIFException { while (true) { LinkedList<StringBuilder> lines; DN entryDN; EntryID entryID; synchronized (this) { // Read the set of lines that make up the next entry. lines = readEntryLines(); if (lines == null) { return null; } lastEntryBodyLines = lines; lastEntryHeaderLines = new LinkedList<StringBuilder>(); // Read the DN of the entry and see if it is one that should be included // in the import. 
entryDN = readDN(lines); if (entryDN == null) { // This should only happen if the LDIF starts with the "version:" line // and has a blank line immediately after that. In that case, simply // read and return the next entry. continue; } else if (!importConfig.includeEntry(entryDN)) { if (debugEnabled()) { TRACER.debugInfo("Skipping entry %s because the DN isn't" + "one that should be included based on the include and " + "exclude branches.", entryDN); } entriesRead.incrementAndGet(); Message message = ERR_LDIF_SKIP.get(String.valueOf(entryDN)); logToSkipWriter(lines, message); entriesIgnored.incrementAndGet(); continue; } entryID = rootContainer.getNextEntryID(); } //Temporary until multiple suffixes are supported. //getMatchSuffix calls the expensive DN getParentDNInSuffix if(suffix == null) { suffix= Importer.getMatchSuffix(entryDN, map); } if(suffix == null) { if (debugEnabled()) { TRACER.debugInfo("Skipping entry %s because the DN isn't" + "one that should be included based on a suffix match" + "check." ,entryDN); } entriesRead.incrementAndGet(); Message message = ERR_LDIF_SKIP.get(String.valueOf(entryDN)); logToSkipWriter(lines, message); entriesIgnored.incrementAndGet(); continue; } entriesRead.incrementAndGet(); suffix.addPending(entryDN); // Read the set of attributes from the entry. HashMap<ObjectClass,String> objectClasses = new HashMap<ObjectClass,String>(); HashMap<AttributeType,List<Attribute>> userAttributes = new HashMap<AttributeType,List<Attribute>>(); HashMap<AttributeType,List<Attribute>> operationalAttributes = new HashMap<AttributeType,List<Attribute>>(); try { for (StringBuilder line : lines) { readAttribute(lines, line, entryDN, objectClasses, userAttributes, operationalAttributes, checkSchema); } } catch (LDIFException e) { entriesRejected.incrementAndGet(); suffix.removePending(entryDN); throw e; } // Create the entry and see if it is one that should be included in the // import. 
Entry entry = new Entry(entryDN, objectClasses, userAttributes, operationalAttributes); TRACER.debugProtocolElement(DebugLogLevel.VERBOSE, entry.toString()); try { if (! importConfig.includeEntry(entry)) { if (debugEnabled()) { TRACER.debugInfo("Skipping entry %s because the DN is not one " + "that should be included based on the include and exclude " + "filters.", entryDN); } Message message = ERR_LDIF_SKIP.get(String.valueOf(entryDN)); logToSkipWriter(lines, message); entriesIgnored.incrementAndGet(); suffix.removePending(entryDN); continue; } } catch (Exception e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } suffix.removePending(entryDN); Message message = ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT. get(String.valueOf(entry.getDN()), lastEntryLineNumber, String.valueOf(e)); throw new LDIFException(message, lastEntryLineNumber, true, e); } // If we should invoke import plugins, then do so. if (importConfig.invokeImportPlugins()) { PluginResult.ImportLDIF pluginResult = pluginConfigManager.invokeLDIFImportPlugins(importConfig, entry); if (! pluginResult.continueProcessing()) { Message m; Message rejectMessage = pluginResult.getErrorMessage(); if (rejectMessage == null) { m = ERR_LDIF_REJECTED_BY_PLUGIN_NOMESSAGE.get( String.valueOf(entryDN)); } else { m = ERR_LDIF_REJECTED_BY_PLUGIN.get(String.valueOf(entryDN), rejectMessage); } logToRejectWriter(lines, m); entriesRejected.incrementAndGet(); suffix.removePending(entryDN); continue; } } // Make sure that the entry is valid as per the server schema if it is // appropriate to do so. if (checkSchema) { MessageBuilder invalidReason = new MessageBuilder(); if (! 
entry.conformsToSchema(null, false, true, false, invalidReason)) { Message message = ERR_LDIF_SCHEMA_VIOLATION.get( String.valueOf(entryDN), lastEntryLineNumber, invalidReason.toString()); logToRejectWriter(lines, message); entriesRejected.incrementAndGet(); suffix.removePending(entryDN); throw new LDIFException(message, lastEntryLineNumber, true); } } entry.setAttachment(entryID); // The entry should be included in the import, so return it. return entry; } } /** * Reads the next entry from the LDIF source. * * @param checkSchema Indicates whether this reader should perform schema @@ -214,15 +458,15 @@ "should be included based on the include and exclude branches.", entryDN); } entriesRead++; entriesRead.incrementAndGet(); Message message = ERR_LDIF_SKIP.get(String.valueOf(entryDN)); logToSkipWriter(lines, message); entriesIgnored++; entriesIgnored.incrementAndGet(); continue; } else { entriesRead++; entriesRead.incrementAndGet(); } // Read the set of attributes from the entry. @@ -242,7 +486,7 @@ } catch (LDIFException e) { entriesRejected++; entriesRejected.incrementAndGet(); throw e; } @@ -296,7 +540,7 @@ } Message message = ERR_LDIF_SKIP.get(String.valueOf(entryDN)); logToSkipWriter(lines, message); entriesIgnored++; entriesIgnored.incrementAndGet(); continue; } } @@ -335,7 +579,7 @@ } logToRejectWriter(lines, m); entriesRejected++; entriesRejected.incrementAndGet(); continue; } } @@ -353,7 +597,7 @@ lastEntryLineNumber, invalidReason.toString()); logToRejectWriter(lines, message); entriesRejected++; entriesRejected.incrementAndGet(); throw new LDIFException(message, lastEntryLineNumber, true); } //Add any superior objectclass(s) missing in an entries @@ -407,7 +651,7 @@ String changeType = readChangeType(lines); ChangeRecordEntry entry = null; ChangeRecordEntry entry; if(changeType != null) { @@ -469,6 +713,11 @@ LinkedList<StringBuilder> lines = new LinkedList<StringBuilder>(); int lastLine = -1; if(reader == null) { return null; } while (true) { String line 
= reader.readLine(); @@ -852,10 +1101,10 @@ * @param entryDN The DN of the entry being decoded. * @param objectClasses The set of objectclasses decoded so far for * the current entry. * @param userAttributes The set of user attributes decoded so far * for the current entry. * @param operationalAttributes The set of operational attributes decoded so * far for the current entry. * @param userAttrBuilders The map of user attribute builders decoded * so far for the current entry. * @param operationalAttrBuilders The map of operational attribute builders * decoded so far for the current entry. * @param checkSchema Indicates whether to perform schema * validation for the attribute. * @@ -1142,7 +1391,7 @@ */ public void rejectLastEntry(Message message) { entriesRejected++; entriesRejected.incrementAndGet(); BufferedWriter rejectWriter = importConfig.getRejectWriter(); if (rejectWriter != null) @@ -1190,7 +1439,7 @@ */ public synchronized void rejectEntry(Entry e, Message message) { BufferedWriter rejectWriter = importConfig.getRejectWriter(); entriesRejected++; entriesRejected.incrementAndGet(); if (rejectWriter != null) { try { if ((message != null) && (message.length() > 0)) { @@ -1284,7 +1533,7 @@ */ public long getEntriesRead() { return entriesRead; return entriesRead.get(); } @@ -1297,7 +1546,7 @@ */ public long getEntriesIgnored() { return entriesIgnored; return entriesIgnored.get(); } @@ -1313,7 +1562,7 @@ */ public long getEntriesRejected() { return entriesRejected; return entriesRejected.get(); } @@ -1333,8 +1582,8 @@ LinkedList<StringBuilder> lines) throws LDIFException { DN newSuperiorDN = null; RDN newRDN = null; boolean deleteOldRDN = false; RDN newRDN; boolean deleteOldRDN; if(lines.isEmpty()) { @@ -1480,7 +1729,7 @@ List<RawModification> modifications = new ArrayList<RawModification>(); while(!lines.isEmpty()) { ModificationType modType = null; ModificationType modType; StringBuilder line = lines.remove(); Attribute attr = @@ -1748,7 +1997,7 @@ InputStream 
inputStream = null; ByteStringBuilder builder = null; ByteStringBuilder builder; try { builder = new ByteStringBuilder(); @@ -1881,5 +2130,209 @@ } } private void readAttribute(LinkedList<StringBuilder> lines, StringBuilder line, DN entryDN, Map<ObjectClass,String> objectClasses, Map<AttributeType,List<Attribute>> userAttributes, Map<AttributeType,List<Attribute>> operationalAttributes, boolean checkSchema) throws LDIFException { // Parse the attribute type description. int colonPos = parseColonPosition(lines, line); String attrDescr = line.substring(0, colonPos); final Attribute attribute = parseAttrDescription(attrDescr); final String attrName = attribute.getName(); final String lowerName = toLowerCase(attrName); // Now parse the attribute value. ByteString value = parseSingleValue(lines, line, entryDN, colonPos, attrName); // See if this is an objectclass or an attribute. Then get the // corresponding definition and add the value to the appropriate hash. if (lowerName.equals("objectclass")) { if (! importConfig.includeObjectClasses()) { if (debugEnabled()) { TRACER.debugVerbose("Skipping objectclass %s for entry %s due to " + "the import configuration.", value, entryDN); } return; } String ocName = value.toString(); String lowerOCName = toLowerCase(ocName); ObjectClass objectClass = DirectoryServer.getObjectClass(lowerOCName); if (objectClass == null) { objectClass = DirectoryServer.getDefaultObjectClass(ocName); } if (objectClasses.containsKey(objectClass)) { logError(WARN_LDIF_DUPLICATE_OBJECTCLASS.get( String.valueOf(entryDN), lastEntryLineNumber, ocName)); } else { objectClasses.put(objectClass, ocName); } } else { AttributeType attrType = DirectoryServer.getAttributeType(lowerName); if (attrType == null) { attrType = DirectoryServer.getDefaultAttributeType(attrName); } if (! 
importConfig.includeAttribute(attrType)) { if (debugEnabled()) { TRACER.debugVerbose("Skipping attribute %s for entry %s due to the " + "import configuration.", attrName, entryDN); } return; } //The attribute is not being ignored so check for binary option. if(checkSchema && !attrType.isBinary()) { if(attribute.hasOption("binary")) { Message message = ERR_LDIF_INVALID_ATTR_OPTION.get( String.valueOf(entryDN),lastEntryLineNumber, attrName); logToRejectWriter(lines, message); throw new LDIFException(message, lastEntryLineNumber,true); } } if (checkSchema && (DirectoryServer.getSyntaxEnforcementPolicy() != AcceptRejectWarn.ACCEPT)) { MessageBuilder invalidReason = new MessageBuilder(); if (! attrType.getSyntax().valueIsAcceptable(value, invalidReason)) { Message message = WARN_LDIF_VALUE_VIOLATES_SYNTAX.get( String.valueOf(entryDN), lastEntryLineNumber, value.toString(), attrName, invalidReason.toString()); if (DirectoryServer.getSyntaxEnforcementPolicy() == AcceptRejectWarn.WARN) { logError(message); } else { logToRejectWriter(lines, message); throw new LDIFException(message, lastEntryLineNumber, true); } } } AttributeValue attributeValue = AttributeValues.create(attrType, value); List<Attribute> attrList; if (attrType.isOperational()) { attrList = operationalAttributes.get(attrType); if (attrList == null) { AttributeBuilder builder = new AttributeBuilder(attribute, true); builder.add(attributeValue); attrList = new ArrayList<Attribute>(); attrList.add(builder.toAttribute()); operationalAttributes.put(attrType, attrList); return; } } else { attrList = userAttributes.get(attrType); if (attrList == null) { AttributeBuilder builder = new AttributeBuilder(attribute, true); builder.add(attributeValue); attrList = new ArrayList<Attribute>(); attrList.add(builder.toAttribute()); userAttributes.put(attrType, attrList); return; } } // Check to see if any of the attributes in the list have the same set of // options. If so, then try to add a value to that attribute. 
for (int i = 0; i < attrList.size(); i++) { Attribute a = attrList.get(i); if (a.optionsEqual(attribute.getOptions())) { if (a.contains(attributeValue)) { if (! checkSchema) { // If we're not doing schema checking, then it is possible that // the attribute type should use case-sensitive matching and the // values differ in capitalization. Only reject the proposed // value if we find another value that is exactly the same as the // one that was provided. for (AttributeValue v : a) { if (v.getValue().equals(attributeValue.getValue())) { Message message = WARN_LDIF_DUPLICATE_ATTR.get( String.valueOf(entryDN), lastEntryLineNumber, attrName, value.toString()); logToRejectWriter(lines, message); throw new LDIFException(message, lastEntryLineNumber, true); } } } else { Message message = WARN_LDIF_DUPLICATE_ATTR.get( String.valueOf(entryDN), lastEntryLineNumber, attrName, value.toString()); logToRejectWriter(lines, message); throw new LDIFException(message, lastEntryLineNumber, true); } } if (attrType.isSingleValue() && !a.isEmpty() && checkSchema) { Message message = ERR_LDIF_MULTIPLE_VALUES_FOR_SINGLE_VALUED_ATTR .get(String.valueOf(entryDN), lastEntryLineNumber, attrName); logToRejectWriter(lines, message); throw new LDIFException(message, lastEntryLineNumber, true); } AttributeBuilder builder = new AttributeBuilder(a); builder.add(attributeValue); attrList.set(i, builder.toAttribute()); return; } } // No set of matching options was found, so create a new one and // add it to the list. AttributeBuilder builder = new AttributeBuilder(attribute, true); builder.add(attributeValue); attrList.add(builder.toAttribute()); return; } } } opends/tests/unit-tests-testng/resource/config-changes.ldif
@@ -426,7 +426,6 @@ ds-cfg-db-directory-permissions: 700 ds-cfg-index-entry-limit: 1 ds-cfg-preload-time-limit: 0 seconds ds-cfg-import-queue-size: 100 ds-cfg-import-thread-count: 8 ds-cfg-entries-compressed: false ds-cfg-db-cache-percent: 2 @@ -658,7 +657,6 @@ ds-cfg-db-directory-permissions: 700 ds-cfg-index-entry-limit: 10 ds-cfg-preload-time-limit: 0 seconds ds-cfg-import-queue-size: 100 ds-cfg-import-thread-count: 8 ds-cfg-entries-compressed: false ds-cfg-db-cache-percent: 2 @@ -830,7 +828,6 @@ ds-cfg-db-directory-permissions: 700 ds-cfg-index-entry-limit: 10 ds-cfg-preload-time-limit: 0 seconds ds-cfg-import-queue-size: 100 ds-cfg-import-thread-count: 8 ds-cfg-entries-compressed: false ds-cfg-db-cache-percent: 2 @@ -997,7 +994,6 @@ ds-cfg-db-directory-permissions: 700 ds-cfg-index-entry-limit: 10 ds-cfg-preload-time-limit: 0 seconds ds-cfg-import-queue-size: 100 ds-cfg-import-thread-count: 8 ds-cfg-entries-compressed: false ds-cfg-db-cache-percent: 2 @@ -1198,7 +1194,6 @@ ds-cfg-db-directory-permissions: 700 ds-cfg-index-entry-limit: 13 ds-cfg-preload-time-limit: 0 seconds ds-cfg-import-queue-size: 100 ds-cfg-import-thread-count: 8 ds-cfg-entries-compressed: false ds-cfg-db-cache-percent: 2 @@ -1439,4 +1434,4 @@ ds-cfg-default-include-throwable-cause: true - replace: ds-cfg-default-throwable-stack-frames ds-cfg-default-throwable-stack-frames: 500 ds-cfg-default-throwable-stack-frames: 500 opends/tests/unit-tests-testng/src/server/org/opends/server/backends/jeb/TestImportJob.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Copyright 2006-2008 Sun Microsystems, Inc. * Copyright 2006-2009 Sun Microsystems, Inc. */ package org.opends.server.backends.jeb; @@ -282,7 +282,7 @@ TestCaseUtils.deleteDirectory(tempDir); } @Test @Test(enabled=false) public void testImportAll() throws Exception { TestCaseUtils.clearJEBackend(false, beID, null); @@ -365,7 +365,8 @@ } } @Test(dependsOnMethods = "testImportAll") //@Test(dependsOnMethods = "testImportAll") @Test(enabled=false) public void testImportPartial() throws Exception { ArrayList<String> fileList = new ArrayList<String>(); @@ -453,7 +454,8 @@ } } @Test(dependsOnMethods = "testImportPartial") //@Test(dependsOnMethods = "testImportPartial") @Test(enabled=false) public void testImportReplaceExisting() throws Exception { ByteArrayOutputStream rejectedEntries = new ByteArrayOutputStream(); @@ -506,7 +508,8 @@ } } @Test(dependsOnMethods = "testImportReplaceExisting") //@Test(dependsOnMethods = "testImportReplaceExisting") @Test(enabled=false) public void testImportNoParent() throws Exception { ByteArrayOutputStream rejectedEntries = new ByteArrayOutputStream(); @@ -529,7 +532,8 @@ assertTrue(rejectedEntries.toString().contains("uid=user.446,dc=importtest1,dc=com")); } @Test(dependsOnMethods = "testImportReplaceExisting") //@Test(dependsOnMethods = "testImportReplaceExisting") @Test(enabled=false) public void testImportAppend() throws Exception { LDIFImportConfig importConfig = new LDIFImportConfig(homeDirName + File.separator + "top.ldif"); @@ -599,7 +603,8 @@ } } @Test(dependsOnMethods = "testImportPartial") //@Test(dependsOnMethods = "testImportPartial") @Test(enabled=false) public void testImportNotReplaceExisting() throws Exception { ByteArrayOutputStream rejectedEntries = new ByteArrayOutputStream(); @@ -623,7 +628,8 @@ assertTrue(rejectedEntries.toString().contains("uid=user.446,dc=importtest1,dc=com")); } @Test(dependsOnMethods = "testImportPartial") //@Test(dependsOnMethods = "testImportPartial") 
@Test(enabled=false) public void testImportSkip() throws Exception { ArrayList<DN> excludeBranches = new ArrayList<DN>();