From 31e832ff7b784de050bfe98404829c3d966d47c2 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Tue, 15 Dec 2009 00:10:26 +0000
Subject: [PATCH] Import scalability improvements.
---
opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java | 73
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java | 711 +++++----
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 3352 ++++++++++++++++++++++++++++------------------
opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java | 120 +
4 files changed, 2559 insertions(+), 1697 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
index 6e26ed2..ca2f76e 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
@@ -29,9 +29,15 @@
import org.opends.server.backends.jeb.EntryID;
import org.opends.server.backends.jeb.JebFormat;
+import java.nio.ByteBuffer;
+
/**
- * An import ID set backed by an array of integers.
+ * This class manages the set of IDs that are to be eventually added to an
+ * index database. It is responsible for determining if the number of IDs is
+ * above the configured ID limit. If the limit is reached, the class stops
+ * tracking individual IDs and marks the set as undefined. This class is not
+ * thread-safe.
*/
public class ImportIDSet {
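A minimal sketch of the defined-to-undefined transition described above, using
the (initialSize, limit, doCount) constructor that appears later in this patch;
the exact undefined-size bookkeeping is elided:

    // Sketch: exercise the entry limit behavior documented above.
    ImportIDSet set = new ImportIDSet(4, 3 /* limit */, true /* doCount */);
    set.addEntryID(1);
    set.addEntryID(2);
    set.addEntryID(3);
    assert set.isDefined();             // at the limit, IDs still tracked
    set.addEntryID(4);                  // crossing the limit...
    assert !set.isDefined();            // ...drops the individual IDs
    long size = set.getUndefinedSize(); // only a count survives (doCount)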
@@ -46,17 +52,19 @@
*/
private int count = 0;
-
//Boolean to keep track of whether the instance is defined or not.
private boolean isDefined=true;
-
//Size of the undefined set if a count is being kept.
private long undefinedSize = 0;
//Key related to an ID set.
- private byte[] key;
+ private ByteBuffer key;
+
+ //The entry limit size.
private int limit = -1;
+
+ //Set to true if a count of IDs above the entry limit should be kept.
private boolean doCount = false;
@@ -75,14 +83,16 @@
this.doCount = doCount;
}
+
/**
* Create an empty import instance.
- */
+ */
public ImportIDSet()
{
}
+
/**
* Clear the set so it can be reused again. The boolean indexParam specifies
* if the index parameters should be cleared also.
@@ -102,6 +112,7 @@
}
}
+
/**
* Return if an import ID set is defined or not.
*
@@ -112,6 +123,7 @@
return isDefined;
}
+
/**
* Return the undefined size of an import ID set.
*
@@ -122,6 +134,7 @@
return undefinedSize;
}
+
/**
* Set an import ID set to undefined.
*/
@@ -157,7 +170,7 @@
{
if(doCount)
{
- undefinedSize += importIDSet.size();
+ undefinedSize += importIDSet.size();
}
}
else if(!importIDSet.isDefined()) //other undefined
@@ -197,7 +210,8 @@
addEntryID(entryID.longValue());
}
- /**
+
+ /**
* Add the specified long value to an import ID set.
*
* @param l The long value to add to an import ID set.
@@ -209,7 +223,8 @@
}
return;
}
- if(isDefined() && ((count + 1) > limit)) {
+ if((l < 0) || (isDefined() && ((count + 1) > limit)))
+ {
isDefined = false;
array = null;
if(doCount) {
@@ -230,24 +245,24 @@
boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80);
if(dbUndefined && (!importIdSet.isDefined())) {
- undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
- importIdSet.getUndefinedSize();
- isDefined=false;
+ undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
+ importIdSet.getUndefinedSize();
+ isDefined=false;
} else if(dbUndefined && (importIdSet.isDefined())) {
- undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
- importIdSet.size();
- isDefined=false;
+ undefinedSize = JebFormat.entryIDUndefinedSizeFromDatabase(dBbytes) +
+ importIdSet.size();
+ isDefined=false;
} else if(!importIdSet.isDefined()) {
- int dbSize = JebFormat.entryIDListFromDatabase(dBbytes).length;
- undefinedSize = dbSize + importIdSet.getUndefinedSize();
- isDefined = false;
- incrementLimitCount = true;
+ int dbSize = JebFormat.entryIDListFromDatabase(dBbytes).length;
+ undefinedSize = dbSize + importIdSet.getUndefinedSize();
+ isDefined = false;
+ incrementLimitCount = true;
} else {
array = JebFormat.entryIDListFromDatabase(dBbytes);
if(array.length + importIdSet.size() > limit) {
- undefinedSize = array.length + importIdSet.size();
- isDefined=false;
- incrementLimitCount=true;
+ undefinedSize = array.length + importIdSet.size();
+ isDefined=false;
+ incrementLimitCount=true;
} else {
count = array.length;
addAll(importIdSet);
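The four merge branches above reduce to a single rule: the merged set stays
defined only when both sides are defined and their combined size is within the
entry limit. A free-standing restatement (method name is illustrative):

    static boolean mergedStaysDefined(boolean dbUndefined, boolean importDefined,
                                      long dbSize, long importSize, int limit)
    {
      if (dbUndefined || !importDefined)
      {
        return false;                        // either side undefined wins
      }
      return dbSize + importSize <= limit;   // both defined: apply the limit
    }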
@@ -525,11 +540,11 @@
public byte[] toDatabase()
{
if(isDefined) {
- return encode(null);
- } else {
- return JebFormat.entryIDUndefinedSizeToDatabase(undefinedSize);
- }
- }
+ return encode(null);
+ } else {
+ return JebFormat.entryIDUndefinedSizeToDatabase(undefinedSize);
+ }
+ }
private byte[] encode(byte[] bytes)
@@ -552,22 +567,24 @@
return bytes;
}
+
/**
* Set the DB key related to an import ID set.
*
* @param key Byte array containing the key.
*/
- public void setKey(byte[] key)
+ public void setKey(ByteBuffer key)
{
this.key = key;
}
+
/**
* Return the DB key related to an import ID set.
*
* @return The byte array containing the key.
*/
- public byte[] getKey()
+ public ByteBuffer getKey()
{
return key;
}
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 86841d3..bec64a9 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -27,19 +27,16 @@
package org.opends.server.backends.jeb.importLDIF;
-
import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
-
import java.io.*;
import java.nio.*;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
-
import static org.opends.server.util.StaticUtils.getFileForPath;
import org.opends.messages.Message;
import org.opends.messages.Category;
@@ -53,56 +50,95 @@
import org.opends.server.types.*;
import org.opends.server.util.*;
import com.sleepycat.je.*;
+import com.sleepycat.util.PackedInteger;
/**
- * Performs LDIF import and rebuild of indexes.
+ * This class provides the engine that performs both importing of LDIF files and
+ * the rebuilding of indexes.
*/
public class Importer
{
- private final int DRAIN_TO = 3;
- private final int TIMER_INTERVAL = 10000;
- private final int KB = 1024;
- private final int MB = (KB * KB);
- private final int GB = (1000 * MB);
- private final int LDIF_READER_BUFFER_SIZE = 2 * MB;
- private final int MIN_IMPORT_MEMORY_REQUIRED = 16 * MB;
- private final int MAX_BUFFER_SIZE = 48 * MB;
- private final int MIN_BUFFER_SIZE = 1024 * 100;
- private final int MIN_READ_AHEAD_CACHE_SIZE = 4096;
- private final int MAX_DB_CACHE_SIZE = 128 * MB;
- private final int MIN_DB_CACHE_SIZE = 16 * MB;
- private final int MAX_DB_LOG_BUFFER_BYTES = 100 * MB;
- private final int MEM_PCT_PHASE_1 = 45;
- private final int MEM_PCT_PHASE_2 = 55;
- private final int EXTRA_DB_CACHE_PCT = 30;
- private final int DN_STATE_CACHE_SIZE = 32 * KB;
+ private static final int TIMER_INTERVAL = 10000;
+ final static int KB = 1024;
+ private static final int MB = (KB * KB);
+ private static final String DEFAULT_TMP_DIR = "import-tmp";
+ private static final String TMPENV_DIR = "tmp-env";
- private final String DIRECT_PROPERTY = "import.directphase2";
+ //Defaults for DB cache.
+ private static final int MAX_DB_CACHE_SIZE = 8 * MB;
+ private static final int MAX_DB_LOG_SIZE = 10 * MB;
+
+ //Defaults for LDIF reader buffers, min memory required to import and default
+ //size for byte buffers.
+ private static final int READER_WRITER_BUFFER_SIZE = 2 * MB;
+ private static final int MIN_IMPORT_MEMORY_REQUIRED = 12 * MB;
+ private static final int BYTE_BUFFER_CAPACITY = 128;
+
+ //Min and MAX sizes of phase one buffer.
+ private static final int MAX_BUFFER_SIZE = 48 * MB;
+ private static final int MIN_BUFFER_SIZE = 64 * KB;
+
+ //Min size of phase two read-ahead cache.
+ private static final int MIN_READ_AHEAD_CACHE_SIZE = 1 * KB;
+
+ //Set aside this much for the JVM from free memory.
+ private static final int JVM_MEM_PCT = 45;
+
+ //Percent of import memory to use for temporary environment if the
+ //skip DN validation flag isn't specified.
+ private static final int TMPENV_MEM_PCT = 50;
+ //Small heap threshold used to give more memory to the JVM to try to
+ //avoid OOM errors.
+ private static final int SMALL_HEAP_SIZE = 256 * MB;
+
+ //The DN attribute type.
private static AttributeType dnType;
- private static IndexBuffer.DNComparator dnComparator
+
+ //Comparators for DN and indexes respectively.
+ private static final IndexBuffer.DNComparator dnComparator
= new IndexBuffer.DNComparator();
private static final IndexBuffer.IndexComparator indexComparator =
new IndexBuffer.IndexComparator();
+ //Phase one buffer and imported entries counts.
private final AtomicInteger bufferCount = new AtomicInteger(0);
private final AtomicLong importCount = new AtomicLong(0);
+
+ //Phase one buffer size in bytes.
+ private int bufferSize;
+
+ //Temp scratch directory.
private final File tempDir;
+
+ //Index and thread counts.
private final int indexCount, threadCount;
+
+ //Set to true when validation is skipped.
private final boolean skipDNValidation;
- private final LDIFImportConfig importConfiguration;
- private final ByteBuffer directBuffer;
+
+ //Temporary environment used when DN validation is done in first phase.
+ private final TmpEnv tmpEnv;
+
+ //Root container.
private RootContainer rootContainer;
+
+ //Import configuration.
+ private final LDIFImportConfig importConfiguration;
+
+ //LDIF reader.
private LDIFReader reader;
- private int bufferSize, indexBufferCount;
+
+ //Migrated entry count.
private int migratedCount;
- private long dbCacheSize = 0, dbLogBufferSize = 0;
- //The executor service used for the sort tasks.
- private ExecutorService sortService;
+ //Size in bytes of temporary env and DB cache.
+ private long tmpEnvCacheSize = 0, dbCacheSize = MAX_DB_CACHE_SIZE;
- //The executor service used for the index processing tasks.
- private ExecutorService indexProcessService;
+ //The executor service used for the buffer sort tasks.
+ private ExecutorService bufferSortService;
+
+ //The executor service used for the scratch file processing tasks.
+ private ExecutorService scratchFileWriterService;
//Queue of free index buffers -- used to re-cycle index buffers;
private final BlockingQueue<IndexBuffer> freeBufferQueue =
@@ -118,27 +154,44 @@
private final List<IndexManager> indexMgrList =
new LinkedList<IndexManager>();
+ //List of DN-based index managers. Used to start phase 2.
+ private final List<IndexManager> DNIndexMgrList =
+ new LinkedList<IndexManager>();
+
//Futures used to indicate when the index file writers are done flushing
//their work queues and have exited. End of phase one.
- private final List<Future<?>> indexWriterFutures;
+ private final List<Future<?>> scratchFileWriterFutures;
- //List of index file writer tasks. Used to signal stopIndexWriterTasks to the
- //index file writer tasks when the LDIF file has been done.
- private final List<IndexFileWriterTask> indexWriterList;
+ //List of index file writer tasks. Used to signal stopScratchFileWriters to
+ //the index file writer tasks when the LDIF file has been read.
+ private final List<ScratchFileWriterTask> scratchFileWriterList;
//Map of DNs to Suffix objects.
private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
-
+ //Map of container ids to database containers.
private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
new ConcurrentHashMap<Integer, DatabaseContainer>();
+ //Map of container ids to entry containers
private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
new ConcurrentHashMap<Integer, EntryContainer>();
+ //Used to synchronize when a scratch file index writer is first set up.
private final Object synObj = new Object();
- private final RebuildManager rebuildManager;
+
+ //Rebuild index manager used when rebuilding indexes.
+ private final RebuildIndexManager rebuildManager;
+
+ //Set to true if the backend was cleared.
+ private boolean clearedBackend = false;
+
+ //Used to shutdown import if an error occurs in phase one.
+ private volatile boolean isPhaseOneCanceled = false;
+
+ //Number of phase one buffers.
+ private int phaseOneBufferCount;
static
{
@@ -148,26 +201,23 @@
}
}
- private void initialize()
- {
-
- }
-
+ //Rebuild-index instance.
private
Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
EnvironmentConfig envConfig) throws IOException,
InitializationException, JebException, ConfigException
{
- this.importConfiguration = null;
- this.threadCount = 1;
- this.rebuildManager = new RebuildManager(rebuildConfig, cfg);
+ importConfiguration = null;
+ tmpEnv = null;
+ threadCount = 1;
+ rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
indexCount = rebuildManager.getIndexCount();
- indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount);
- indexWriterFutures = new CopyOnWriteArrayList<Future<?>>();
+ scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(indexCount);
+ scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
File parentDir;
if(rebuildConfig.getTmpDirectory() == null)
{
- parentDir = getFileForPath("import-tmp");
+ parentDir = getFileForPath(DEFAULT_TMP_DIR);
}
else
{
@@ -188,16 +238,6 @@
}
}
skipDNValidation = true;
- String propString = System.getProperty(DIRECT_PROPERTY);
- if(propString != null)
- {
- int directSize = Integer.valueOf(propString);
- directBuffer = ByteBuffer.allocateDirect(directSize);
- }
- else
- {
- directBuffer = null;
- }
if(envConfig != null)
{
initializeDBEnv(envConfig);
@@ -218,9 +258,9 @@
private Importer(LDIFImportConfig importConfiguration,
LocalDBBackendCfg localDBBackendCfg,
EnvironmentConfig envConfig) throws IOException,
- InitializationException
+ InitializationException, DatabaseException
{
- this.rebuildManager = null;
+ rebuildManager = null;
this.importConfiguration = importConfiguration;
if(importConfiguration.getThreadCount() == 0)
{
@@ -231,20 +271,23 @@
threadCount = importConfiguration.getThreadCount();
}
indexCount = localDBBackendCfg.listLocalDBIndexes().length + 2;
-
-
- indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount);
- indexWriterFutures = new CopyOnWriteArrayList<Future<?>>();
+ if(!importConfiguration.appendToExistingData()) {
+ if(importConfiguration.clearBackend() ||
+ localDBBackendCfg.getBaseDN().size() <= 1) {
+ clearedBackend = true;
+ }
+ }
+ scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(indexCount);
+ scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
File parentDir;
if(importConfiguration.getTmpDirectory() == null)
{
- parentDir = getFileForPath("import-tmp");
+ parentDir = getFileForPath(DEFAULT_TMP_DIR);
}
else
{
parentDir = getFileForPath(importConfiguration.getTmpDirectory());
}
-
tempDir = new File(parentDir, localDBBackendCfg.getBackendId());
if(!tempDir.exists() && !tempDir.mkdirs())
{
@@ -260,19 +303,22 @@
}
}
skipDNValidation = importConfiguration.getSkipDNValidation();
- String propString = System.getProperty(DIRECT_PROPERTY);
- if(propString != null)
+ initializeDBEnv(envConfig);
+ //Set up temporary environment.
+ if(!skipDNValidation)
{
- int directSize = Integer.valueOf(propString);
- directBuffer = ByteBuffer.allocateDirect(directSize);
+ File p = getFileForPath(localDBBackendCfg.getDBDirectory());
+ File envPath = new File(p, TMPENV_DIR);
+ envPath.mkdirs();
+ tmpEnv = new TmpEnv(envPath);
}
else
{
- directBuffer = null;
+ tmpEnv = null;
}
- initializeDBEnv(envConfig);
}
+
/**
* Return an import LDIF instance using the specified arguments.
*
@@ -316,42 +362,31 @@
return new Importer(rebuildCfg, localDBBackendCfg, envCfg);
}
- private void getBufferSizes(long availMem, int buffers)
+
+ private boolean getBufferSizes(long availMem)
{
- long memory = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_BUFFER_BYTES);
- bufferSize = (int) (memory/buffers);
+ boolean maxBuf = false;
+ long memory = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_SIZE);
+ bufferSize = (int) (memory / phaseOneBufferCount);
if(bufferSize >= MIN_BUFFER_SIZE)
{
- dbCacheSize = MAX_DB_CACHE_SIZE;
- dbLogBufferSize = MAX_DB_LOG_BUFFER_BYTES;
if(bufferSize > MAX_BUFFER_SIZE)
{
bufferSize = MAX_BUFFER_SIZE;
+ maxBuf = true;
}
}
- else
+ else if(bufferSize < MIN_BUFFER_SIZE)
{
- memory = availMem - MIN_DB_CACHE_SIZE - (MIN_DB_CACHE_SIZE * 7) / 100;
- bufferSize = (int) (memory/buffers);
- dbCacheSize = MIN_DB_CACHE_SIZE;
- if(bufferSize < MIN_BUFFER_SIZE)
- {
- Message message =
- NOTE_JEB_IMPORT_LDIF_BUFF_SIZE_LESS_DEFAULT.get(MIN_BUFFER_SIZE);
- logError(message);
- bufferSize = MIN_BUFFER_SIZE;
- }
- else
- {
- long constrainedMemory = memory - (buffers * MIN_BUFFER_SIZE);
- bufferSize = (int) ((buffers * MIN_BUFFER_SIZE) +
- (constrainedMemory * 50/100));
- bufferSize /= buffers;
- dbCacheSize = MIN_DB_CACHE_SIZE + (constrainedMemory * 50/100);
- }
+ Message message =
+ NOTE_JEB_IMPORT_LDIF_BUFF_SIZE_LESS_DEFAULT.get(MIN_BUFFER_SIZE);
+ logError(message);
+ bufferSize = MIN_BUFFER_SIZE;
}
+ return maxBuf;
}
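A worked example of the sizing above, with assumed inputs: 500 MB of import
memory and 160 phase-one buffers (2 * indexCount * threadCount, for 10 indexes
and 8 threads):

    int KB = 1024, MB = KB * KB;                    // constants from this patch
    long availMem = 500L * MB;
    int phaseOneBufferCount = 2 * (10 * 8);         // 160 buffers
    long memory = availMem - (8 * MB + 10 * MB);    // minus DB cache and log
    int bufferSize = (int) (memory / phaseOneBufferCount);
    // 482 MB / 160 is roughly 3 MB per buffer: inside [64 KB, 48 MB], so
    // no clamping is needed in this example.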
+
/**
* Return the suffix instance in the specified map that matches the specified
* DN.
@@ -376,6 +411,40 @@
return suffix;
}
+
+ private long getTmpEnvironmentMemory(long availableMemoryImport)
+ {
+ int tmpMemPct = TMPENV_MEM_PCT;
+ tmpEnvCacheSize = (availableMemoryImport * tmpMemPct) / 100;
+ availableMemoryImport -= tmpEnvCacheSize;
+ if(!clearedBackend)
+ {
+ long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
+ tmpEnvCacheSize -= additionalDBCache;
+ dbCacheSize += additionalDBCache;
+ }
+ return availableMemoryImport;
+ }
+
+
+ //Used for large heap sizes when the buffer max size has been identified. Any
+ //extra memory can be given to the temporary environment in that case.
+ private void adjustTmpEnvironmentMemory(long availableMemoryImport)
+ {
+ long additionalMem = availableMemoryImport -
+ (phaseOneBufferCount * MAX_BUFFER_SIZE);
+ tmpEnvCacheSize += additionalMem;
+ if(!clearedBackend)
+ {
+ //The DN cache probably needs to be smaller and the DB cache bigger
+ //because the dn2id is checked if the backend has not been cleared.
+ long additionalDBCache = (tmpEnvCacheSize * 85) / 100;
+ tmpEnvCacheSize -= additionalDBCache;
+ dbCacheSize += additionalDBCache;
+ }
+ }
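A worked example of the split above, assuming 500 MB of import memory and a
backend that was not cleared (so dn2id must be consulted and the DB cache is
favored over the DN cache):

    int MB = 1024 * 1024;
    long availableMemoryImport = 500L * MB;
    long tmpEnvCacheSize = (availableMemoryImport * 50) / 100;  // 250 MB
    availableMemoryImport -= tmpEnvCacheSize;                   // 250 MB left
    // Backend not cleared: move 85% of the DN cache share to the DB cache.
    long additionalDBCache = (tmpEnvCacheSize * 85) / 100;      // ~212 MB
    tmpEnvCacheSize -= additionalDBCache;                       // ~38 MB DN cache
    long dbCacheSize = 8L * MB + additionalDBCache;             // ~220 MB JE cache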
+
+
/**
* Calculate buffer sizes and initialize JEB properties based on memory.
*
@@ -387,21 +456,29 @@
throws InitializationException
{
Message message;
+ phaseOneBufferCount = 2 * (indexCount * threadCount);
Runtime runTime = Runtime.getRuntime();
- long freeMemory = runTime.freeMemory();
- long maxMemory = runTime.maxMemory();
- long totMemory = runTime.totalMemory();
- long totFreeMemory = (freeMemory + (maxMemory - totMemory));
- long extraDBCache = 0;
- long availableMemoryImport = (totFreeMemory * MEM_PCT_PHASE_1) / 100;
- if(!skipDNValidation && (totFreeMemory > GB)){
- extraDBCache = (availableMemoryImport * EXTRA_DB_CACHE_PCT) / 100;
- availableMemoryImport -= extraDBCache;
- }
- int phaseOneBuffers = 2 * (indexCount * threadCount);
- message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availableMemoryImport,
- phaseOneBuffers);
- logError(message);
+ long totFreeMemory = runTime.freeMemory() +
+ (runTime.maxMemory() - runTime.totalMemory());
+ int importMemPct = (100 - JVM_MEM_PCT);
+ if(totFreeMemory <= SMALL_HEAP_SIZE)
+ {
+ importMemPct -= 15;
+ }
+ if(rebuildManager != null)
+ {
+ importMemPct -= 15;
+ }
+ long availableMemoryImport = (totFreeMemory * importMemPct) / 100;
+ if(!skipDNValidation)
+ {
+ availableMemoryImport = getTmpEnvironmentMemory(availableMemoryImport);
+ }
+ boolean maxBuffers = getBufferSizes(availableMemoryImport);
+ if(!skipDNValidation && maxBuffers)
+ {
+ adjustTmpEnvironmentMemory(availableMemoryImport);
+ }
if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) == null)
{
if (availableMemoryImport < MIN_IMPORT_MEMORY_REQUIRED)
@@ -410,30 +487,30 @@
throw new InitializationException(message);
}
}
- getBufferSizes(availableMemoryImport, phaseOneBuffers);
- dbCacheSize += extraDBCache;
- if(!skipDNValidation)
- {
- envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
- }
- envConfig.setConfigParam("je.maxMemory", Long.toString(dbCacheSize));
- message =
- NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
+ message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availableMemoryImport,
+ phaseOneBufferCount);
logError(message);
- if(dbLogBufferSize != 0)
+ if(tmpEnvCacheSize > 0)
{
- envConfig.setConfigParam("je.log.totalBufferBytes",
- Long.toString(dbLogBufferSize));
- message = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(dbLogBufferSize);
- logError(message);
+ message = NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM.get(tmpEnvCacheSize);
+ logError(message);
}
+ envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY,
+ Long.toString(dbCacheSize));
+ message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize,
+ bufferSize);
+ logError(message);
+ envConfig.setConfigParam(EnvironmentConfig.LOG_TOTAL_BUFFER_BYTES,
+ Long.toString(MAX_DB_LOG_SIZE));
+ message = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(MAX_DB_LOG_SIZE);
+ logError(message);
}
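A runnable sketch of the heap budgeting in initializeDBEnv() above: free memory
is estimated from the runtime, a fixed percentage is reserved for the JVM, and
small heaps give the JVM extra headroom (the constants are the ones defined in
this patch):

    public final class ImportMemoryBudget
    {
      private static final int MB = 1024 * 1024;
      private static final int JVM_MEM_PCT = 45;
      private static final int SMALL_HEAP_SIZE = 256 * MB;

      public static void main(String[] args)
      {
        Runtime rt = Runtime.getRuntime();
        long totFreeMemory =
            rt.freeMemory() + (rt.maxMemory() - rt.totalMemory());
        int importMemPct = 100 - JVM_MEM_PCT;     // 55% to the import
        if (totFreeMemory <= SMALL_HEAP_SIZE)
        {
          importMemPct -= 15;                     // small heap: favor the JVM
        }
        long availableMemoryImport = (totFreeMemory * importMemPct) / 100;
        System.out.println(availableMemoryImport / MB + " MB for the import");
      }
    }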
- private void initializeIndexBuffers(int threadCount)
+ private void initializeIndexBuffers()
{
- indexBufferCount = 2 * (indexCount * threadCount);
- for(int i = 0; i < indexBufferCount; i++)
+ for(int i = 0; i < phaseOneBufferCount; i++)
{
IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
freeBufferQueue.add(b);
@@ -441,7 +518,6 @@
}
-
private void initializeSuffixes() throws DatabaseException, JebException,
ConfigException, InitializationException
{
@@ -451,6 +527,61 @@
if(suffix != null)
{
dnSuffixMap.put(ec.getBaseDN(), suffix);
+ generateIndexID(suffix);
+ }
+ }
+ }
+
+
+ //Mainly used to support multiple suffixes. Each index in each suffix gets
+ //a unique ID to identify which DB it needs to go to in phase two processing.
+ private void generateIndexID(Suffix suffix)
+ {
+ for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
+ suffix.getAttrIndexMap().entrySet()) {
+ AttributeIndex attributeIndex = mapEntry.getValue();
+ DatabaseContainer container;
+ if((container=attributeIndex.getEqualityIndex()) != null) {
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ }
+ if((container=attributeIndex.getPresenceIndex()) != null) {
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ }
+ if((container=attributeIndex.getSubstringIndex()) != null) {
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ }
+ if((container=attributeIndex.getOrderingIndex()) != null) {
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ }
+ if((container=attributeIndex.getApproximateIndex()) != null) {
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ }
+ Map<String,Collection<Index>> extensibleMap =
+ attributeIndex.getExtensibleIndexes();
+ if(!extensibleMap.isEmpty()) {
+ Collection<Index> subIndexes =
+ attributeIndex.getExtensibleIndexes().get(
+ EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ if(subIndexes != null) {
+ for(DatabaseContainer subIndex : subIndexes) {
+ int id = System.identityHashCode(subIndex);
+ idContainerMap.putIfAbsent(id, subIndex);
+ }
+ }
+ Collection<Index> sharedIndexes =
+ attributeIndex.getExtensibleIndexes().get(
+ EXTENSIBLE_INDEXER_ID_SHARED);
+ if(sharedIndexes !=null) {
+ for(DatabaseContainer sharedIndex : sharedIndexes) {
+ int id = System.identityHashCode(sharedIndex);
+ idContainerMap.putIfAbsent(id, sharedIndex);
+ }
+ }
}
}
}
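The pattern above keys each index container by its JVM identity hash so that
phase two can route sorted records back to the right database. A generic sketch
of the same registry idea (class name is mine; identity hashes are not
guaranteed unique, so this relies on collisions being very unlikely):

    import java.util.concurrent.ConcurrentHashMap;

    final class ContainerRegistry<C>
    {
      private final ConcurrentHashMap<Integer, C> byId =
          new ConcurrentHashMap<Integer, C>();

      int register(C container)
      {
        int id = System.identityHashCode(container);  // stable per object
        byId.putIfAbsent(id, container);              // first registration wins
        return id;
      }

      C lookup(int id)
      {
        return byId.get(id);                          // phase-two routing
      }
    }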
@@ -619,8 +750,8 @@
InterruptedException, ExecutionException
{
this.rootContainer = rootContainer;
- this.reader = new LDIFReader(importConfiguration, rootContainer,
- LDIF_READER_BUFFER_SIZE);
+ reader = new LDIFReader(importConfiguration, rootContainer,
+ READER_WRITER_BUFFER_SIZE);
try
{
Message message =
@@ -631,14 +762,29 @@
logError(message);
initializeSuffixes();
long startTime = System.currentTimeMillis();
- processPhaseOne();
- processPhaseTwo();
+ phaseOne();
+ long phaseOneFinishTime = System.currentTimeMillis();
+ if(!skipDNValidation)
+ {
+ tmpEnv.shutdown();
+ }
+ if(isPhaseOneCanceled)
+ {
+ throw new InterruptedException("Import processing canceled.");
+ }
+ long phaseTwoTime = System.currentTimeMillis();
+ phaseTwo();
+ long phaseTwoFinishTime = System.currentTimeMillis();
setIndexesTrusted();
switchContainers();
tempDir.delete();
long finishTime = System.currentTimeMillis();
long importTime = (finishTime - startTime);
float rate = 0;
+ message = NOTE_JEB_IMPORT_PHASE_STATS.get(importTime,
+ (phaseOneFinishTime - startTime),
+ (phaseTwoFinishTime - phaseTwoTime));
+ logError(message);
if (importTime > 0)
rate = 1000f * reader.getEntriesRead() / importTime;
message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
@@ -704,17 +850,16 @@
}
- private void processPhaseOne() throws InterruptedException, ExecutionException
+ private void phaseOne() throws InterruptedException, ExecutionException
{
- initializeIndexBuffers(threadCount);
+ initializeIndexBuffers();
FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
Timer timer = new Timer();
timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
- indexProcessService = Executors.newFixedThreadPool(2 * indexCount);
- sortService = Executors.newFixedThreadPool(threadCount);
+ scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
+ bufferSortService = Executors.newFixedThreadPool(threadCount);
ExecutorService execService = Executors.newFixedThreadPool(threadCount);
List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
-
tasks.add(new MigrateExistingTask());
List<Future<Void>> results = execService.invokeAll(tasks);
for (Future<Void> result : results) {
@@ -724,7 +869,6 @@
}
tasks.clear();
results.clear();
-
if (importConfiguration.appendToExistingData() &&
importConfiguration.replaceExistingEntries())
{
@@ -753,28 +897,31 @@
if(!result.isDone()) {
result.get();
}
- stopIndexWriterTasks();
- for (Future<?> result : indexWriterFutures)
+ stopScratchFileWriters();
+ for (Future<?> result : scratchFileWriterFutures)
{
if(!result.isDone()) {
result.get();
}
}
- indexWriterList.clear();
- indexWriterFutures.clear();
+ //Try to clear as much memory as possible.
+ scratchFileWriterList.clear();
+ scratchFileWriterFutures.clear();
indexKeyQueMap.clear();
execService.shutdown();
freeBufferQueue.clear();
- sortService.shutdown();
+ bufferSortService.shutdown();
+ scratchFileWriterService.shutdown();
timer.cancel();
}
- private void processPhaseTwo() throws InterruptedException
+ private void phaseTwo() throws InterruptedException, JebException,
+ ExecutionException
{
SecondPhaseProgressTask progress2Task =
- new SecondPhaseProgressTask(indexMgrList, reader.getEntriesRead());
+ new SecondPhaseProgressTask(reader.getEntriesRead());
Timer timer2 = new Timer();
timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
processIndexFiles();
@@ -782,83 +929,91 @@
}
-
- private void processIndexFiles() throws InterruptedException
+ private int getBufferCount(int dbThreads)
{
- List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(indexCount);
+ int c = 0;
+ int buffers = 0;
+ //Count DN buffers first, since they are processed first.
+ while(c < DNIndexMgrList.size() && c < dbThreads)
+ {
+ buffers += DNIndexMgrList.get(c++).getBufferList().size();
+ }
+ while(c < indexMgrList.size() && c < dbThreads)
+ {
+ buffers += indexMgrList.get(c++).getBufferList().size();
+ }
+ return buffers;
+ }
+
+
+ private void processIndexFiles() throws InterruptedException,
+ JebException, ExecutionException
+ {
+ ExecutorService dbService;
if(bufferCount.get() == 0)
{
return;
}
- int cacheSize = cacheSizeFromFreeMemory();
- int p = 0;
- int offSet = 0;
- if(directBuffer != null)
+ int dbThreads = Runtime.getRuntime().availableProcessors();
+ if(dbThreads < 4)
{
- cacheSize = cacheSizeFromDirectMemory();
+ dbThreads = 4;
}
- for(IndexManager idxMgr : indexMgrList)
+ int readAheadSize = cacheSizeFromFreeMemory(getBufferCount(dbThreads));
+ List<Future<Void>> futures = new LinkedList<Future<Void>>();
+ dbService = Executors.newFixedThreadPool(dbThreads);
+ //Start DN processing first.
+ for(IndexManager dnMgr : DNIndexMgrList)
{
- if(directBuffer != null)
- {
- int cacheSizes = cacheSize * idxMgr.getBufferList().size();
- offSet += cacheSizes;
- directBuffer.limit(offSet);
- directBuffer.position(p);
- ByteBuffer b = directBuffer.slice();
- tasks.add(new IndexWriteDBTask(idxMgr, b, cacheSize));
- p += cacheSizes;
- }
- else
- {
- tasks.add(new IndexWriteDBTask(idxMgr, null, cacheSize));
- }
+ futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
}
- List<Future<Void>> results = indexProcessService.invokeAll(tasks);
- for (Future<Void> result : results)
- assert result.isDone();
- indexProcessService.shutdown();
+ for(IndexManager mgr : indexMgrList)
+ {
+ futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
+ }
+ for (Future<Void> result : futures)
+ if(!result.isDone()) {
+ result.get();
+ }
+ dbService.shutdown();
}
- private int cacheSizeFromDirectMemory()
- {
- int cacheSize = directBuffer.capacity()/bufferCount.get();
- if(cacheSize > bufferSize)
- {
- cacheSize = bufferSize;
- }
- Message message =
- NOTE_JEB_IMPORT_LDIF_DIRECT_MEM_REPORT.get(bufferCount.get(), cacheSize);
- logError(message);
- return cacheSize;
- }
-
- private int cacheSizeFromFreeMemory()
+ private int cacheSizeFromFreeMemory(int buffers)
{
Runtime runTime = Runtime.getRuntime();
+ runTime.gc();
+ runTime.gc();
long freeMemory = runTime.freeMemory();
long maxMemory = runTime.maxMemory();
long totMemory = runTime.totalMemory();
long totFreeMemory = (freeMemory + (maxMemory - totMemory));
- long availMemory = (totFreeMemory * MEM_PCT_PHASE_2) / 100;
- int averageBufferSize = (int)(availMemory / bufferCount.get());
+ int importMemPct = (100 - JVM_MEM_PCT);
+ //For very small heaps, give more memory to the JVM.
+ if(totFreeMemory <= SMALL_HEAP_SIZE)
+ {
+ importMemPct -= 35;
+ }
+ long availableMemory = (totFreeMemory * importMemPct) / 100;
+ int averageBufferSize = (int)(availableMemory / buffers);
int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, averageBufferSize);
+ //Cache size is never larger than the buffer size.
if(cacheSize > bufferSize)
{
cacheSize = bufferSize;
}
Message message =
- NOTE_JEB_IMPORT_LDIF_INDIRECT_MEM_REPORT.get(bufferCount.get(), cacheSize);
+ NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(availableMemory,
+ cacheSize, buffers);
logError(message);
return cacheSize;
}
- private void stopIndexWriterTasks()
+ private void stopScratchFileWriters()
{
IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
- for(IndexFileWriterTask task : indexWriterList)
+ for(ScratchFileWriterTask task : scratchFileWriterList)
{
task.queue.add(indexBuffer);
}
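A condensed sketch of cacheSizeFromFreeMemory() above: the free heap is split
with the same percentages as phase one, averaged over the scratch-file buffers,
then clamped between the minimum read-ahead size and the phase-one buffer size:

    static int readAheadCacheSize(long totFreeMemory, int buffers,
                                  int bufferSize)
    {
      int importMemPct = 100 - 45;                // 100 - JVM_MEM_PCT
      if (totFreeMemory <= 256L * 1024 * 1024)    // SMALL_HEAP_SIZE
      {
        importMemPct -= 35;                       // very small heap: favor JVM
      }
      long availableMemory = (totFreeMemory * importMemPct) / 100;
      int cacheSize = Math.max(1024, (int) (availableMemory / buffers));
      return Math.min(cacheSize, bufferSize);     // never exceeds a buffer
    }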
@@ -871,6 +1026,9 @@
private final class MigrateExcludedTask extends ImportTask
{
+ /**
+ * {@inheritDoc}
+ */
public Void call() throws Exception
{
for(Suffix suffix : dnSuffixMap.values()) {
@@ -905,8 +1063,9 @@
end[0] = (byte) (end[0] + 1);
while(status == OperationStatus.SUCCESS &&
- comparator.compare(key.getData(), end) < 0 &&
- !importConfiguration.isCancelled()) {
+ comparator.compare(key.getData(), end) < 0 &&
+ !importConfiguration.isCancelled() &&
+ !isPhaseOneCanceled) {
EntryID id = new EntryID(data);
Entry entry = entryContainer.getID2Entry().get(null,
id, LockMode.DEFAULT);
@@ -917,20 +1076,17 @@
}
}
}
+ cursor.close();
+ flushIndexBuffers();
}
catch (Exception e)
{
message =
ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR.get(e.getMessage());
logError(message);
+ isPhaseOneCanceled = true;
throw e;
}
- finally
- {
- cursor.close();
- flushIndexBuffers();
- closeCursors();
- }
}
}
return null;
@@ -944,6 +1100,9 @@
private final class MigrateExistingTask extends ImportTask
{
+ /**
+ * {@inheritDoc}
+ */
public Void call() throws Exception
{
for(Suffix suffix : dnSuffixMap.values()) {
@@ -963,7 +1122,7 @@
try {
status = cursor.getFirst(key, data, lockMode);
while(status == OperationStatus.SUCCESS &&
- !importConfiguration.isCancelled()) {
+ !importConfiguration.isCancelled() && !isPhaseOneCanceled) {
DN dn = DN.decode(ByteString.wrap(key.getData()));
if(!suffix.getIncludeBranches().contains(dn)) {
EntryID id = new EntryID(data);
@@ -995,20 +1154,17 @@
status = cursor.getSearchKeyRange(key, data, lockMode);
}
}
+ cursor.close();
+ flushIndexBuffers();
}
catch(Exception e)
{
message =
ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR.get(e.getMessage());
logError(message);
+ isPhaseOneCanceled = true;
throw e;
}
- finally
- {
- cursor.close();
- flushIndexBuffers();
- closeCursors();
- }
}
}
return null;
@@ -1016,16 +1172,17 @@
}
/**
- * Task to handle append/replace combination.
+ * Task to perform append/replace processing.
*/
private class AppendReplaceTask extends ImportTask
{
- private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
- private final Set<byte[]> deleteKeySet = new HashSet<byte[]>();
+ private final Set<byte[]> insertKeySet = new HashSet<byte[]>(),
+ deleteKeySet = new HashSet<byte[]>();
private final EntryInformation entryInfo = new EntryInformation();
private Entry oldEntry;
private EntryID entryID;
+
/**
* {@inheritDoc}
*/
@@ -1035,7 +1192,7 @@
{
while (true)
{
- if (importConfiguration.isCancelled())
+ if (importConfiguration.isCancelled() || isPhaseOneCanceled)
{
IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
freeBufferQueue.add(indexBuffer);
@@ -1052,13 +1209,13 @@
processEntry(entry, suffix);
}
flushIndexBuffers();
- closeCursors();
}
catch(Exception e)
{
Message message =
ERR_JEB_IMPORT_LDIF_APPEND_REPLACE_TASK_ERR.get(e.getMessage());
logError(message);
+ isPhaseOneCanceled = true;
throw e;
}
return null;
@@ -1067,7 +1224,7 @@
void processEntry(Entry entry, Suffix suffix)
throws DatabaseException, ConfigException, DirectoryException,
- JebException
+ JebException, InterruptedException
{
DN entryDN = entry.getDN();
@@ -1081,20 +1238,12 @@
{
if(!skipDNValidation)
{
- if(!processParent(entryDN, entryID, entry, suffix))
+ if(!dnSanityCheck(entryDN, entry, suffix))
{
suffix.removePending(entryDN);
return;
}
- if(!suffix.getDN2ID().insert(null, entryDN, entryID))
- {
- suffix.removePending(entryDN);
- Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- reader.rejectEntry(entry, message);
- return;
- }
suffix.removePending(entryDN);
- processID2SC(entryID, entry, suffix);
}
else
{
@@ -1119,9 +1268,11 @@
}
}
+
void
processAllIndexes(Suffix suffix, Entry entry, EntryID entryID) throws
- DatabaseException, DirectoryException, JebException, ConfigException
+ DatabaseException, DirectoryException, JebException,
+ ConfigException, InterruptedException
{
for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
@@ -1131,23 +1282,28 @@
Index index;
if((index=attributeIndex.getEqualityIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.EQUALITY));
+ new IndexKey(attributeType, ImportIndexType.EQUALITY,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getPresenceIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.PRESENCE));
+ new IndexKey(attributeType, ImportIndexType.PRESENCE,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getSubstringIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.SUBSTRING));
+ new IndexKey(attributeType, ImportIndexType.SUBSTRING,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getOrderingIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.ORDERING));
+ new IndexKey(attributeType, ImportIndexType.ORDERING,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getApproximateIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.APPROXIMATE));
+ new IndexKey(attributeType, ImportIndexType.APPROXIMATE,
+ index.getIndexEntryLimit()));
}
for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
Transaction transaction = null;
@@ -1162,7 +1318,8 @@
if(subIndexes != null) {
for(Index subIndex: subIndexes) {
processAttribute(subIndex, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.EX_SUBSTRING));
+ new IndexKey(attributeType, ImportIndexType.EX_SUBSTRING,
+ subIndex.getIndexEntryLimit()));
}
}
Collection<Index> sharedIndexes =
@@ -1171,7 +1328,8 @@
if(sharedIndexes !=null) {
for(Index sharedIndex:sharedIndexes) {
processAttribute(sharedIndex, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.EX_SHARED));
+ new IndexKey(attributeType, ImportIndexType.EX_SHARED,
+ sharedIndex.getIndexEntryLimit()));
}
}
}
@@ -1179,12 +1337,10 @@
}
-
void processAttribute(Index index, Entry entry, EntryID entryID,
IndexKey indexKey) throws DatabaseException,
- ConfigException
+ ConfigException, InterruptedException
{
-
if(oldEntry != null)
{
deleteKeySet.clear();
@@ -1204,18 +1360,19 @@
}
-
/**
- * This task processes the LDIF file during phase 1.
+ * This task performs the phase one reading and processing of the entries
+ * read from the LDIF file(s). This task is used if the append flag wasn't
+ * specified.
*/
private class ImportTask implements Callable<Void>
{
-
- private final
- Map<IndexKey, IndexBuffer> indexBufferMap =
- new HashMap<IndexKey, IndexBuffer>();
+ private final Map<IndexKey, IndexBuffer> indexBufferMap =
+ new HashMap<IndexKey, IndexBuffer>();
private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
private final EntryInformation entryInfo = new EntryInformation();
+ private DatabaseEntry keyEntry = new DatabaseEntry(),
+ valEntry = new DatabaseEntry();
+
/**
* {@inheritDoc}
@@ -1226,14 +1383,13 @@
{
while (true)
{
- if (importConfiguration.isCancelled())
+ if (importConfiguration.isCancelled() || isPhaseOneCanceled)
{
IndexBuffer indexBuffer = IndexBuffer.createIndexBuffer(0);
freeBufferQueue.add(indexBuffer);
return null;
}
Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
-
if (entry == null)
{
break;
@@ -1243,151 +1399,82 @@
processEntry(entry, entryID, suffix);
}
flushIndexBuffers();
- closeCursors();
}
catch (Exception e)
{
Message message =
ERR_JEB_IMPORT_LDIF_IMPORT_TASK_ERR.get(e.getMessage());
logError(message);
+ isPhaseOneCanceled = true;
throw e;
}
-
return null;
}
- void closeCursors() throws DatabaseException
- {
- if(!skipDNValidation)
- {
- for(Suffix suffix : dnSuffixMap.values())
- {
- suffix.getEntryContainer().getID2Children().closeCursor();
- suffix.getEntryContainer().getID2Subtree().closeCursor();
- }
- }
- }
-
-
void processEntry(Entry entry, EntryID entryID, Suffix suffix)
throws DatabaseException, ConfigException, DirectoryException,
- JebException
+ JebException, InterruptedException
{
DN entryDN = entry.getDN();
if(!skipDNValidation)
{
- if(!processParent(entryDN, entryID, entry, suffix))
+ if(!dnSanityCheck(entryDN, entry, suffix))
{
suffix.removePending(entryDN);
return;
}
- if(!suffix.getDN2ID().insert(null, entryDN, entryID))
- {
- suffix.removePending(entryDN);
- Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- reader.rejectEntry(entry, message);
- return;
- }
- suffix.removePending(entryDN);
- processID2SC(entryID, entry, suffix);
}
- else
- {
- processDN2ID(suffix, entryDN, entryID);
- suffix.removePending(entryDN);
- }
+ suffix.removePending(entryDN);
+ processDN2ID(suffix, entryDN, entryID);
processDN2URI(suffix, null, entry);
- suffix.getID2Entry().put(null, entryID, entry);
processIndexes(suffix, entry, entryID);
+ suffix.getID2Entry().put(null, entryID, entry);
importCount.getAndIncrement();
}
- boolean processParent(DN entryDN, EntryID entryID, Entry entry,
- Suffix suffix) throws DatabaseException
+ //Examine the DN for duplicates and missing parents.
+ boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
+ throws JebException, InterruptedException
{
- EntryID parentID = null;
- DN parentDN =
- suffix.getEntryContainer().getParentWithinBase(entryDN);
- DN2ID dn2id = suffix.getDN2ID();
- if(dn2id.get(null, entryDN, LockMode.DEFAULT) != null)
+ //If the backend was not cleared, then the dn2id needs to be checked first
+ //for DNs that might not exist in the DN cache. If the DN is not in
+ //the suffix's dn2id DB, then the DN cache is used.
+ if(!clearedBackend)
{
- Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- reader.rejectEntry(entry, message);
- return false;
- }
-
- if (parentDN != null) {
- parentID = suffix.getParentID(parentDN);
- if (parentID == null) {
- dn2id.remove(null, entryDN);
- Message message =
- ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
- reader.rejectEntry(entry, message);
+ EntryID id = suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT);
+ if(id != null || !tmpEnv.insert(entryDN, keyEntry, valEntry) )
+ {
+ Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+ reader.rejectEntry(entry, message);
return false;
}
}
- ArrayList<EntryID> IDs;
- if (parentDN != null && suffix.getParentDN() != null &&
- parentDN.equals(suffix.getParentDN())) {
- IDs = new ArrayList<EntryID>(suffix.getIDs());
- IDs.set(0, entryID);
- }
- else
+ else if(!tmpEnv.insert(entryDN, keyEntry, valEntry))
{
- EntryID nodeID;
- IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
- IDs.add(entryID);
- if (parentID != null)
- {
- IDs.add(parentID);
- EntryContainer entryContainer = suffix.getEntryContainer();
- for (DN dn = entryContainer.getParentWithinBase(parentDN); dn != null;
- dn = entryContainer.getParentWithinBase(dn)) {
- if((nodeID = suffix.getParentID(dn)) == null) {
- return false;
- } else {
- IDs.add(nodeID);
- }
- }
+ Message message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+ reader.rejectEntry(entry, message);
+ return false;
+ }
+ //Perform parent checking.
+ DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN);
+ if (parentDN != null) {
+ if (!suffix.isParentProcessed(parentDN, tmpEnv, clearedBackend)) {
+ Message message =
+ ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
+ reader.rejectEntry(entry, message);
+ return false;
}
}
- suffix.setParentDN(parentDN);
- suffix.setIDs(IDs);
- entry.setAttachment(IDs);
return true;
}
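The duplicate half of dnSanityCheck() above, compressed into one helper as if
it were private to ImportTask (a sketch; exceptions and the parent check are
elided). TmpEnv.insert() returns false when the DN is already cached:

    boolean isDuplicateDN(DN entryDN, Suffix suffix) throws Exception
    {
      if (!clearedBackend
          && suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT) != null)
      {
        return true;                  // found in the pre-existing dn2id DB
      }
      return !tmpEnv.insert(entryDN, keyEntry, valEntry);
    }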
- void processID2SC(EntryID entryID, Entry entry, Suffix suffix)
- throws DatabaseException
- {
- Set<byte[]> childKeySet = new HashSet<byte[]>();
- Set<byte[]> subTreeKeySet = new HashSet<byte[]>();
- Index id2children = suffix.getEntryContainer().getID2Children();
- Index id2subtree = suffix.getEntryContainer().getID2Subtree();
- id2children.indexer.indexEntry(entry, childKeySet);
- id2subtree.indexer.indexEntry(entry, subTreeKeySet);
-
- DatabaseEntry dbKey = new DatabaseEntry();
- DatabaseEntry dbVal = new DatabaseEntry();
- ImportIDSet idSet = new ImportIDSet(1, id2children.getIndexEntryLimit(),
- id2children.getMaintainCount());
- idSet.addEntryID(entryID);
- id2children.insert(idSet, childKeySet, dbKey, dbVal);
-
- DatabaseEntry dbSubKey = new DatabaseEntry();
- DatabaseEntry dbSubVal = new DatabaseEntry();
- ImportIDSet idSubSet = new ImportIDSet(1, id2subtree.getIndexEntryLimit(),
- id2subtree.getMaintainCount());
- idSubSet.addEntryID(entryID);
- id2subtree.insert(idSubSet, subTreeKeySet, dbSubKey, dbSubVal);
- }
-
void
processIndexes(Suffix suffix, Entry entry, EntryID entryID) throws
- DatabaseException, DirectoryException, JebException, ConfigException
+ DatabaseException, DirectoryException, JebException,
+ ConfigException, InterruptedException
{
for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
suffix.getAttrIndexMap().entrySet()) {
@@ -1397,23 +1484,28 @@
Index index;
if((index=attributeIndex.getEqualityIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.EQUALITY));
+ new IndexKey(attributeType, ImportIndexType.EQUALITY,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getPresenceIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.PRESENCE));
+ new IndexKey(attributeType, ImportIndexType.PRESENCE,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getSubstringIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.SUBSTRING));
+ new IndexKey(attributeType, ImportIndexType.SUBSTRING,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getOrderingIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.ORDERING));
+ new IndexKey(attributeType, ImportIndexType.ORDERING,
+ index.getIndexEntryLimit()));
}
if((index=attributeIndex.getApproximateIndex()) != null) {
processAttribute(index, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.APPROXIMATE));
+ new IndexKey(attributeType, ImportIndexType.APPROXIMATE,
+ index.getIndexEntryLimit()));
}
for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
Transaction transaction = null;
@@ -1428,7 +1520,8 @@
if(subIndexes != null) {
for(Index subIndex: subIndexes) {
processAttribute(subIndex, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.EX_SUBSTRING));
+ new IndexKey(attributeType, ImportIndexType.EX_SUBSTRING,
+ subIndex.getIndexEntryLimit()));
}
}
Collection<Index> sharedIndexes =
@@ -1437,7 +1530,8 @@
if(sharedIndexes !=null) {
for(Index sharedIndex:sharedIndexes) {
processAttribute(sharedIndex, entry, entryID,
- new IndexKey(attributeType, ImportIndexType.EX_SHARED));
+ new IndexKey(attributeType, ImportIndexType.EX_SHARED,
+ sharedIndex.getIndexEntryLimit()));
}
}
}
@@ -1449,7 +1543,7 @@
void processAttribute(Index index, Entry entry, EntryID entryID,
IndexKey indexKey) throws DatabaseException,
- ConfigException
+ ConfigException, InterruptedException
{
insertKeySet.clear();
index.indexer.indexEntry(entry, insertKeySet);
@@ -1464,10 +1558,13 @@
ExecutionException
{
Set<Map.Entry<IndexKey, IndexBuffer>> set = indexBufferMap.entrySet();
- for(Map.Entry<IndexKey, IndexBuffer> e : set)
+ Iterator<Map.Entry<IndexKey, IndexBuffer>> setIterator = set.iterator();
+ while(setIterator.hasNext())
{
+ Map.Entry<IndexKey, IndexBuffer> e = setIterator.next();
IndexKey indexKey = e.getKey();
IndexBuffer indexBuffer = e.getValue();
+ setIterator.remove();
ImportIndexType indexType = indexKey.getIndexType();
if(indexType.equals(ImportIndexType.DN))
{
@@ -1478,7 +1575,9 @@
indexBuffer.setComparator(indexComparator);
}
indexBuffer.setIndexKey(indexKey);
- Future<Void> future = sortService.submit(new SortTask(indexBuffer));
+ indexBuffer.setDiscard();
+ Future<Void> future =
+ bufferSortService.submit(new SortTask(indexBuffer));
future.get();
}
}
@@ -1488,7 +1587,7 @@
processKey(DatabaseContainer container, byte[] key, EntryID entryID,
IndexBuffer.ComparatorBuffer<byte[]> comparator, IndexKey indexKey,
boolean insert)
- throws ConfigException
+ throws ConfigException, InterruptedException
{
IndexBuffer indexBuffer;
if(!indexBufferMap.containsKey(indexKey))
@@ -1500,45 +1599,51 @@
{
indexBuffer = indexBufferMap.get(indexKey);
}
- if(!indexBuffer.isSpaceAvailable(key))
+ if(!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
{
indexBuffer.setComparator(comparator);
indexBuffer.setIndexKey(indexKey);
- sortService.submit(new SortTask(indexBuffer));
+ bufferSortService.submit(new SortTask(indexBuffer));
indexBuffer = getNewIndexBuffer();
indexBufferMap.remove(indexKey);
indexBufferMap.put(indexKey, indexBuffer);
}
int id = System.identityHashCode(container);
- idContainerMap.putIfAbsent(id, container);
indexBuffer.add(key, entryID, id, insert);
return id;
}
- IndexBuffer getNewIndexBuffer() throws ConfigException
+ IndexBuffer getNewIndexBuffer() throws ConfigException, InterruptedException
{
- IndexBuffer indexBuffer = freeBufferQueue.poll();
- if(indexBuffer.isPoison())
- {
- Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
- "Abort import - MPD");
- throw new ConfigException(message);
- }
+ IndexBuffer indexBuffer = freeBufferQueue.take();
+ if(indexBuffer == null)
+ {
+ Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
+ "Index buffer processing error.");
+ throw new InterruptedException(message.toString());
+ }
+ if(indexBuffer.isPoison())
+ {
+ Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
+ "Cancel processing received.");
+ throw new InterruptedException(message.toString());
+ }
return indexBuffer;
}
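The poison-pill handoff used above in isolation: a producer that wants the
consumers to stop pushes a zero-sized buffer instead of closing the queue. A
self-contained sketch of the same pattern:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public final class PoisonPillDemo
    {
      public static void main(String[] args) throws InterruptedException
      {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        queue.add(new byte[] { 1 });
        queue.add(new byte[0]);          // poison, like createIndexBuffer(0)
        while (true)
        {
          byte[] item = queue.take();    // blocks until work or poison arrives
          if (item.length == 0)
          {
            break;                       // poison seen: stop consuming
          }
          System.out.println("processing " + item.length + " byte(s)");
        }
      }
    }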
void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
- throws ConfigException
+ throws ConfigException, InterruptedException
{
DatabaseContainer dn2id = suffix.getDN2ID();
byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
int id = processKey(dn2id, dnBytes, entryID, dnComparator,
- new IndexKey(dnType, ImportIndexType.DN), true);
+ new IndexKey(dnType, ImportIndexType.DN, 1), true);
idECMap.putIfAbsent(id, suffix.getEntryContainer());
}
+
void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry)
throws DatabaseException
{
@@ -1556,59 +1661,57 @@
/**
- * The task reads the temporary index files and writes their results to the
- * index database.
+ * This task reads sorted records from the temporary index scratch files,
+ * processes the records and writes the results to the index database. The
+ * DN index is treated differently than non-DN indexes.
*/
- private final class IndexWriteDBTask implements Callable<Void>
+ private final class IndexDBWriteTask implements Callable<Void>
{
private final IndexManager indexMgr;
private final DatabaseEntry dbKey, dbValue;
private final int cacheSize;
- private ByteBuffer directBuffer = null;
private final Map<Integer, DNState> dnStateMap =
- new HashMap<Integer, DNState>();
+ new HashMap<Integer, DNState>();
private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
- public IndexWriteDBTask(IndexManager indexMgr, ByteBuffer b, int cacheSize)
+
+ public IndexDBWriteTask(IndexManager indexMgr, int cacheSize)
{
this.indexMgr = indexMgr;
- directBuffer = b;
this.dbKey = new DatabaseEntry();
this.dbValue = new DatabaseEntry();
this.cacheSize = cacheSize;
}
+
private SortedSet<Buffer> initializeBuffers() throws IOException
{
- int p = 0;
- int offSet = cacheSize;
SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
for(Buffer b : indexMgr.getBufferList())
{
- if(directBuffer != null)
- {
- directBuffer.position(p);
- directBuffer.limit(offSet);
- ByteBuffer slice = directBuffer.slice();
- b.initializeCache(indexMgr, slice, cacheSize);
- p += cacheSize;
- offSet += cacheSize;
- }
- else
- {
- b.initializeCache(indexMgr, null, cacheSize);
- }
+ b.initializeCache(indexMgr, null, cacheSize);
bufferSet.add(b);
}
indexMgr.getBufferList().clear();
return bufferSet;
}
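The SortedSet built above drives a k-way merge: the task repeatedly takes the
buffer whose current key is smallest, folds equal keys together, and re-inserts
the buffer. The same idea on plain sorted runs, using a PriorityQueue in place
of the TreeSet (an equivalent choice):

    import java.util.*;

    public final class KWayMergeDemo
    {
      public static void main(String[] args)
      {
        List<Iterator<Integer>> runs = new ArrayList<Iterator<Integer>>();
        runs.add(Arrays.asList(1, 4, 9).iterator());
        runs.add(Arrays.asList(2, 3, 8).iterator());
        // Heap entries are {current value, run index}, smallest value first.
        PriorityQueue<int[]> heads = new PriorityQueue<int[]>(runs.size(),
            new Comparator<int[]>()
            {
              public int compare(int[] a, int[] b) { return a[0] - b[0]; }
            });
        for (int i = 0; i < runs.size(); i++)
        {
          heads.add(new int[] { runs.get(i).next(), i });
        }
        while (!heads.isEmpty())
        {
          int[] head = heads.poll();
          System.out.print(head[0] + " ");            // prints 1 2 3 4 8 9
          if (runs.get(head[1]).hasNext())            // advance that run
          {
            heads.add(new int[] { runs.get(head[1]).next(), head[1] });
          }
        }
      }
    }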
+
+ /**
+ * {@inheritDoc}
+ */
public Void call() throws Exception
{
- byte[] cKey = null;
+ ByteBuffer cKey = null;
ImportIDSet cInsertIDSet = new ImportIDSet(),
cDeleteIDSet = new ImportIDSet();
+ Thread.setDefaultUncaughtExceptionHandler(
+ new DefaultExceptionHandler());
+ indexMgr.setStarted();
+ Message message =
+ NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(indexMgr.getFileName(),
+ indexMgr.getBufferList().size());
+ logError(message);
Integer cIndexID = null;
try
{
@@ -1621,8 +1724,15 @@
bufferSet.remove(b);
if(cKey == null)
{
+ cKey = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
cIndexID = b.getIndexID();
- cKey = b.getKey();
+ cKey.clear();
+ if(b.getKeyLen() > cKey.capacity())
+ {
+ cKey = ByteBuffer.allocate(b.getKeyLen());
+ }
+ cKey.flip();
+ b.getKey(cKey);
cInsertIDSet.merge(b.getInsertIDSet());
cDeleteIDSet.merge(b.getDeleteIDSet());
cInsertIDSet.setKey(cKey);
@@ -1635,7 +1745,13 @@
addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
indexMgr.incrementKeyCount();
cIndexID = b.getIndexID();
- cKey = b.getKey();
+ cKey.clear();
+ if(b.getKeyLen() > cKey.capacity())
+ {
+ cKey = ByteBuffer.allocate(b.getKeyLen());
+ }
+ cKey.flip();
+ b.getKey(cKey);
cInsertIDSet.clear(true);
cDeleteIDSet.clear(true);
cInsertIDSet.merge(b.getInsertIDSet());
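The key handling above reuses a single ByteBuffer across records and only
reallocates when a key outgrows it. The same policy as a small helper (name is
mine):

    import java.nio.ByteBuffer;

    static ByteBuffer ensureCapacity(ByteBuffer buf, int keyLen)
    {
      if (keyLen > buf.capacity())
      {
        return ByteBuffer.allocate(keyLen); // grow; old contents are stale
      }
      buf.clear();                          // reuse: reset position and limit
      return buf;
    }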
@@ -1663,9 +1779,9 @@
}
catch (Exception e)
{
- Message message =
+ message =
ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR.get(indexMgr.getFileName(),
- e.getMessage());
+ e.getMessage());
logError(message);
e.printStackTrace();
throw e;
@@ -1677,7 +1793,7 @@
private void cleanUP() throws DatabaseException, DirectoryException,
IOException
{
- if(indexMgr.isDN2ID() && skipDNValidation)
+ if(indexMgr.isDN2ID())
{
for(DNState dnState : dnStateMap.values())
{
@@ -1701,41 +1817,44 @@
indexMgr.deleteIndexFile();
}
- private void addToDB(ImportIDSet insRec, ImportIDSet delRec, int indexID)
- throws InterruptedException, DatabaseException, DirectoryException
+
+ private void addToDB(ImportIDSet insertSet, ImportIDSet deleteSet,
+ int indexID) throws InterruptedException,
+ DatabaseException, DirectoryException
{
if(!indexMgr.isDN2ID())
{
Index index;
- if((delRec.size() > 0) || (!delRec.isDefined()))
+ if((deleteSet.size() > 0) || (!deleteSet.isDefined()))
{
- dbKey.setData(delRec.getKey());
+ dbKey.setData(deleteSet.getKey().array(), 0,
+ deleteSet.getKey().limit());
index = (Index)idContainerMap.get(indexID);
- index.delete(dbKey, delRec, dbValue);
+ index.delete(dbKey, deleteSet, dbValue);
if(!indexMap.containsKey(indexID))
{
indexMap.put(indexID, index);
}
}
-
-
- if((insRec.size() > 0) || (!insRec.isDefined()))
+ if((insertSet.size() > 0) || (!insertSet.isDefined()))
{
- dbKey.setData(insRec.getKey());
+ dbKey.setData(insertSet.getKey().array(), 0,
+ insertSet.getKey().limit());
index = (Index)idContainerMap.get(indexID);
- index.insert(dbKey, insRec, dbValue);
+ index.insert(dbKey, insertSet, dbValue);
if(!indexMap.containsKey(indexID))
{
indexMap.put(indexID, index);
}
}
}
- else if(skipDNValidation)
+ else
{
- addDN2ID(insRec, indexID);
+ addDN2ID(insertSet, indexID);
}
}
+
private void addDN2ID(ImportIDSet record, Integer indexID)
throws DatabaseException, DirectoryException
{
@@ -1749,7 +1868,6 @@
{
dnState = dnStateMap.get(indexID);
}
-
if(!dnState.checkParent(record))
{
return;
@@ -1764,6 +1882,8 @@
*/
class DNState
{
+ private final int DN_STATE_CACHE_SIZE = 64 * KB;
+
private DN parentDN, lastDN;
private EntryID parentID, lastID, entryID;
private final DatabaseEntry DNKey, DNValue;
@@ -1775,6 +1895,7 @@
private final int childLimit, subTreeLimit;
private final boolean childDoCount, subTreeDoCount;
+
DNState(EntryContainer entryContainer)
{
this.entryContainer = entryContainer;
@@ -1796,11 +1917,13 @@
private boolean checkParent(ImportIDSet record) throws DirectoryException,
DatabaseException
{
- DNKey.setData(record.getKey());
+ DN dn = DN.decode(new String(record.getKey().array(), 0,
+ record.getKey().limit()));
+ DNKey.setData(record.getKey().array(), 0, record.getKey().limit());
byte[] v = record.toDatabase();
long v1 = JebFormat.entryIDFromDatabase(v);
DNValue.setData(v);
- DN dn = DN.decode(ByteString.wrap(DNKey.getData()));
+
entryID = new EntryID(v1);
//Bypass the cache for append data, lookup the parent in DN2ID and
//return.
@@ -1886,6 +2009,7 @@
}
}
+
private EntryID getParentID(DN dn) throws DatabaseException
{
EntryID nodeID;
@@ -1903,6 +2027,7 @@
return nodeID;
}
+
private void id2SubTree(EntryID childID)
throws DatabaseException, DirectoryException
{
@@ -1950,6 +2075,7 @@
}
}
+
private void flushMapToDB(Map<byte[], ImportIDSet> map, Index index,
boolean clearMap)
throws DatabaseException, DirectoryException
@@ -1968,6 +2094,7 @@
}
}
+
public void flush() throws DatabaseException, DirectoryException
{
flushMapToDB(id2childTree, entryContainer.getID2Children(), false);
@@ -1978,37 +2105,46 @@
/**
- * This task writes the temporary index files using the sorted buffers read
- * from a blocking queue.
+ * This task writes the temporary scratch index files using the sorted
+ * buffers read from a blocking queue private to each index.
*/
- private final class IndexFileWriterTask implements Runnable
+ private final class ScratchFileWriterTask implements Callable<Void>
{
+ private final int DRAIN_TO = 3;
private final IndexManager indexMgr;
private final BlockingQueue<IndexBuffer> queue;
private final ByteArrayOutputStream insetByteStream =
new ByteArrayOutputStream(2 * bufferSize);
private final ByteArrayOutputStream deleteByteStream =
new ByteArrayOutputStream(2 * bufferSize);
+ private final byte[] tmpArray = new byte[8];
+ private int insertKeyCount = 0, deleteKeyCount = 0;
private final DataOutputStream dataStream;
private long bufferCount = 0;
private final File file;
private final SortedSet<IndexBuffer> indexSortedSet;
private boolean poisonSeen = false;
+ ByteBuffer keyBuf = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
- public IndexFileWriterTask(BlockingQueue<IndexBuffer> queue,
- IndexManager indexMgr) throws FileNotFoundException
+
+ public ScratchFileWriterTask(BlockingQueue<IndexBuffer> queue,
+ IndexManager indexMgr) throws FileNotFoundException
{
this.queue = queue;
file = indexMgr.getFile();
this.indexMgr = indexMgr;
BufferedOutputStream bufferedStream =
- new BufferedOutputStream(new FileOutputStream(file), 2 * MB);
+ new BufferedOutputStream(new FileOutputStream(file),
+ READER_WRITER_BUFFER_SIZE);
dataStream = new DataOutputStream(bufferedStream);
indexSortedSet = new TreeSet<IndexBuffer>();
}
- public void run()
+ /**
+ * {@inheritDoc}
+ */
+ public Void call() throws Exception
{
long offset = 0;
List<IndexBuffer> l = new LinkedList<IndexBuffer>();
@@ -2027,9 +2163,12 @@
bufferLen = writeIndexBuffers(l);
for(IndexBuffer id : l)
{
- id.reset();
+ if(!id.isDiscard())
+ {
+ id.reset();
+ freeBufferQueue.add(id);
+ }
}
- freeBufferQueue.addAll(l);
l.clear();
}
else
@@ -2039,8 +2178,11 @@
break;
}
bufferLen = writeIndexBuffer(indexBuffer);
- indexBuffer.reset();
- freeBufferQueue.add(indexBuffer);
+ if(!indexBuffer.isDiscard())
+ {
+ indexBuffer.reset();
+ freeBufferQueue.add(indexBuffer);
+ }
}
offset += bufferLen;
indexMgr.addBuffer(new Buffer(beginOffset, offset, bufferCount));
@@ -2052,16 +2194,22 @@
}
}
}
- dataStream.close();
- indexMgr.setFileLength();
}
- catch (IOException e)
+ catch (Exception e)
{
Message message =
ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
e.getMessage());
logError(message);
+ isPhaseOneCanceled = true;
+ throw e;
}
+ finally
+ {
+ dataStream.close();
+ indexMgr.setFileLength();
+ }
+ return null;
}
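
For reference, the end-of-input handling in call() above is the classic poison-pill pattern: the import tasks enqueue a sentinel buffer and the writer drains its queue until the sentinel turns up. A minimal self-contained sketch of the pattern (the String sentinel here is a hypothetical stand-in for IndexBuffer.isPoison()):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class PoisonPillSketch {
  //Hypothetical sentinel; the patch marks a dedicated IndexBuffer instead.
  private static final String POISON = "__POISON__";

  public static void main(String[] args) throws InterruptedException {
    final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(4);
    Thread producer = new Thread(new Runnable() {
      public void run() {
        try {
          queue.put("buffer-1");
          queue.put("buffer-2");
          queue.put(POISON);          //signal end of input
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    producer.start();
    while (true) {
      String item = queue.take();     //blocks until an item arrives
      if (POISON.equals(item)) {
        break;                        //producer is finished
      }
      System.out.println("processing " + item);
    }
    producer.join();
  }
}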
@@ -2070,8 +2218,8 @@
int numberKeys = indexBuffer.getNumberKeys();
indexBuffer.setPosition(-1);
long bufferLen = 0;
- insetByteStream.reset();
- deleteByteStream.reset();
+ insetByteStream.reset(); insertKeyCount = 0;
+ deleteByteStream.reset(); deleteKeyCount = 0;
for(int i = 0; i < numberKeys; i++)
{
if(indexBuffer.getPosition() == -1)
@@ -2079,35 +2227,39 @@
indexBuffer.setPosition(i);
if(indexBuffer.isInsert(i))
{
- insetByteStream.write(indexBuffer.getIDBytes(i));
+ indexBuffer.writeID(insetByteStream, i);
+ insertKeyCount++;
}
else
{
- deleteByteStream.write(indexBuffer.getIDBytes(i));
+ indexBuffer.writeID(deleteByteStream, i);
+ deleteKeyCount++;
}
continue;
}
if(!indexBuffer.compare(i))
{
- bufferLen += indexBuffer.writeRecord(insetByteStream,
- deleteByteStream, dataStream);
+ bufferLen += writeRecord(indexBuffer);
indexBuffer.setPosition(i);
- insetByteStream.reset();
- deleteByteStream.reset();
+          insetByteStream.reset(); insertKeyCount = 0;
+          deleteByteStream.reset(); deleteKeyCount = 0;
}
if(indexBuffer.isInsert(i))
{
- insetByteStream.write(indexBuffer.getIDBytes(i));
+ if(insertKeyCount++ <= indexMgr.getLimit())
+ {
+ indexBuffer.writeID(insetByteStream, i);
+ }
}
else
{
- deleteByteStream.write(indexBuffer.getIDBytes(i));
+ indexBuffer.writeID(deleteByteStream, i);
+ deleteKeyCount++;
}
}
if(indexBuffer.getPosition() != -1)
{
- bufferLen += indexBuffer.writeRecord(insetByteStream, deleteByteStream,
- dataStream);
+ bufferLen += writeRecord(indexBuffer);
}
return bufferLen;
}
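
Note the asymmetry in the loop above: insert IDs stop being copied once insertKeyCount passes the index entry limit, since writeByteStreams() will collapse such a set to the single undefined marker anyway, whereas delete IDs are always copied, presumably because deletions must still be applied to keys that have gone over the limit.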
@@ -2118,8 +2270,8 @@
{
long id = 0;
long bufferLen = 0;
- insetByteStream.reset();
- deleteByteStream.reset();
+ insetByteStream.reset(); insertKeyCount = 0;
+ deleteByteStream.reset(); deleteKeyCount = 0;
for(IndexBuffer b : buffers)
{
if(b.isPoison())
@@ -2141,45 +2293,54 @@
indexSortedSet.remove(b);
if(saveKey == null)
{
- saveKey = b.getKeyBytes();
+ saveKey = b.getKey();
saveIndexID = b.getIndexID();
if(b.isInsert(b.getPosition()))
{
- insetByteStream.write(b.getIDBytes(b.getPosition()));
+ b.writeID(insetByteStream, b.getPosition());
+ insertKeyCount++;
}
else
{
- deleteByteStream.write(b.getIDBytes(b.getPosition()));
+ b.writeID(deleteByteStream, b.getPosition());
+ deleteKeyCount++;
}
}
else
{
if(!b.compare(saveKey, saveIndexID))
{
- bufferLen += IndexBuffer.writeRecord(saveKey, saveIndexID,
- insetByteStream, deleteByteStream, dataStream);
+ bufferLen += writeRecord(saveKey, saveIndexID);
insetByteStream.reset();
deleteByteStream.reset();
- saveKey = b.getKeyBytes();
+ insertKeyCount = 0;
+ deleteKeyCount = 0;
+ saveKey = b.getKey();
saveIndexID = b.getIndexID();
if(b.isInsert(b.getPosition()))
{
- insetByteStream.write(b.getIDBytes(b.getPosition()));
+ b.writeID(insetByteStream, b.getPosition());
+ insertKeyCount++;
}
else
{
- deleteByteStream.write(b.getIDBytes(b.getPosition()));
+ b.writeID(deleteByteStream, b.getPosition());
+ deleteKeyCount++;
}
}
else
{
if(b.isInsert(b.getPosition()))
{
- insetByteStream.write(b.getIDBytes(b.getPosition()));
+ if(insertKeyCount++ <= indexMgr.getLimit())
+ {
+ b.writeID(insetByteStream, b.getPosition());
+ }
}
else
{
- deleteByteStream.write(b.getIDBytes(b.getPosition()));
+ b.writeID(deleteByteStream, b.getPosition());
+ deleteKeyCount++;
}
}
}
@@ -2191,13 +2352,71 @@
}
if(saveKey != null)
{
- bufferLen += IndexBuffer.writeRecord(saveKey, saveIndexID,
- insetByteStream, deleteByteStream, dataStream);
+ bufferLen += writeRecord(saveKey, saveIndexID);
}
return bufferLen;
}
+
+
+ private int writeByteStreams() throws IOException
+ {
+ if(insertKeyCount > indexMgr.getLimit())
+ {
+ insertKeyCount = 1;
+ insetByteStream.reset();
+ PackedInteger.writeInt(tmpArray, 0, -1);
+ insetByteStream.write(tmpArray, 0, 1);
+ }
+ int insertSize = PackedInteger.getWriteIntLength(insertKeyCount);
+ PackedInteger.writeInt(tmpArray, 0, insertKeyCount);
+ dataStream.write(tmpArray, 0, insertSize);
+ if(insetByteStream.size() > 0)
+ {
+ insetByteStream.writeTo(dataStream);
+ }
+ int deleteSize = PackedInteger.getWriteIntLength(deleteKeyCount);
+ PackedInteger.writeInt(tmpArray, 0, deleteKeyCount);
+ dataStream.write(tmpArray, 0, deleteSize);
+ if(deleteByteStream.size() > 0)
+ {
+ deleteByteStream.writeTo(dataStream);
+ }
+ return insertSize + deleteSize;
+ }
+
+
+ private int writeHeader(int indexID, int keySize) throws IOException
+ {
+ dataStream.writeInt(indexID);
+ int packedSize = PackedInteger.getWriteIntLength(keySize);
+ PackedInteger.writeInt(tmpArray, 0, keySize);
+ dataStream.write(tmpArray, 0, packedSize);
+ return packedSize;
+ }
+
+
+ private int writeRecord(IndexBuffer b) throws IOException
+ {
+ int keySize = b.getKeySize();
+ int packedSize = writeHeader(b.getIndexID(), keySize);
+ b.writeKey(dataStream);
+ packedSize += writeByteStreams();
+ return (packedSize + keySize + insetByteStream.size() +
+ deleteByteStream.size() + 4);
+ }
+
+
+ private int writeRecord(byte[] k, int indexID) throws IOException
+ {
+ int packedSize = writeHeader(indexID, k.length);
+ dataStream.write(k);
+ packedSize += writeByteStreams();
+ return (packedSize + k.length + insetByteStream.size() +
+ deleteByteStream.size() + 4);
+ }
}
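
Taken together, writeRecord(), writeHeader() and writeByteStreams() appear to lay a scratch record out as: a four-byte index ID, a packed key length, the key bytes, a packed insert count followed by that many packed insert IDs, then a packed delete count followed by the delete IDs; an over-limit insert set is collapsed to the single packed ID -1, which ImportIDSet.addEntryID() treats as undefined. A sketch of a decoder for that assumed layout (class and method names are illustrative):

import com.sleepycat.util.PackedInteger;

final class ScratchRecordSketch {
  //Decodes one record from buf starting at off and returns the offset of
  //the next record, mirroring the layout written above.
  static int readRecord(byte[] buf, int off) {
    //Four-byte big-endian index ID (DataOutputStream.writeInt format).
    int indexID = ((buf[off] & 0xFF) << 24) | ((buf[off + 1] & 0xFF) << 16)
        | ((buf[off + 2] & 0xFF) << 8) | (buf[off + 3] & 0xFF);
    off += 4;
    //Packed key length, then the key bytes themselves.
    int keyLen = PackedInteger.readInt(buf, off);
    off += PackedInteger.getReadIntLength(buf, off);
    off += keyLen;
    //Packed insert count, then that many packed IDs (-1 == undefined set).
    int insertCount = PackedInteger.readInt(buf, off);
    off += PackedInteger.getReadIntLength(buf, off);
    for (int i = 0; i < insertCount; i++) {
      off += PackedInteger.getReadLongLength(buf, off);
    }
    //Packed delete count, then the delete IDs in the same packed form.
    int deleteCount = PackedInteger.readInt(buf, off);
    off += PackedInteger.getReadIntLength(buf, off);
    for (int i = 0; i < deleteCount; i++) {
      off += PackedInteger.getReadLongLength(buf, off);
    }
    return off;
  }
}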
+
/**
 * This task's main function is to sort the index buffers given to it by
 * the import tasks reading the LDIF file. It will also create an index
@@ -2221,8 +2440,9 @@
public Void call() throws Exception
{
if (importConfiguration != null &&
- importConfiguration.isCancelled())
+ importConfiguration.isCancelled() || isPhaseOneCanceled)
{
+      isPhaseOneCanceled = true;
return null;
}
indexBuffer.sort();
@@ -2255,14 +2475,23 @@
{
isDN = true;
}
- IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN);
- indexMgrList.add(indexMgr);
+ IndexManager indexMgr = new IndexManager(indexKey.getName(), isDN,
+ indexKey.getEntryLimit());
+ if(isDN)
+ {
+ DNIndexMgrList.add(indexMgr);
+ }
+ else
+ {
+ indexMgrList.add(indexMgr);
+ }
BlockingQueue<IndexBuffer> newQue =
- new ArrayBlockingQueue<IndexBuffer>(indexBufferCount);
- IndexFileWriterTask indexWriter =
- new IndexFileWriterTask(newQue, indexMgr);
- indexWriterList.add(indexWriter);
- indexWriterFutures.add(indexProcessService.submit(indexWriter));
+ new ArrayBlockingQueue<IndexBuffer>(phaseOneBufferCount);
+ ScratchFileWriterTask indexWriter =
+ new ScratchFileWriterTask(newQue, indexMgr);
+ scratchFileWriterList.add(indexWriter);
+ scratchFileWriterFutures.add(
+ scratchFileWriterService.submit(indexWriter));
indexKeyQueMap.put(indexKey, newQue);
}
}
@@ -2278,12 +2507,11 @@
private final long begin, end, id;
private long offset;
private ByteBuffer cache;
- private int keyLen, idLen, limit;
- private byte[] key;
+    private int limit;
private ImportIDSet insertIDSet = null, deleteIDSet = null;
private Integer indexID = null;
private boolean doCount;
- private Comparator<byte[]> comparator;
+ private ByteBuffer keyBuf = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
public Buffer(long begin, long end, long id)
@@ -2309,6 +2537,7 @@
}
loadCache();
cache.flip();
+ keyBuf.flip();
}
@@ -2349,9 +2578,20 @@
}
}
- public byte[] getKey()
+ public int getKeyLen()
{
- return key;
+ return keyBuf.limit();
+ }
+
+ public void getKey(ByteBuffer b)
+ {
+ keyBuf.get(b.array(), 0, keyBuf.limit());
+ b.limit(keyBuf.limit());
+ }
+
+ ByteBuffer getKeyBuf()
+ {
+ return keyBuf;
}
public ImportIDSet getInsertIDSet()
@@ -2376,7 +2616,10 @@
try {
getNextRecord();
} catch(IOException ex) {
- System.out.println("MPD need some error message");
+ Message message = ERR_JEB_IO_ERROR.get(ex.getMessage());
+ logError(message);
+ ex.printStackTrace();
+ System.exit(1);
}
}
return indexID;
@@ -2400,7 +2643,6 @@
Index index = (Index) idContainerMap.get(indexID);
limit = index.getIndexEntryLimit();
doCount = index.getMaintainCount();
- comparator = index.getComparator();
if(insertIDSet == null)
{
insertIDSet = new ImportIDSet(128, limit, doCount);
@@ -2409,7 +2651,6 @@
}
else
{
- comparator = ((DN2ID) idContainerMap.get(indexID)).getComparator();
if(insertIDSet == null)
{
insertIDSet = new ImportIDSet(1, limit, doCount);
@@ -2418,24 +2659,13 @@
}
}
+
private int getInt() throws IOException
{
ensureData(4);
return cache.getInt();
}
- private long getLong() throws IOException
- {
- ensureData(8);
- return cache.getLong();
- }
-
- private void getBytes(byte[] b) throws IOException
- {
- ensureData(b.length);
- cache.get(b);
- }
-
private void getNextIndexID() throws IOException, BufferUnderflowException
{
indexID = getInt();
@@ -2443,28 +2673,50 @@
private void getNextKey() throws IOException, BufferUnderflowException
{
- keyLen = getInt();
- key = new byte[keyLen];
- getBytes(key);
+ ensureData(20);
+ byte[] ba = cache.array();
+ int p = cache.position();
+ int len = PackedInteger.getReadIntLength(ba, p);
+ int keyLen = PackedInteger.readInt(ba, p);
+ cache.position(p + len);
+ if(keyLen > keyBuf.capacity())
+ {
+ keyBuf = ByteBuffer.allocate(keyLen);
+ }
+ ensureData(keyLen);
+ keyBuf.clear();
+ cache.get(keyBuf.array(), 0, keyLen);
+ keyBuf.limit(keyLen);
}
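
getNextKey() now reads a variable-length packed key length in place of the old fixed four-byte int, so short keys cost one length byte rather than four. A minimal round trip through JE's PackedInteger for reference (the value 300 is arbitrary):

import com.sleepycat.util.PackedInteger;

final class PackedIntegerSketch {
  public static void main(String[] args) {
    byte[] buf = new byte[8];
    PackedInteger.writeInt(buf, 0, 300);
    int len = PackedInteger.getReadIntLength(buf, 0);  //2 bytes for 300
    int value = PackedInteger.readInt(buf, 0);
    System.out.println(value + " decoded from " + len + " byte(s)");
  }
}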
private void getNextIDSet(boolean insert)
throws IOException, BufferUnderflowException
{
- idLen = getInt();
- int idCount = idLen/8;
-
+ ensureData(20);
+ int p = cache.position();
+ byte[] ba = cache.array();
+ int len = PackedInteger.getReadIntLength(ba, p);
+ int keyCount = PackedInteger.readInt(ba, p);
+ p += len;
+ cache.position(p);
if(insert)
{
- insertIDSet.clear(false);
+ insertIDSet.clear(false);
}
else
{
- deleteIDSet.clear(false);
+ deleteIDSet.clear(false);
}
- for(int i = 0; i < idCount; i++)
+ for(int k = 0; k < keyCount; k++)
{
- long l = getLong();
+ if(ensureData(9))
+ {
+ p = cache.position();
+ }
+ len = PackedInteger.getReadLongLength(ba, p);
+ long l = PackedInteger.readLong(ba, p);
+ p += len;
+ cache.position(p);
if(insert)
{
insertIDSet.addEntryID(l);
@@ -2477,32 +2729,45 @@
}
- private void ensureData(int len) throws IOException
+ private boolean ensureData(int len) throws IOException
{
+ boolean ret = false;
if(cache.remaining() == 0)
{
cache.clear();
loadCache();
cache.flip();
+ ret = true;
}
else if(cache.remaining() < len)
{
cache.compact();
loadCache();
cache.flip();
+ ret = true;
}
+ return ret;
}
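
ensureData() is the standard NIO refill idiom: compact() copies the unread tail to the front of the buffer, the channel read appends fresh bytes behind it, and flip() makes the combined region readable again. A self-contained sketch of the idiom (file name and buffer size are illustrative):

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class RefillSketch {
  public static void main(String[] args) throws IOException {
    FileChannel channel = new FileInputStream("scratch.index").getChannel();
    ByteBuffer cache = ByteBuffer.allocate(8 * 1024);
    channel.read(cache);  //initial fill
    cache.flip();         //switch to reading
    //...consume some, but not all, bytes from cache...
    cache.compact();      //move the unread tail to the front
    channel.read(cache);  //append fresh bytes after the tail
    cache.flip();         //readable again: old tail followed by new data
    channel.close();
  }
}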
- private int compare(byte[] cKey, Integer cIndexID)
+ private int compare(ByteBuffer cKey, Integer cIndexID)
{
-
- int returnCode;
- if(key == null)
+ int returnCode, rc = 0;
+ if(keyBuf.limit() == 0)
{
getIndexID();
}
- if(comparator.compare(key, cKey) != 0) {
+ if(indexMgr.isDN2ID())
+ {
+ rc = dnComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
+ cKey.array(), cKey.limit());
+ }
+ else
+ {
+ rc = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
+ cKey.array(), cKey.limit());
+ }
+ if(rc != 0) {
returnCode = 1;
}
else
@@ -2520,14 +2785,26 @@
{
return 0;
}
- if(key == null) {
+ if(keyBuf.limit() == 0) {
getIndexID();
}
- if(o.getKey() == null)
+ if(o.getKeyBuf().limit() == 0)
{
o.getIndexID();
}
- int returnCode = comparator.compare(key, o.getKey());
+ int returnCode = 0;
+ byte[] oKey = o.getKeyBuf().array();
+ int oLen = o.getKeyBuf().limit();
+ if(indexMgr.isDN2ID())
+ {
+ returnCode = dnComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
+ oKey, oLen);
+ }
+ else
+ {
+ returnCode = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(),
+ oKey, oLen);
+ }
if(returnCode == 0)
{
if(indexID.intValue() == o.getIndexID().intValue())
@@ -2562,9 +2839,16 @@
}
}
+
/**
- * The index manager class is used to carry information about index processing
- * from phase 1 to phase 2.
+ * The index manager class has several functions:
+ *
+   * 1. It is used to carry information about index processing from phase
+   * one to phase two.
+ *
+ * 2. It collects statistics about phase two processing for each index.
+ *
+ * 3. It manages opening and closing the scratch index files.
*/
private final class IndexManager
{
@@ -2572,69 +2856,89 @@
private RandomAccessFile rFile = null;
private final List<Buffer> bufferList = new LinkedList<Buffer>();
private long fileLength, bytesRead = 0;
- private boolean done = false;
+ private boolean done = false, started = false;
private long totalDNS;
private AtomicInteger keyCount = new AtomicInteger(0);
private final String fileName;
private final boolean isDN;
+ private final int limit;
- public IndexManager(String fileName, boolean isDN)
+
+ IndexManager(String fileName, boolean isDN, int limit)
{
file = new File(tempDir, fileName);
this.fileName = fileName;
this.isDN = isDN;
+ this.limit = limit;
}
- public void openIndexFile() throws FileNotFoundException
+
+ void openIndexFile() throws FileNotFoundException
{
rFile = new RandomAccessFile(file, "r");
}
+
public FileChannel getChannel()
{
return rFile.getChannel();
}
+
public void addBuffer(Buffer o)
{
this.bufferList.add(o);
}
+
public List<Buffer> getBufferList()
{
return bufferList;
}
+
public File getFile()
{
return file;
}
+
public boolean deleteIndexFile()
{
- return file.delete();
+ return file.delete();
}
+
public void close() throws IOException
{
- rFile.close();
+ rFile.close();
}
+
public void setFileLength()
{
this.fileLength = file.length();
}
+
public void addBytesRead(int bytesRead)
{
this.bytesRead += bytesRead;
}
+
public void setDone()
{
this.done = true;
}
+
+ public void setStarted()
+ {
+ started = true;
+ }
+
+
public void addTotDNCount(int delta)
{
this.totalDNS += delta;
@@ -2646,53 +2950,86 @@
return totalDNS;
}
+
public boolean isDN2ID()
{
return isDN;
}
+
public void printStats(long deltaTime)
{
- if(!done)
+ if(!done && started)
{
float rate = 1000f * keyCount.getAndSet(0) / deltaTime;
Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(fileName,
- (fileLength - bytesRead), rate);
+ (fileLength - bytesRead), rate);
logError(message);
}
}
+
public void incrementKeyCount()
{
keyCount.incrementAndGet();
}
+
public String getFileName()
{
return fileName;
}
+
+
+ public int getLimit()
+ {
+ return limit;
+ }
}
/**
- * The rebuild manager handles all rebuild index related tasks.
+ * The rebuild index manager handles all rebuild index related processing.
*/
- class RebuildManager extends ImportTask {
+ class RebuildIndexManager extends ImportTask {
+ //Rebuild index configuration.
private final RebuildConfig rebuildConfig;
+
+ //Local DB backend configuration.
private final LocalDBBackendCfg cfg;
+
+ //Map of index keys to indexes.
private final Map<IndexKey, Index> indexMap =
new LinkedHashMap<IndexKey, Index>();
+
+ //Map of index keys to extensible indexes.
private final Map<IndexKey, Collection<Index>> extensibleIndexMap =
new LinkedHashMap<IndexKey, Collection<Index>>();
+
+ //List of VLV indexes.
private final List<VLVIndex> vlvIndexes = new LinkedList<VLVIndex>();
+
+ //The DN2ID index.
private DN2ID dn2id = null;
+
+ //The DN2URI index.
private DN2URI dn2uri = null;
+
+ //Total entries to be processed.
+  private long totalEntries = 0;
+
+ //Total entries processed.
private final AtomicLong entriesProcessed = new AtomicLong(0);
+
+ //The suffix instance.
private Suffix suffix = null;
+
+ //Set to true if the rebuild all flag was specified.
private final boolean rebuildAll;
- private EntryContainer ec;
+
+ //The entry container.
+ private EntryContainer entryContainer;
/**
@@ -2702,23 +3039,26 @@
* @param rebuildConfig The rebuild configuration to use.
* @param cfg The local DB configuration to use.
*/
- public RebuildManager(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg)
+ public RebuildIndexManager(RebuildConfig rebuildConfig,
+ LocalDBBackendCfg cfg)
{
this.rebuildConfig = rebuildConfig;
this.cfg = cfg;
- this.rebuildAll = rebuildConfig.isRebuildAll();
+ rebuildAll = rebuildConfig.isRebuildAll();
}
+
/**
- * Initialize a rebuild manager to start rebuilding indexes.
+ * Initialize a rebuild index manager.
*
   * @throws ConfigException If a configuration error occurred.
* @throws InitializationException If an initialization error occurred.
*/
public void initialize() throws ConfigException, InitializationException
{
- ec = rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
- suffix = Suffix.createSuffixContext(ec, null, null, null);
+ entryContainer =
+ rootContainer.getEntryContainer(rebuildConfig.getBaseDN());
+ suffix = Suffix.createSuffixContext(entryContainer, null, null, null);
if(suffix == null)
{
Message msg = ERR_JEB_REBUILD_SUFFIX_ERROR.get(rebuildConfig.
@@ -2727,6 +3067,7 @@
}
}
+
/**
* Print start message.
*
@@ -2752,13 +3093,14 @@
logError(message);
}
+
/**
* Print stop message.
*
* @param startTime The time the rebuild started.
*/
public void printStopMessage(long startTime)
- {
+ {
long finishTime = System.currentTimeMillis();
long totalTime = (finishTime - startTime);
float rate = 0;
@@ -2767,10 +3109,10 @@
rate = 1000f* entriesProcessed.get() / totalTime;
}
Message message =
- NOTE_JEB_REBUILD_FINAL_STATUS.get(entriesProcessed.get(),
- totalTime/1000, rate);
- logError(message);
- }
+ NOTE_JEB_REBUILD_FINAL_STATUS.get(entriesProcessed.get(),
+ totalTime/1000, rate);
+ logError(message);
+ }
/**
@@ -2778,34 +3120,45 @@
*/
public Void call() throws Exception
{
- ID2Entry id2entry = ec.getID2Entry();
+ ID2Entry id2entry = entryContainer.getID2Entry();
Cursor cursor = id2entry.openCursor(null, CursorConfig.READ_COMMITTED);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
LockMode lockMode = LockMode.DEFAULT;
OperationStatus status;
try {
- for (status = cursor.getFirst(key, data, lockMode);
- status == OperationStatus.SUCCESS;
- status = cursor.getNext(key, data, lockMode))
- {
- EntryID entryID = new EntryID(key);
- Entry entry = ID2Entry.entryFromDatabase(
- ByteString.wrap(data.getData()),
- ec.getRootContainer().getCompressedSchema());
- processEntry(entry, entryID);
- entriesProcessed.getAndIncrement();
+ for (status = cursor.getFirst(key, data, lockMode);
+ status == OperationStatus.SUCCESS;
+ status = cursor.getNext(key, data, lockMode))
+ {
+ if(isPhaseOneCanceled)
+ {
+ return null;
+ }
+ EntryID entryID = new EntryID(key);
+ Entry entry = ID2Entry.entryFromDatabase(
+ ByteString.wrap(data.getData()),
+ entryContainer.getRootContainer().getCompressedSchema());
+ processEntry(entry, entryID);
+ entriesProcessed.getAndIncrement();
+ }
+ flushIndexBuffers();
+ cursor.close();
}
- flushIndexBuffers();
- cursor.close();
- } catch (Exception e) {
- e.printStackTrace();
+ catch (Exception e)
+ {
+ Message message =
+ ERR_JEB_IMPORT_LDIF_REBUILD_INDEX_TASK_ERR.get(e.getMessage());
+ logError(message);
+ isPhaseOneCanceled = true;
+ throw e;
}
return null;
- }
+ }
+
/**
- * Perform the index rebuild.
+ * Perform rebuild index processing.
*
   * @throws DatabaseException If a database error occurred.
   * @throws InterruptedException If the rebuild is interrupted.
@@ -2813,19 +3166,24 @@
   * @throws JebException If a JEB error occurred.
*/
public void rebuldIndexes() throws DatabaseException, InterruptedException,
- ExecutionException, JebException
- {
- processPhaseOne();
- processPhaseTwo();
- if(rebuildAll)
- {
- setAllIndexesTrusted();
- }
- else
- {
- setRebuildListIndexesTrusted();
- }
- }
+ ExecutionException, JebException
+ {
+ phaseOne();
+ if(isPhaseOneCanceled)
+ {
+ throw new InterruptedException("Rebuild Index canceled.");
+ }
+ phaseTwo();
+ if(rebuildAll)
+ {
+ setAllIndexesTrusted();
+ }
+ else
+ {
+ setRebuildListIndexesTrusted();
+ }
+ }
+
private void setRebuildListIndexesTrusted() throws JebException
{
@@ -2877,693 +3235,759 @@
}
}
+
private void setAllIndexesTrusted() throws JebException
- {
- try {
- suffix.setIndexesTrusted();
- }
- catch (DatabaseException ex)
- {
- Message message =
- NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
- throw new JebException(message);
- }
- }
-
- private void processPhaseOne() throws DatabaseException,
- InterruptedException, ExecutionException {
- if(rebuildAll)
- {
- clearAllIndexes();
- }
- else
- {
- clearRebuildListIndexes();
- }
- initializeIndexBuffers(threadCount);
- RBFirstPhaseProgressTask progressTask = new RBFirstPhaseProgressTask();
- Timer timer = new Timer();
- timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
- indexProcessService = Executors.newFixedThreadPool(2 * indexCount);
- sortService = Executors.newFixedThreadPool(threadCount);
- ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
- for (int i = 0; i < threadCount; i++)
- {
- tasks.add(this);
- }
- List<Future<Void>> results = execService.invokeAll(tasks);
- for (Future<Void> result : results) {
- if(!result.isDone()) {
- result.get();
- }
- }
- stopIndexWriterTasks();
- for (Future<?> result : indexWriterFutures)
- {
- if(!result.isDone()) {
- result.get();
- }
- }
- tasks.clear();
- results.clear();
- execService.shutdown();
- freeBufferQueue.clear();
- sortService.shutdown();
- timer.cancel();
- }
-
-
- private void processPhaseTwo() throws InterruptedException
- {
- SecondPhaseProgressTask progress2Task =
- new SecondPhaseProgressTask(indexMgrList, entriesProcessed.get());
- Timer timer2 = new Timer();
- timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
- processIndexFiles();
- timer2.cancel();
- }
-
- private int getIndexCount() throws ConfigException, JebException
- {
- int indexCount;
- if(!rebuildAll)
{
- indexCount = getRebuildListIndexCount(cfg);
+ try {
+ suffix.setIndexesTrusted();
+ }
+ catch (DatabaseException ex)
+ {
+ Message message =
+ NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
+ throw new JebException(message);
+ }
}
- else
+
+
+ private void phaseOne() throws DatabaseException,
+ InterruptedException, ExecutionException {
+ if(rebuildAll)
+ {
+ clearAllIndexes();
+ }
+ else
+ {
+ clearRebuildListIndexes();
+ }
+ initializeIndexBuffers();
+ RebuildFirstPhaseProgressTask progressTask =
+ new RebuildFirstPhaseProgressTask();
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
+ scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
+ bufferSortService = Executors.newFixedThreadPool(threadCount);
+ ExecutorService rebuildIndexService =
+ Executors.newFixedThreadPool(threadCount);
+ List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(this);
+ }
+ List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
+ for (Future<Void> result : results) {
+ if(!result.isDone()) {
+ result.get();
+ }
+ }
+ stopScratchFileWriters();
+ for (Future<?> result : scratchFileWriterFutures)
+ {
+ if(!result.isDone()) {
+ result.get();
+ }
+ }
+ //Try to clear as much memory as possible.
+ tasks.clear();
+ results.clear();
+ rebuildIndexService.shutdown();
+ freeBufferQueue.clear();
+ bufferSortService.shutdown();
+ timer.cancel();
+ }
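
phaseOne() relies on the invokeAll/Future pattern: invokeAll only returns once every submitted task has completed, so each returned Future is already done; the isDone() guard therefore skips get(), and task failures surface through the isPhaseOneCanceled flag instead. The canonical form of the pattern calls get() unconditionally so that a task's exception is rethrown as ExecutionException, as in this self-contained sketch (pool size and task body illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllSketch {
  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
    for (int i = 0; i < 4; i++) {
      tasks.add(new Callable<Void>() {
        public Void call() throws Exception {
          //...per-thread work goes here...
          return null;
        }
      });
    }
    List<Future<Void>> results = pool.invokeAll(tasks);  //waits for all
    for (Future<Void> result : results) {
      result.get();  //rethrows a task failure as ExecutionException
    }
    pool.shutdown();
  }
}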
+
+
+ private void phaseTwo() throws InterruptedException, JebException,
+ ExecutionException
{
- indexCount = getAllIndexesCount(cfg);
+ SecondPhaseProgressTask progressTask =
+ new SecondPhaseProgressTask(entriesProcessed.get());
+ Timer timer2 = new Timer();
+ timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
+ processIndexFiles();
+ timer2.cancel();
}
- return indexCount;
- }
-
- private int getAllIndexesCount(LocalDBBackendCfg cfg)
- {
- int indexCount = cfg.listLocalDBIndexes().length;
- indexCount += cfg.listLocalDBVLVIndexes().length;
- indexCount += 4;
- return indexCount;
- }
-
- private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
- throws JebException, ConfigException
- {
- int indexCount = 0;
- List<String> rebuildList = rebuildConfig.getRebuildList();
- if(!rebuildList.isEmpty())
- {
- for (String index : rebuildList)
- {
- String lowerName = index.toLowerCase();
- if (lowerName.equals("dn2id"))
- {
- indexCount += 3;
- }
- else if (lowerName.equals("dn2uri"))
- {
- indexCount++;
- }
- else if (lowerName.startsWith("vlv."))
- {
- if(lowerName.length() < 5)
- {
- Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
- throw new JebException(msg);
- }
- indexCount++;
- } else if(lowerName.equals("id2subtree") ||
- lowerName.equals("id2children"))
- {
- Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new JebException(msg);
- }
- else
- {
- String[] attrIndexParts = lowerName.split("\\.");
- if((attrIndexParts.length <= 0) || (attrIndexParts.length > 3))
- {
- Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new JebException(msg);
- }
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrIndexParts[0]);
- if (attrType == null)
- {
- Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new JebException(msg);
- }
- if(attrIndexParts.length != 1)
- {
- if(attrIndexParts.length == 2)
- {
- if(attrIndexParts[1].equals("presence"))
- {
- indexCount++;
- }
- else if(attrIndexParts[1].equals("equality"))
- {
- indexCount++;
- }
- else if(attrIndexParts[1].equals("substring"))
- {
- indexCount++;
- }
- else if(attrIndexParts[1].equals("ordering"))
- {
- indexCount++;
- }
- else if(attrIndexParts[1].equals("approximate"))
- {
- indexCount++;
- } else {
- Message msg =
- ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new JebException(msg);
- }
- }
- else
- {
- boolean found = false;
- String s = attrIndexParts[1] + "." + attrIndexParts[2];
- for (String idx : cfg.listLocalDBIndexes())
- {
- LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
- if (indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
- {
- Set<String> extensibleRules =
- indexCfg.getIndexExtensibleMatchingRule();
- for(String exRule : extensibleRules)
- {
- if(exRule.equalsIgnoreCase(s))
- {
- found = true;
- break;
- }
- }
- }
- if(found)
- {
- break;
- }
- }
- if(!found) {
- Message msg =
- ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
- throw new JebException(msg);
- }
- indexCount++;
- }
- }
- else
- {
- for (String idx : cfg.listLocalDBIndexes())
- {
- if(!idx.equalsIgnoreCase(index))
- {
- continue;
- }
- LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
- if(indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.EQUALITY))
- {
- indexCount++;
- }
- if(indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.ORDERING))
- {
- indexCount++;
- }
- if(indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.PRESENCE))
- {
- indexCount++;
- }
- if(indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.SUBSTRING))
- {
- indexCount++;
- }
- if(indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.APPROXIMATE))
- {
- indexCount++;
- }
- if (indexCfg.getIndexType().
- contains(LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
- {
- Set<String> extensibleRules =
- indexCfg.getIndexExtensibleMatchingRule();
- boolean shared = false;
- for(String exRule : extensibleRules)
- {
- if(exRule.endsWith(".sub"))
- {
- indexCount++;
- }
- else
- {
- if(!shared)
- {
- shared=true;
- indexCount++;
- }
- }
- }
- }
- }
- }
- }
- }
- }
- return indexCount;
- }
-
- private void clearRebuildListIndexes() throws DatabaseException
- {
- List<String> rebuildList = rebuildConfig.getRebuildList();
- if(!rebuildList.isEmpty())
- {
- for (String index : rebuildList)
- {
- String lowerName = index.toLowerCase();
- if (lowerName.equals("dn2id"))
- {
- clearDN2IDIndexes(ec);
- }
- else if (lowerName.equals("dn2uri"))
- {
- clearDN2URI(ec);
- }
- else if (lowerName.startsWith("vlv."))
- {
- clearVLVIndex(lowerName.substring(4), ec);
- }
- else
- {
- String[] attrIndexParts = lowerName.split("\\.");
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrIndexParts[0]);
- AttributeIndex attrIndex = ec.getAttributeIndex(attrType);
-
- if(attrIndexParts.length != 1)
- {
- Index partialAttrIndex;
- if(attrIndexParts[1].equals("presence"))
- {
- partialAttrIndex = attrIndex.getPresenceIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.PRESENCE);
- indexMap.put(indexKey, partialAttrIndex);
- }
- else if(attrIndexParts[1].equals("equality"))
- {
- partialAttrIndex = attrIndex.getEqualityIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.EQUALITY);
- indexMap.put(indexKey, partialAttrIndex);
- }
- else if(attrIndexParts[1].equals("substring"))
- {
- partialAttrIndex = attrIndex.getSubstringIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.SUBSTRING);
- indexMap.put(indexKey, partialAttrIndex);
- }
- else if(attrIndexParts[1].equals("ordering"))
- {
- partialAttrIndex = attrIndex.getOrderingIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.ORDERING);
- indexMap.put(indexKey, partialAttrIndex);
- }
- else if(attrIndexParts[1].equals("approximate"))
- {
- partialAttrIndex = attrIndex.getApproximateIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.APPROXIMATE);
- indexMap.put(indexKey, partialAttrIndex);
- }
- else
- {
- String dbPart = "shared";
- if(attrIndexParts[2].startsWith("sub"))
- {
- dbPart = "substring";
- }
- StringBuilder nameBldr = new StringBuilder();
- nameBldr.append(ec.getDatabasePrefix());
- nameBldr.append("_");
- nameBldr.append(attrIndexParts[0]);
- nameBldr.append(".");
- nameBldr.append(attrIndexParts[1]);
- nameBldr.append(".");
- nameBldr.append(dbPart);
- String indexName = nameBldr.toString();
- Map<String,Collection<Index>> extensibleMap =
- attrIndex.getExtensibleIndexes();
- if(!extensibleMap.isEmpty()) {
- Collection<Index> subIndexes =
- attrIndex.getExtensibleIndexes().get(
- EXTENSIBLE_INDEXER_ID_SUBSTRING);
- if(subIndexes != null) {
- for(Index subIndex : subIndexes) {
- String name = subIndex.getName();
- if(name.equalsIgnoreCase(indexName))
- {
- ec.clearDatabase(subIndex);
- Collection<Index> substring = new ArrayList<Index>();
- substring.add(subIndex);
- extensibleIndexMap.put(new IndexKey(attrType,
- ImportIndexType.EX_SUBSTRING),substring);
- break;
- }
- }
- Collection<Index> sharedIndexes = attrIndex.
- getExtensibleIndexes().get(EXTENSIBLE_INDEXER_ID_SHARED);
- if(sharedIndexes !=null) {
- for(Index sharedIndex : sharedIndexes) {
- String name = sharedIndex.getName();
- if(name.equalsIgnoreCase(indexName))
- {
- ec.clearDatabase(sharedIndex);
- Collection<Index> shared = new ArrayList<Index>();
- shared.add(sharedIndex);
- extensibleIndexMap.put(new IndexKey(attrType,
- ImportIndexType.EX_SHARED), shared);
- break;
- }
- }
- }
- }
- }
- }
- }
- else
- {
- clearAttributeIndexes(attrIndex, attrType, ec);
- }
- }
- }
- }
- }
- private void clearAllIndexes() throws DatabaseException
- {
- for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
- suffix.getAttrIndexMap().entrySet()) {
- AttributeType attributeType = mapEntry.getKey();
- AttributeIndex attributeIndex = mapEntry.getValue();
- clearAttributeIndexes(attributeIndex, attributeType, ec);
- }
- for(VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes()) {
- ec.clearDatabase(vlvIndex);
- }
- clearDN2IDIndexes(ec);
- if(ec.getDN2URI() != null)
- {
- clearDN2URI(ec);
- }
- }
-
- private void clearVLVIndex(String name, EntryContainer ec)
- throws DatabaseException
- {
- VLVIndex vlvIndex = ec.getVLVIndex(name);
- ec.clearDatabase(vlvIndex);
- vlvIndexes.add(vlvIndex);
- }
-
- private void clearDN2URI(EntryContainer ec) throws DatabaseException
- {
- ec.clearDatabase(ec.getDN2URI());
- dn2uri = ec.getDN2URI();
- }
-
- private void clearDN2IDIndexes(EntryContainer ec) throws DatabaseException
- {
- ec.clearDatabase(ec.getDN2ID());
- ec.clearDatabase(ec.getID2Children());
- ec.clearDatabase(ec.getID2Subtree());
- dn2id = ec.getDN2ID();
- }
-
- private void clearAttributeIndexes(AttributeIndex attrIndex,
- AttributeType attrType, EntryContainer ec)
- throws DatabaseException
- {
- Index partialAttrIndex;
- if(attrIndex.getSubstringIndex() != null)
- {
- partialAttrIndex = attrIndex.getSubstringIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.SUBSTRING);
- indexMap.put(indexKey, partialAttrIndex);
- }
- if(attrIndex.getOrderingIndex() != null)
- {
- partialAttrIndex = attrIndex.getOrderingIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.ORDERING);
- indexMap.put(indexKey, partialAttrIndex);
- }
- if(attrIndex.getEqualityIndex() != null)
- {
- partialAttrIndex = attrIndex.getEqualityIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.EQUALITY);
- indexMap.put(indexKey, partialAttrIndex);
- }
- if(attrIndex.getPresenceIndex() != null)
- {
- partialAttrIndex = attrIndex.getPresenceIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.PRESENCE);
- indexMap.put(indexKey, partialAttrIndex);
-
- }
- if(attrIndex.getApproximateIndex() != null)
- {
- partialAttrIndex = attrIndex.getApproximateIndex();
- ec.clearDatabase(partialAttrIndex);
- IndexKey indexKey =
- new IndexKey(attrType, ImportIndexType.APPROXIMATE);
- indexMap.put(indexKey, partialAttrIndex);
- }
- Map<String,Collection<Index>> extensibleMap =
- attrIndex.getExtensibleIndexes();
- if(!extensibleMap.isEmpty()) {
- Collection<Index> subIndexes =
- attrIndex.getExtensibleIndexes().get(
- EXTENSIBLE_INDEXER_ID_SUBSTRING);
- if(subIndexes != null) {
- for(Index subIndex : subIndexes) {
- ec.clearDatabase(subIndex);
- }
- extensibleIndexMap.put(new IndexKey(attrType,
- ImportIndexType.EX_SUBSTRING), subIndexes);
- }
- Collection<Index> sharedIndexes =
- attrIndex.getExtensibleIndexes().get(EXTENSIBLE_INDEXER_ID_SHARED);
- if(sharedIndexes !=null) {
- for(Index sharedIndex : sharedIndexes) {
- ec.clearDatabase(sharedIndex);
- }
- extensibleIndexMap.put(new IndexKey(attrType,
- ImportIndexType.EX_SHARED), sharedIndexes);
- }
- }
- }
+ private int getIndexCount() throws ConfigException, JebException
+ {
+ int indexCount;
+ if(!rebuildAll)
+ {
+ indexCount = getRebuildListIndexCount(cfg);
+ }
+ else
+ {
+ indexCount = getAllIndexesCount(cfg);
+ }
+ return indexCount;
+ }
- private
- void processEntry(Entry entry, EntryID entryID) throws DatabaseException,
- ConfigException, DirectoryException, JebException
- {
- if(dn2id != null)
- {
+ private int getAllIndexesCount(LocalDBBackendCfg cfg)
+ {
+ int indexCount = cfg.listLocalDBIndexes().length;
+ indexCount += cfg.listLocalDBVLVIndexes().length;
+    //Add four for: dn2id, id2subtree, id2children and dn2uri.
+ indexCount += 4;
+ return indexCount;
+ }
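
As a worked example of this count: a backend configured with three attribute indexes and one VLV index yields 3 + 1 + 4 = 8 index slots, the fixed four being dn2id, id2children, id2subtree and dn2uri.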
+
+
+ private int getRebuildListIndexCount(LocalDBBackendCfg cfg)
+ throws JebException, ConfigException
+ {
+ int indexCount = 0;
+ List<String> rebuildList = rebuildConfig.getRebuildList();
+ if(!rebuildList.isEmpty())
+ {
+ for (String index : rebuildList)
+ {
+ String lowerName = index.toLowerCase();
+ if (lowerName.equals("dn2id"))
+ {
+ indexCount += 3;
+ }
+ else if (lowerName.equals("dn2uri"))
+ {
+ indexCount++;
+ }
+ else if (lowerName.startsWith("vlv."))
+ {
+ if(lowerName.length() < 5)
+ {
+ Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName);
+ throw new JebException(msg);
+ }
+ indexCount++;
+ } else if(lowerName.equals("id2subtree") ||
+ lowerName.equals("id2children"))
+ {
+ Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new JebException(msg);
+ }
+ else
+ {
+ String[] attrIndexParts = lowerName.split("\\.");
+ if((attrIndexParts.length <= 0) || (attrIndexParts.length > 3))
+ {
+ Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new JebException(msg);
+ }
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrIndexParts[0]);
+ if (attrType == null)
+ {
+ Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new JebException(msg);
+ }
+ if(attrIndexParts.length != 1)
+ {
+ if(attrIndexParts.length == 2)
+ {
+ if(attrIndexParts[1].equals("presence"))
+ {
+ indexCount++;
+ }
+ else if(attrIndexParts[1].equals("equality"))
+ {
+ indexCount++;
+ }
+ else if(attrIndexParts[1].equals("substring"))
+ {
+ indexCount++;
+ }
+ else if(attrIndexParts[1].equals("ordering"))
+ {
+ indexCount++;
+ }
+ else if(attrIndexParts[1].equals("approximate"))
+ {
+ indexCount++;
+ } else {
+ Message msg =
+ ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new JebException(msg);
+ }
+ }
+ else
+ {
+ boolean found = false;
+ String s = attrIndexParts[1] + "." + attrIndexParts[2];
+ for (String idx : cfg.listLocalDBIndexes())
+ {
+ LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
+ if (indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
+ {
+ Set<String> extensibleRules =
+ indexCfg.getIndexExtensibleMatchingRule();
+ for(String exRule : extensibleRules)
+ {
+ if(exRule.equalsIgnoreCase(s))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ if(found)
+ {
+ break;
+ }
+ }
+ if(!found) {
+ Message msg =
+ ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
+ throw new JebException(msg);
+ }
+ indexCount++;
+ }
+ }
+ else
+ {
+ for (String idx : cfg.listLocalDBIndexes())
+ {
+ if(!idx.equalsIgnoreCase(index))
+ {
+ continue;
+ }
+ LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
+ if(indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.EQUALITY))
+ {
+ indexCount++;
+ }
+ if(indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.ORDERING))
+ {
+ indexCount++;
+ }
+ if(indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.PRESENCE))
+ {
+ indexCount++;
+ }
+ if(indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.SUBSTRING))
+ {
+ indexCount++;
+ }
+ if(indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.APPROXIMATE))
+ {
+ indexCount++;
+ }
+ if (indexCfg.getIndexType().
+ contains(LocalDBIndexCfgDefn.IndexType.EXTENSIBLE))
+ {
+ Set<String> extensibleRules =
+ indexCfg.getIndexExtensibleMatchingRule();
+ boolean shared = false;
+ for(String exRule : extensibleRules)
+ {
+ if(exRule.endsWith(".sub"))
+ {
+ indexCount++;
+ }
+ else
+ {
+ if(!shared)
+ {
+ shared=true;
+ indexCount++;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return indexCount;
+ }
+
+
+ private void clearRebuildListIndexes() throws DatabaseException
+ {
+ List<String> rebuildList = rebuildConfig.getRebuildList();
+ if(!rebuildList.isEmpty())
+ {
+ for (String index : rebuildList)
+ {
+ String lowerName = index.toLowerCase();
+ if (lowerName.equals("dn2id"))
+ {
+ clearDN2IDIndexes();
+ }
+ else if (lowerName.equals("dn2uri"))
+ {
+ clearDN2URI();
+ }
+ else if (lowerName.startsWith("vlv."))
+ {
+ clearVLVIndex(lowerName.substring(4));
+ }
+ else
+ {
+ String[] attrIndexParts = lowerName.split("\\.");
+ AttributeType attrType =
+ DirectoryServer.getAttributeType(attrIndexParts[0]);
+ AttributeIndex attrIndex =
+ entryContainer.getAttributeIndex(attrType);
+ if(attrIndexParts.length != 1)
+ {
+ Index partialAttrIndex;
+ if(attrIndexParts[1].equals("presence"))
+ {
+ partialAttrIndex = attrIndex.getPresenceIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.PRESENCE,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ else if(attrIndexParts[1].equals("equality"))
+ {
+ partialAttrIndex = attrIndex.getEqualityIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.EQUALITY,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ else if(attrIndexParts[1].equals("substring"))
+ {
+ partialAttrIndex = attrIndex.getSubstringIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.SUBSTRING,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ else if(attrIndexParts[1].equals("ordering"))
+ {
+ partialAttrIndex = attrIndex.getOrderingIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.ORDERING,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ else if(attrIndexParts[1].equals("approximate"))
+ {
+ partialAttrIndex = attrIndex.getApproximateIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.APPROXIMATE,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ else
+ {
+ String dbPart = "shared";
+ if(attrIndexParts[2].startsWith("sub"))
+ {
+ dbPart = "substring";
+ }
+ StringBuilder nameBldr = new StringBuilder();
+ nameBldr.append(entryContainer.getDatabasePrefix());
+ nameBldr.append("_");
+ nameBldr.append(attrIndexParts[0]);
+ nameBldr.append(".");
+ nameBldr.append(attrIndexParts[1]);
+ nameBldr.append(".");
+ nameBldr.append(dbPart);
+ String indexName = nameBldr.toString();
+ Map<String,Collection<Index>> extensibleMap =
+ attrIndex.getExtensibleIndexes();
+ if(!extensibleMap.isEmpty()) {
+ Collection<Index> subIndexes =
+ attrIndex.getExtensibleIndexes().get(
+ EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ if(subIndexes != null) {
+ for(Index subIndex : subIndexes) {
+ String name = subIndex.getName();
+ if(name.equalsIgnoreCase(indexName))
+ {
+ entryContainer.clearDatabase(subIndex);
+ int id = System.identityHashCode(subIndex);
+ idContainerMap.putIfAbsent(id, subIndex);
+ Collection<Index> substring = new ArrayList<Index>();
+ substring.add(subIndex);
+ extensibleIndexMap.put(new IndexKey(attrType,
+ ImportIndexType.EX_SUBSTRING, 0),substring);
+ break;
+ }
+ }
+ Collection<Index> sharedIndexes =
+ attrIndex.getExtensibleIndexes().
+ get(EXTENSIBLE_INDEXER_ID_SHARED);
+ if(sharedIndexes !=null) {
+ for(Index sharedIndex : sharedIndexes) {
+ String name = sharedIndex.getName();
+ if(name.equalsIgnoreCase(indexName))
+ {
+ entryContainer.clearDatabase(sharedIndex);
+ Collection<Index> shared = new ArrayList<Index>();
+ int id = System.identityHashCode(sharedIndex);
+ idContainerMap.putIfAbsent(id, sharedIndex);
+ shared.add(sharedIndex);
+ extensibleIndexMap.put(new IndexKey(attrType,
+ ImportIndexType.EX_SHARED, 0), shared);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ clearAttributeIndexes(attrIndex, attrType);
+ }
+ }
+ }
+ }
+ }
+
+
+ private void clearAllIndexes() throws DatabaseException
+ {
+ for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
+ suffix.getAttrIndexMap().entrySet()) {
+ AttributeType attributeType = mapEntry.getKey();
+ AttributeIndex attributeIndex = mapEntry.getValue();
+ clearAttributeIndexes(attributeIndex, attributeType);
+ }
+ for(VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes()) {
+ entryContainer.clearDatabase(vlvIndex);
+ }
+ clearDN2IDIndexes();
+ if(entryContainer.getDN2URI() != null)
+ {
+ clearDN2URI();
+ }
+ }
+
+
+ private void clearVLVIndex(String name)
+ throws DatabaseException
+ {
+ VLVIndex vlvIndex = entryContainer.getVLVIndex(name);
+ entryContainer.clearDatabase(vlvIndex);
+ vlvIndexes.add(vlvIndex);
+ }
+
+
+ private void clearDN2URI() throws DatabaseException
+ {
+ entryContainer.clearDatabase(entryContainer.getDN2URI());
+ dn2uri = entryContainer.getDN2URI();
+ }
+
+
+ private void clearDN2IDIndexes() throws DatabaseException
+ {
+ entryContainer.clearDatabase(entryContainer.getDN2ID());
+ entryContainer.clearDatabase(entryContainer.getID2Children());
+ entryContainer.clearDatabase(entryContainer.getID2Subtree());
+ dn2id = entryContainer.getDN2ID();
+ }
+
+
+ private void clearAttributeIndexes(AttributeIndex attrIndex,
+ AttributeType attrType)
+ throws DatabaseException
+ {
+ Index partialAttrIndex;
+ if(attrIndex.getSubstringIndex() != null)
+ {
+ partialAttrIndex = attrIndex.getSubstringIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.SUBSTRING,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ if(attrIndex.getOrderingIndex() != null)
+ {
+ partialAttrIndex = attrIndex.getOrderingIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.ORDERING,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ if(attrIndex.getEqualityIndex() != null)
+ {
+ partialAttrIndex = attrIndex.getEqualityIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.EQUALITY,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ if(attrIndex.getPresenceIndex() != null)
+ {
+ partialAttrIndex = attrIndex.getPresenceIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.PRESENCE,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+
+ }
+ if(attrIndex.getApproximateIndex() != null)
+ {
+ partialAttrIndex = attrIndex.getApproximateIndex();
+ int id = System.identityHashCode(partialAttrIndex);
+ idContainerMap.putIfAbsent(id, partialAttrIndex);
+ entryContainer.clearDatabase(partialAttrIndex);
+ IndexKey indexKey =
+ new IndexKey(attrType, ImportIndexType.APPROXIMATE,
+ partialAttrIndex.getIndexEntryLimit());
+ indexMap.put(indexKey, partialAttrIndex);
+ }
+ Map<String,Collection<Index>> extensibleMap =
+ attrIndex.getExtensibleIndexes();
+ if(!extensibleMap.isEmpty()) {
+ Collection<Index> subIndexes =
+ attrIndex.getExtensibleIndexes().get(
+ EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ if(subIndexes != null) {
+ for(Index subIndex : subIndexes) {
+ entryContainer.clearDatabase(subIndex);
+ int id = System.identityHashCode(subIndex);
+ idContainerMap.putIfAbsent(id, subIndex);
+ }
+ extensibleIndexMap.put(new IndexKey(attrType,
+ ImportIndexType.EX_SUBSTRING, 0), subIndexes);
+ }
+ Collection<Index> sharedIndexes =
+ attrIndex.getExtensibleIndexes().
+ get(EXTENSIBLE_INDEXER_ID_SHARED);
+ if(sharedIndexes !=null) {
+ for(Index sharedIndex : sharedIndexes) {
+ entryContainer.clearDatabase(sharedIndex);
+ int id = System.identityHashCode(sharedIndex);
+ idContainerMap.putIfAbsent(id, sharedIndex);
+ }
+ extensibleIndexMap.put(new IndexKey(attrType,
+ ImportIndexType.EX_SHARED, 0), sharedIndexes);
+ }
+ }
+ }
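
clearAttributeIndexes() registers each cleared index in idContainerMap keyed by System.identityHashCode, i.e. by object identity rather than by equals(). A minimal sketch of the idiom (the map type here is assumed to be a ConcurrentHashMap<Integer, Object>):

import java.util.concurrent.ConcurrentHashMap;

final class IdentityKeySketch {
  public static void main(String[] args) {
    ConcurrentHashMap<Integer, Object> idContainerMap =
        new ConcurrentHashMap<Integer, Object>();
    Object index = new Object();              //stand-in for an Index
    int id = System.identityHashCode(index);  //identity-based key
    idContainerMap.putIfAbsent(id, index);    //first registration wins
    System.out.println(idContainerMap.get(id) == index);  //true
  }
}

Note that System.identityHashCode is not guaranteed collision-free, so this scheme relies on collisions being vanishingly rare among the handful of live Index objects.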
+
+
+ private
+ void processEntry(Entry entry, EntryID entryID) throws DatabaseException,
+ ConfigException, DirectoryException, JebException,
+ InterruptedException
+ {
+ if(dn2id != null)
+ {
processDN2ID(suffix, entry.getDN(), entryID);
- }
- if(dn2uri != null)
- {
+ }
+ if(dn2uri != null)
+ {
processDN2URI(suffix, null, entry);
- }
- processIndexes(entry, entryID);
- processExtensibleIndexes(entry, entryID);
- processVLVIndexes(entry, entryID);
- }
-
- private void processVLVIndexes(Entry entry, EntryID entryID)
- throws DatabaseException, JebException, DirectoryException
- {
- for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
- Transaction transaction = null;
- vlvIdx.addEntry(transaction, entryID, entry);
- }
- }
+ }
+ processIndexes(entry, entryID);
+ processExtensibleIndexes(entry, entryID);
+ processVLVIndexes(entry, entryID);
+ }
- private void processExtensibleIndexes(Entry entry, EntryID entryID) throws
- DatabaseException, DirectoryException, JebException, ConfigException
- {
- for(Map.Entry<IndexKey, Collection<Index>> mapEntry :
- this.extensibleIndexMap.entrySet()) {
- IndexKey key = mapEntry.getKey();
- AttributeType attrType = key.getType();
- if(entry.hasAttribute(attrType)) {
- Collection<Index> indexes = mapEntry.getValue();
- for(Index index : indexes) {
+ private void processVLVIndexes(Entry entry, EntryID entryID)
+ throws DatabaseException, JebException, DirectoryException
+ {
+ for(VLVIndex vlvIdx : suffix.getEntryContainer().getVLVIndexes()) {
+ Transaction transaction = null;
+ vlvIdx.addEntry(transaction, entryID, entry);
+ }
+ }
+
+
+ private
+ void processExtensibleIndexes(Entry entry, EntryID entryID) throws
+ DatabaseException, DirectoryException, JebException,
+ ConfigException, InterruptedException
+ {
+ for(Map.Entry<IndexKey, Collection<Index>> mapEntry :
+ this.extensibleIndexMap.entrySet()) {
+ IndexKey key = mapEntry.getKey();
+ AttributeType attrType = key.getAttributeType();
+ if(entry.hasAttribute(attrType)) {
+ Collection<Index> indexes = mapEntry.getValue();
+ for(Index index : indexes) {
processAttribute(index, entry, entryID, key);
- }
- }
- }
- }
+ }
+ }
+ }
+ }
- private void
- processIndexes(Entry entry, EntryID entryID) throws
- DatabaseException, DirectoryException, JebException, ConfigException
- {
- for(Map.Entry<IndexKey, Index> mapEntry :
- indexMap.entrySet()) {
- IndexKey key = mapEntry.getKey();
- AttributeType attrType = key.getType();
- if(entry.hasAttribute(attrType)) {
- ImportIndexType indexType = key.getIndexType();
- Index index = mapEntry.getValue();
- if(indexType == ImportIndexType.SUBSTRING)
- {
- processAttribute(index, entry, entryID,
- new IndexKey(attrType, ImportIndexType.SUBSTRING));
- }
- else
- {
+ private void
+ processIndexes(Entry entry, EntryID entryID) throws
+ DatabaseException, DirectoryException, JebException,
+ ConfigException, InterruptedException
+ {
+ for(Map.Entry<IndexKey, Index> mapEntry :
+ indexMap.entrySet()) {
+ IndexKey key = mapEntry.getKey();
+ AttributeType attrType = key.getAttributeType();
+ if(entry.hasAttribute(attrType)) {
+ ImportIndexType indexType = key.getIndexType();
+ Index index = mapEntry.getValue();
+ if(indexType == ImportIndexType.SUBSTRING)
+ {
processAttribute(index, entry, entryID,
- new IndexKey(attrType, indexType));
- }
- }
- }
- }
+ new IndexKey(attrType, ImportIndexType.SUBSTRING,
+ index.getIndexEntryLimit()));
+ }
+ else
+ {
+ processAttribute(index, entry, entryID,
+ new IndexKey(attrType, indexType,
+ index.getIndexEntryLimit()));
+ }
+ }
+ }
+ }
- /**
- * Return the number of entries processed by the rebuild manager.
- *
- * @return The number of entries processed.
- */
- public long getEntriesProcess()
- {
- return this.entriesProcessed.get();
- }
- /**
- * Return the total number of entries to process by the rebuild manager.
- *
- * @return The total number for entries to process.
- */
- public long getTotEntries()
- {
- return this.totalEntries;
- }
+ /**
+ * Return the number of entries processed by the rebuild manager.
+ *
+ * @return The number of entries processed.
+ */
+ public long getEntriesProcess()
+ {
+ return this.entriesProcessed.get();
+ }
+
+
+ /**
+ * Return the total number of entries to process by the rebuild manager.
+ *
+   * @return The total number of entries to process.
+ */
+ public long getTotEntries()
+ {
+ return this.totalEntries;
+ }
}
/**
- * This class reports progress of the rebuild job at fixed intervals.
- */
- class RBFirstPhaseProgressTask extends TimerTask
- {
- /**
- * The number of records that had been processed at the time of the
- * previous progress report.
- */
- private long previousProcessed = 0;
-
- /**
- * The time in milliseconds of the previous progress report.
- */
- private long previousTime;
-
- /**
- * The environment statistics at the time of the previous report.
- */
- private EnvironmentStats prevEnvStats;
+ * This class reports progress of rebuild index processing at fixed
+ * intervals.
+ */
+ class RebuildFirstPhaseProgressTask extends TimerTask
+ {
+ /**
+ * The number of records that had been processed at the time of the
+ * previous progress report.
+ */
+ private long previousProcessed = 0;
/**
- * Create a new verify progress task.
- * @throws DatabaseException An error occurred while accessing the JE
- * database.
- */
- public RBFirstPhaseProgressTask() throws DatabaseException
- {
- previousTime = System.currentTimeMillis();
- prevEnvStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- }
+ * The time in milliseconds of the previous progress report.
+ */
+ private long previousTime;
- /**
- * The action to be performed by this timer task.
- */
- public void run()
- {
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
+ /**
+ * The environment statistics at the time of the previous report.
+ */
+ private EnvironmentStats prevEnvStats;
- if (deltaTime == 0)
- {
- return;
- }
- long currentRBProcessed = rebuildManager.getEntriesProcess();
- long deltaCount = (currentRBProcessed - previousProcessed);
- float rate = 1000f*deltaCount / deltaTime;
- float completed = 0;
- if(rebuildManager.getTotEntries() > 0)
- {
- completed = 100f*currentRBProcessed / rebuildManager.getTotEntries();
- }
- Message message = NOTE_JEB_REBUILD_PROGRESS_REPORT.get(
- completed, currentRBProcessed, rebuildManager.getTotEntries(), rate);
- logError(message);
- try
- {
- Runtime runtime = Runtime.getRuntime();
- long freeMemory = runtime.freeMemory() / MB;
- EnvironmentStats envStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- long nCacheMiss =
- envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+ /**
+ * Create a new rebuild index progress task.
+ *
+ * @throws DatabaseException If an error occurred while accessing the JE
+ * database.
+ */
+ public RebuildFirstPhaseProgressTask() throws DatabaseException
+ {
+ previousTime = System.currentTimeMillis();
+ prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
+ }
- float cacheMissRate = 0;
- if (deltaCount > 0)
- {
- cacheMissRate = nCacheMiss/(float)deltaCount;
- }
- message = NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get(
- freeMemory, cacheMissRate);
- logError(message);
- prevEnvStats = envStats;
- }
- catch (DatabaseException e)
- {
+ /**
+ * The action to be performed by this timer task.
+ */
+ public void run()
+ {
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
- }
- previousProcessed = currentRBProcessed;
- previousTime = latestTime;
- }
- }
+ if (deltaTime == 0)
+ {
+ return;
+ }
+ long entriesProcessed = rebuildManager.getEntriesProcess();
+ long deltaCount = (entriesProcessed - previousProcessed);
+ float rate = 1000f*deltaCount / deltaTime;
+ float completed = 0;
+ if(rebuildManager.getTotEntries() > 0)
+ {
+ completed = 100f*entriesProcessed / rebuildManager.getTotEntries();
+ }
+ Message message = NOTE_JEB_REBUILD_PROGRESS_REPORT.get(completed,
+ entriesProcessed, rebuildManager.getTotEntries(), rate);
+ logError(message);
+ try
+ {
+ Runtime runtime = Runtime.getRuntime();
+ long freeMemory = runtime.freeMemory() / MB;
+ EnvironmentStats envStats =
+ rootContainer.getEnvironmentStats(new StatsConfig());
+ long nCacheMiss =
+ envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+
+ float cacheMissRate = 0;
+ if (deltaCount > 0)
+ {
+ cacheMissRate = nCacheMiss/(float)deltaCount;
+ }
+ message = NOTE_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get(
+ freeMemory, cacheMissRate);
+ logError(message);
+ prevEnvStats = envStats;
+ }
+ catch (DatabaseException e)
+ {
+        // Unlikely to happen and not critical.
+ }
+ previousProcessed = entriesProcessed;
+ previousTime = latestTime;
+ }
+ }
/**
- * This class reports progress of the import job at fixed intervals.
+ * This class reports progress of the first phase of import processing at
+ * fixed intervals.
*/
private final class FirstPhaseProgressTask extends TimerTask
{
@@ -3591,7 +4015,7 @@
private long evictionEntryCount = 0;
- /**
+ /**
* Create a new import progress task.
*/
public FirstPhaseProgressTask()
@@ -3610,100 +4034,112 @@
- /**
- * The action to be performed by this timer task.
- */
- @Override
- public void run()
+ /**
+ * The action to be performed by this timer task.
+ */
+ @Override
+ public void run()
+ {
+      long latestCount = reader.getEntriesRead();
+ long deltaCount = (latestCount - previousCount);
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ Message message;
+ if (deltaTime == 0)
{
- long latestCount = reader.getEntriesRead() + 0;
- long deltaCount = (latestCount - previousCount);
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
- Message message;
- if (deltaTime == 0)
- {
- return;
- }
- long entriesRead = reader.getEntriesRead();
- long entriesIgnored = reader.getEntriesIgnored();
- long entriesRejected = reader.getEntriesRejected();
- float rate = 1000f * deltaCount / deltaTime;
- message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(entriesRead,
- entriesIgnored, entriesRejected, 0, rate);
- logError(message);
- try
- {
- Runtime runTime = Runtime.getRuntime();
- long freeMemory = runTime.freeMemory() / MB;
- EnvironmentStats environmentStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- long nCacheMiss = environmentStats.getNCacheMiss() -
- previousStats.getNCacheMiss();
-
- float cacheMissRate = 0;
- if (deltaCount > 0)
- {
- cacheMissRate = nCacheMiss / (float) deltaCount;
- }
- message =
- NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
- cacheMissRate);
- logError(message);
- long evictPasses = environmentStats.getNEvictPasses();
- long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
- long evictBinsStrip = environmentStats.getNBINsStripped();
- long cleanerRuns = environmentStats.getNCleanerRuns();
- long cleanerDeletions = environmentStats.getNCleanerDeletions();
- long cleanerEntriesRead =
- environmentStats.getNCleanerEntriesRead();
- long cleanerINCleaned = environmentStats.getNINsCleaned();
- long checkPoints = environmentStats.getNCheckpoints();
- if (evictPasses != 0)
- {
- if (!evicting)
- {
- evicting = true;
- evictionEntryCount = reader.getEntriesRead();
- message =
- NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED
- .get(evictionEntryCount);
- logError(message);
- }
- message =
- NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
- evictPasses, evictNodes, evictBinsStrip);
- logError(message);
- }
- if (cleanerRuns != 0)
- {
- message =
- NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
- cleanerDeletions, cleanerEntriesRead,
- cleanerINCleaned);
- logError(message);
- }
- if (checkPoints > 1)
- {
- message =
- NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
- logError(message);
- }
- previousStats = environmentStats;
- }
- catch (DatabaseException e)
- {
- // Unlikely to happen and not critical.
- }
- previousCount = latestCount;
- previousTime = latestTime;
+ return;
}
+ long entriesRead = reader.getEntriesRead();
+ long entriesIgnored = reader.getEntriesIgnored();
+ long entriesRejected = reader.getEntriesRejected();
+ float rate = 1000f * deltaCount / deltaTime;
+ message = NOTE_JEB_IMPORT_PROGRESS_REPORT.get(entriesRead,
+ entriesIgnored, entriesRejected, 0, rate);
+ logError(message);
+ try
+ {
+ Runtime runTime = Runtime.getRuntime();
+ long freeMemory = runTime.freeMemory()/MB;
+ EnvironmentStats environmentStats;
+
+        //If skip DN validation is specified for the first phase, use the root
+        //container stats; otherwise use the temporary environment stats.
+ if(skipDNValidation)
+ {
+ environmentStats =
+ rootContainer.getEnvironmentStats(new StatsConfig());
+ }
+ else
+ {
+ environmentStats = tmpEnv.getEnvironmentStats(new StatsConfig());
+ }
+ long nCacheMiss = environmentStats.getNCacheMiss() -
+ previousStats.getNCacheMiss();
+
+ float cacheMissRate = 0;
+ if (deltaCount > 0)
+ {
+ cacheMissRate = nCacheMiss / (float) deltaCount;
+ }
+ message =
+ NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
+ cacheMissRate);
+ logError(message);
+ long evictPasses = environmentStats.getNEvictPasses();
+ long evictNodes = environmentStats.getNNodesExplicitlyEvicted();
+ long evictBinsStrip = environmentStats.getNBINsStripped();
+ long cleanerRuns = environmentStats.getNCleanerRuns();
+ long cleanerDeletions = environmentStats.getNCleanerDeletions();
+ long cleanerEntriesRead =
+ environmentStats.getNCleanerEntriesRead();
+ long cleanerINCleaned = environmentStats.getNINsCleaned();
+ long checkPoints = environmentStats.getNCheckpoints();
+ if (evictPasses != 0)
+ {
+ if (!evicting)
+ {
+ evicting = true;
+ evictionEntryCount = reader.getEntriesRead();
+ message =
+ NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED
+ .get(evictionEntryCount);
+ logError(message);
+ }
+ message =
+ NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
+ evictPasses, evictNodes, evictBinsStrip);
+ logError(message);
+ }
+ if (cleanerRuns != 0)
+ {
+ message =
+ NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
+ cleanerDeletions, cleanerEntriesRead,
+ cleanerINCleaned);
+ logError(message);
+ }
+ if (checkPoints > 1)
+ {
+ message =
+ NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
+ logError(message);
+ }
+ previousStats = environmentStats;
+ }
+ catch (DatabaseException e)
+ {
+ // Unlikely to happen and not critical.
+ }
+ previousCount = latestCount;
+ previousTime = latestTime;
+ }
}
/**
- * This class reports progress of the import job at fixed intervals.
+ * This class reports progress of the second phase of import processing at
+ * fixed intervals.
*/
class SecondPhaseProgressTask extends TimerTask
{
@@ -3726,19 +4162,16 @@
// Determines if eviction has been detected.
private boolean evicting = false;
- private final List<IndexManager> indexMgrList;
private long latestCount;
/**
* Create a new import progress task.
- * @param indexMgrList List of index managers.
+ *
* @param latestCount The latest count of entries processed in phase one.
*/
- public SecondPhaseProgressTask (List<IndexManager> indexMgrList,
- long latestCount)
+ public SecondPhaseProgressTask (long latestCount)
{
previousTime = System.currentTimeMillis();
- this.indexMgrList = indexMgrList;
this.latestCount = latestCount;
try
{
@@ -3826,6 +4259,12 @@
previousCount = latestCount;
previousTime = latestTime;
+ //Do DN index managers first.
+ for(IndexManager indexMgrDN : DNIndexMgrList)
+ {
+ indexMgrDN.printStats(deltaTime);
+ }
+ //Do non-DN index managers.
for(IndexManager indexMgr : indexMgrList)
{
indexMgr.printStats(deltaTime);
@@ -3836,6 +4275,8 @@
/**
* A class to hold information about the entry determined by the LDIF reader.
+ * Mainly, this is the suffix the entry belongs under and the ID assigned to
+ * it by the reader.
*
*/
public class EntryInformation
@@ -3941,56 +4382,62 @@
* process multiple suffix index elements into a single queue and/or maps
* based on both attribute type and index type
   * (i.e., cn.equality, sn.equality, ...).
- *
- * It tries to perform some optimization if the index is a sub-string index.
*/
public class IndexKey {
- private final AttributeType type;
+ private final AttributeType attributeType;
private final ImportIndexType indexType;
+ private final int entryLimit;
/**
- * Create index key instance using the specified attribute type, index type.
+ * Create index key instance using the specified attribute type, index type
+ * and index entry limit.
*
- * @param type The attribute type.
+ * @param attributeType The attribute type.
* @param indexType The index type.
+ * @param entryLimit The entry limit for the index.
*/
- IndexKey(AttributeType type, ImportIndexType indexType)
+ IndexKey(AttributeType attributeType, ImportIndexType indexType,
+ int entryLimit)
{
- this.type = type;
+ this.attributeType = attributeType;
this.indexType = indexType;
+ this.entryLimit = entryLimit;
}
/**
* An equals method that uses both the attribute type and the index type.
+ * Only returns {@code true} if the attribute type and index type are
+ * equal.
*
* @param obj the object to compare.
- * @return <CODE>true</CODE> if the objects are equal.
+ * @return {@code true} if the objects are equal, or {@code false} if they
+ * are not.
*/
public boolean equals(Object obj)
{
- boolean returnCode = false;
if (obj instanceof IndexKey) {
IndexKey oKey = (IndexKey) obj;
- if(type.equals(oKey.getType()) &&
- indexType.equals(oKey.getIndexType()))
+ if(attributeType.equals(oKey.getAttributeType()) &&
+ indexType.equals(oKey.getIndexType()))
{
- returnCode = true;
+ return true;
}
}
- return returnCode;
+ return false;
}
/**
* A hash code method that adds the hash codes of the attribute type and
* index type and returns that value.
*
- * @return The combined hash values.
+ * @return The combined hash values of attribute type hash code and the
+ * index type hash code.
*/
public int hashCode()
{
- return type.hashCode() + indexType.hashCode();
+ return attributeType.hashCode() + indexType.hashCode();
}
/**
@@ -3998,13 +4445,14 @@
*
* @return The attribute type.
*/
- public AttributeType getType()
+ public AttributeType getAttributeType()
{
- return type;
+ return attributeType;
}
/**
* Return the index type.
+ *
* @return The index type.
*/
public ImportIndexType getIndexType()
@@ -4015,14 +4463,286 @@
/**
* Return the index key name, which is the attribute type primary name,
* a period, and the index type name. Used for building file names and
- * output.
+ * progress output.
*
* @return The index key name.
*/
public String getName()
{
- return type.getPrimaryName() + "." +
+ return attributeType.getPrimaryName() + "." +
StaticUtils.toLowerCase(indexType.name());
}
+
+ /**
+ * Return the entry limit associated with the index.
+ *
+ * @return The entry limit.
+ */
+ public int getEntryLimit()
+ {
+ return entryLimit;
+ }
+ }
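
Taken together, equals() and hashCode() let an IndexKey serve as a hash-map key for routing work by attribute type and index type. A minimal sketch of that usage; the map, the attrType variable and the EQUALITY constant are illustrative assumptions, not taken from this patch:

    //Hypothetical sketch: per-index counters keyed on (attribute type,
    //index type). The entry limit is ignored by equals()/hashCode(),
    //so it plays no part in the lookup.
    Map<IndexKey, Integer> bufferCounts = new HashMap<IndexKey, Integer>();
    IndexKey key = new IndexKey(attrType, ImportIndexType.EQUALITY, 4000);
    bufferCounts.put(key, 1);
    Integer count = bufferCounts.get(
        new IndexKey(attrType, ImportIndexType.EQUALITY, 8000)); //still found
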
+
+
+ /**
+   * The temporary environment will be shared when multiple suffixes are being
+   * processed. This interface is used by those suffix instances to do parent
+   * checking against the DN cache.
+ */
+ public static interface DNCache {
+
+ /**
+ * Returns {@code true} if the specified DN is contained in the DN cache,
+ * or {@code false} otherwise.
+ *
+ * @param dn The DN to check the presence of.
+ * @return {@code true} if the cache contains the DN, or {@code false} if it
+   *         does not.
+ * @throws DatabaseException If an error occurs reading the database.
+ */
+ public boolean contains(DN dn) throws DatabaseException;
+ }
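
To illustrate the contract, a trivial heap-backed implementation (a test-only sketch, not part of this patch) might look like:

    //Illustrative only: an in-memory DNCache.
    static class HeapDNCache implements DNCache
    {
      private final Set<DN> dns = new HashSet<DN>();

      public boolean contains(DN dn)
      {
        return dns.contains(dn);
      }
    }
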
+
+ /**
+   * Temporary environment used to check DNs when DN validation is performed
+   * during phase one processing. It is deleted once phase one completes.
+ */
+ public final class TmpEnv implements DNCache
+ {
+ private String envPath;
+ private Environment environment;
+ private static final String DB_NAME = "dn_cache";
+ private Database dnCache;
+
+ /**
+ * Create a temporary DB environment and database to be used as a cache of
+ * DNs when DN validation is performed in phase one processing.
+ *
+     * @param envPath The file path to create the environment under.
+ * @throws DatabaseException If an error occurs either creating the
+ * environment or the DN database.
+ */
+ public TmpEnv(File envPath) throws DatabaseException
+ {
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ envConfig.setReadOnly(false);
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(false);
+ envConfig.setConfigParam(EnvironmentConfig.ENV_IS_LOCKING, "true");
+ envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CHECKPOINTER, "false");
+ envConfig.setConfigParam(EnvironmentConfig.EVICTOR_LRU_ONLY, "false");
+ envConfig.setConfigParam(EnvironmentConfig.EVICTOR_NODES_PER_SCAN, "128");
+ envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY,
+ Long.toString(tmpEnvCacheSize));
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(false);
+ dbConfig.setTemporary(true);
+ environment = new Environment(envPath, envConfig);
+ dnCache = environment.openDatabase(null, DB_NAME, dbConfig);
+ this.envPath = envPath.getPath();
+ }
+
+ private static final long FNV_INIT = 0xcbf29ce484222325L;
+ private static final long FNV_PRIME = 0x100000001b3L;
+
+ //Hash the DN bytes. Uses the FNV-1a hash.
+ private byte[] hashCode(byte[] b)
+ {
+ long hash = FNV_INIT;
+ for (int i = 0; i < b.length; i++)
+ {
+ hash ^= b[i];
+ hash *= FNV_PRIME;
+ }
+ return JebFormat.entryIDToDatabase(hash);
+ }
+
+ /**
+     * Shutdown the temporary environment.
+     *
+     * @throws JebException If an error occurs shutting down the environment.
+ */
+ public void shutdown() throws JebException
+ {
+ dnCache.close();
+ environment.close();
+ EnvManager.removeFiles(envPath);
+ }
+
+ /**
+ * Insert the specified DN into the DN cache. It will return {@code true} if
+ * the DN does not already exist in the cache and was inserted, or
+ * {@code false} if the DN exists already in the cache.
+ *
+ * @param dn The DN to insert in the cache.
+ * @param val A database entry to use in the insert.
+ * @param key A database entry to use in the insert.
+ * @return {@code true} if the DN was inserted in the cache, or
+ * {@code false} if the DN exists in the cache already and could
+ * not be inserted.
+ * @throws JebException If an error occurs accessing the database.
+ */
+ public boolean insert(DN dn, DatabaseEntry val, DatabaseEntry key)
+ throws JebException
+ {
+ byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
+ int len = PackedInteger.getWriteIntLength(dnBytes.length);
+ byte[] dataBytes = new byte[dnBytes.length + len];
+ int pos = PackedInteger.writeInt(dataBytes, 0, dnBytes.length);
+ System.arraycopy(dnBytes, 0, dataBytes, pos, dnBytes.length);
+ val.setData(dataBytes);
+ key.setData(hashCode(dnBytes));
+ return insert(key, val, dnBytes);
+ }
+
+ private boolean insert(DatabaseEntry key, DatabaseEntry val, byte[] dnBytes)
+ throws JebException
+ {
+ boolean inserted = true;
+ Cursor cursor = null;
+ try
+ {
+ cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
+ OperationStatus status = cursor.putNoOverwrite(key, val);
+ if(status == OperationStatus.KEYEXIST)
+ {
+ DatabaseEntry dns = new DatabaseEntry();
+ inserted = false;
+ status = cursor.getSearchKey(key, dns, LockMode.RMW);
+ if(status == OperationStatus.NOTFOUND)
+ {
+ Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
+ "Search DN cache failed.");
+ throw new JebException(message);
+ }
+ if(!isDNMatched(dns, dnBytes))
+ {
+ addDN(dns, cursor, dnBytes);
+ inserted = true;
+ }
+ }
+ }
+ finally
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ return inserted;
+ }
+
+    //Append the DN to the existing DN record because of a hash collision.
+ private void addDN(DatabaseEntry val, Cursor cursor,
+ byte[] dnBytes) throws JebException
+ {
+ int pos = 0;
+ byte[] bytes = val.getData();
+ int pLen = PackedInteger.getWriteIntLength(dnBytes.length);
+ int totLen = bytes.length + (pLen + dnBytes.length);
+ byte[] newRec = new byte[totLen];
+ System.arraycopy(bytes, 0, newRec, 0, bytes.length);
+ pos = bytes.length;
+ pos = PackedInteger.writeInt(newRec, pos, dnBytes.length);
+ System.arraycopy(dnBytes, 0, newRec, pos, dnBytes.length);
+ DatabaseEntry newVal = new DatabaseEntry(newRec);
+ OperationStatus status = cursor.putCurrent(newVal);
+ if(status != OperationStatus.SUCCESS)
+ {
+ Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
+ "Add of DN to DN cache failed.");
+ throw new JebException(message);
+ }
+ }
+
+ //Return true if the specified DN is in the DNs saved as a result of hash
+ //collisions.
+ private boolean isDNMatched(DatabaseEntry dns, byte[] dnBytes)
+ {
+ int pos = 0, len = 0;
+ byte[] bytes = dns.getData();
+ while(pos < dns.getData().length)
+ {
+ int pLen = PackedInteger.getReadIntLength(bytes, pos);
+ len = PackedInteger.readInt(bytes, pos);
+ if(dnComparator.compare(bytes, pos + pLen, len, dnBytes,
+ dnBytes.length) == 0)
+ {
+ return true;
+ }
+ pos += pLen + len;
+ }
+ return false;
+ }
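
Each DN cache value is therefore a concatenation of length-prefixed DN byte strings, one per hash collision. A sketch of decoding such a value, mirroring the layout written by insert() and addDN() above (dns is assumed to be the DatabaseEntry fetched for the hash key):

    //Sketch: walk every [PackedInteger length][DN bytes] record packed
    //into a single cache value.
    byte[] bytes = dns.getData();
    int pos = 0;
    while (pos < bytes.length)
    {
      int lenBytes = PackedInteger.getReadIntLength(bytes, pos);
      int dnLen = PackedInteger.readInt(bytes, pos);
      byte[] dnBytes = new byte[dnLen];
      System.arraycopy(bytes, pos + lenBytes, dnBytes, 0, dnLen);
      //dnBytes now holds one normalized DN from this hash bucket.
      pos += lenBytes + dnLen;
    }
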
+
+ /**
+ * Check if the specified DN is contained in the temporary DN cache.
+ *
+     * @param dn The DN to check for.
+     * @return {@code true} if the specified DN is in the temporary DN cache, or
+     *         {@code false} if it is not.
+ */
+ public boolean contains(DN dn)
+ {
+ boolean dnExists = false;
+ Cursor cursor = null;
+ DatabaseEntry key = new DatabaseEntry();
+ byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
+ key.setData(hashCode(dnBytes));
+ try {
+ cursor = dnCache.openCursor(null, CursorConfig.DEFAULT);
+ DatabaseEntry dns = new DatabaseEntry();
+ OperationStatus status =
+ cursor.getSearchKey(key, dns, LockMode.DEFAULT);
+ if(status == OperationStatus.SUCCESS)
+ {
+ dnExists = isDNMatched(dns, dnBytes);
+ }
+ }
+ finally
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ return dnExists;
+ }
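
A sketch of how phase one might drive this cache when validating an entry; the variable names are illustrative and not taken from this patch:

    //Illustrative flow: insert() doubles as a duplicate check, and
    //contains() supports the DNCache parent lookup.
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry val = new DatabaseEntry();
    if (!tmpEnv.insert(entryDN, val, key))
    {
      //Duplicate DN seen during phase one: reject the entry.
    }
    else if (!tmpEnv.contains(parentDN))
    {
      //Parent not processed yet: the entry cannot be added.
    }
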
+
+ /**
+ * Return temporary environment stats.
+ *
+ * @param statsConfig A stats configuration instance.
+ *
+ * @return Environment stats.
+ * @throws DatabaseException If an error occurs retrieving the stats.
+ */
+ public EnvironmentStats getEnvironmentStats(StatsConfig statsConfig)
+ throws DatabaseException
+ {
+ return environment.getStats(statsConfig);
+ }
+ }
+
+
+ /**
+   * Uncaught exception handler. Logs any uncaught exception, prints its
+   * stack trace and exits the JVM.
+ */
+  public class DefaultExceptionHandler
+      implements Thread.UncaughtExceptionHandler {
+
+ /**
+ * {@inheritDoc}
+ */
+ public void uncaughtException(Thread t, Throwable e) {
+ Message message = ERR_JEB_IMPORT_UNCAUGHT_EXCEPTION.get(e.getMessage());
+ logError(message);
+ e.printStackTrace();
+ System.exit(1);
+ }
}
}
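
The handler is presumably installed on the import worker threads; a minimal sketch of that wiring (importTask is illustrative):

    //Sketch: log and exit on any uncaught exception in a worker thread.
    Thread worker = new Thread(importTask);
    worker.setUncaughtExceptionHandler(new DefaultExceptionHandler());
    worker.start();
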
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
index 8ee2d6f..0b87116 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
@@ -31,34 +31,40 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
import org.opends.server.backends.jeb.*;
+import com.sleepycat.util.PackedInteger;
/**
- * This class is used to hold the keys read from the LDIF file during
- * phase 1. The keys are sorted and written to an temporary index file.
+ * This class represents an index buffer used to store the keys and entry IDs
+ * processed from the LDIF file during phase one of an import, or rebuild index
+ * process. Each key and ID is stored in a record in the buffer.
+ *
+ * The records in the buffer are eventually sorted, based on the key, when the
+ * maximum size of the buffer is reached and no more records will fit into the
+ * buffer. The buffer is then scheduled to be flushed to an index scratch file
+ * and then re-cycled by the import, or rebuild-index process.
+ *
+ * The records are packed as much as possible, to optimize the buffer space.
+ * This class is not thread safe.
*
*/
public class IndexBuffer implements Comparable<IndexBuffer> {
- /**
+ /**
* Enumeration used when sorting a buffer.
*/
private enum CompareOp {
LT, GT, LE, GE, EQ
}
- private static final int REC_OVERHEAD = 20;
+  //The record overhead: a one byte op type plus a four byte index ID.
+ private static final int REC_OVERHEAD = 5;
- /**
- * Insert constant -- used when the key should be inserted in a DB.
- */
- public static final int INSERT = 0x0000;
-
- /**
- * Delete constant -- used when the key should be deleted from a DB.
- */
- public static final int DELETE = 0x0001;
+ //Buffer records are either insert records or delete records.
+ private static final byte DEL = 0, INS = 1;
//The size of a buffer.
private final int size;
@@ -67,12 +73,11 @@
private final byte buffer[];
//id is used to break a tie (keys equal) when the buffers are being merged
- //when writing.
+ //for writing to the index scratch file.
private long id;
- //Temporary buffers.
+ //Temporary buffer used to store integer values.
private final byte[] intBytes = new byte[4];
- private final byte[] idBytes = new byte[8];
/*
keyOffset - offSet where next key is written
@@ -82,13 +87,28 @@
private int keyOffset =0, recordOffset=0, bytesLeft = 0;
//keys - number of keys in the buffer
- //position - used to iterate over the buffer when writing to a file.
+ //position - used to iterate over the buffer when writing to a scratch file.
private int keys = 0, position = 0;
- //Various things needed to process a buffer.
+  //The comparator used to sort the keys.
private ComparatorBuffer<byte[]> comparator;
+
+ //This is used to make sure that an instance of this class is put on the
+ //correct scratch file writer work queue for processing.
private Importer.IndexKey indexKey;
+ //Initial capacity of re-usable buffer used in key compares.
+ private static final int CAP = 32;
+
+  //This buffer is reused during key compares. Its main purpose is to keep
+  //the memory footprint as small as possible.
+ private ByteBuffer keyBuffer = ByteBuffer.allocate(CAP);
+
+  //Set to true if the buffer should not be recycled. Used when the
+  //importer/rebuild-index process is doing phase one cleanup and flushing
+  //buffers that were not completed.
+ private boolean discard = false;
+
private IndexBuffer(int size) {
this.size = size;
@@ -97,6 +117,7 @@
this.recordOffset = size - 1;
}
+
/**
* Create an instance of a IndexBuffer using the specified size.
*
@@ -108,8 +129,9 @@
return new IndexBuffer(size);
}
+
/**
- * Reset an IndexBuffer so it can be re-used.
+ * Reset an IndexBuffer so it can be re-cycled.
*/
public void reset() {
bytesLeft = size;
@@ -121,6 +143,7 @@
indexKey = null;
}
+
/**
* Set the ID of a buffer to the specified value.
*
@@ -131,17 +154,6 @@
this.id = id;
}
- /**
- * Determines if a buffer is a poison buffer. A poison buffer is used to
- * shutdown work queues when the LDIF reader is completed. A poison buffer
- * has a 0 size.
- *
- * @return <CODE>True</CODE> if a buffer is a poison buffer.
- */
- public boolean isPoison()
- {
- return (size == 0);
- }
/**
* Return the ID of a buffer.
@@ -153,21 +165,59 @@
return this.id;
}
+
/**
- * Determine is there enough space available to write the specified byte array
- * in the buffer.
+ * Determines if a buffer is a poison buffer. A poison buffer is used to
+   * shut down work queues when import/rebuild index phase one is completed.
+ * A poison buffer has a 0 size.
*
- * @param keyBytes The byte array to check space against.
- * @return <CODE>True</CODE> if there is space to write the byte array in a
- * buffer.
+ * @return {@code true} if a buffer is a poison buffer, or {@code false}
+ * otherwise.
*/
- public boolean isSpaceAvailable(byte[] keyBytes) {
- return (keyBytes.length + REC_OVERHEAD + 4) < bytesLeft;
+ public boolean isPoison()
+ {
+ return (size == 0);
}
+
+ /**
+   * Determines if a buffer should be discarded rather than re-cycled.
+   *
+   * @return {@code true} if the buffer should be discarded, or {@code false}
+   *         if it should be re-cycled.
+ */
+ public boolean isDiscard()
+ {
+ return discard;
+ }
+
+
+ /**
+ * Set the discard flag to {@code true}.
+ */
+ public void setDiscard()
+ {
+ discard = true;
+ }
+
+
+ /**
+ * Returns {@code true} if there is enough space available to write the
+ * specified byte array in the buffer. It returns {@code false} otherwise.
+ *
+ * @param kBytes The byte array to check space against.
+ * @param id The id value to check space against.
+ * @return {@code true} if there is space to write the byte array in a
+ * buffer, or {@code false} otherwise.
+ */
+ public boolean isSpaceAvailable(byte[] kBytes, long id) {
+ return (getRecordSize(kBytes.length, id) + 4) < bytesLeft;
+ }
+
+
/**
* Set the comparator to be used in the buffer processing to the specified
- * value.
+ * comparator.
*
* @param comparator The comparator to set the buffer's comparator to.
*/
@@ -187,8 +237,9 @@
return position;
}
+
/**
- * Set a buffer's position value to the specified value.
+ * Set a buffer's position value to the specified position.
*
* @param position The value to set the position to.
*/
@@ -197,6 +248,7 @@
this.position = position;
}
+
/**
* Sort the buffer.
*/
@@ -204,6 +256,7 @@
sort(0, keys);
}
+
/**
* Add the specified key byte array and EntryID to the buffer.
*
@@ -212,65 +265,75 @@
* @param indexID The index ID the record belongs.
* @param insert <CODE>True</CODE> if key is an insert, false otherwise.
*/
+
public void add(byte[] keyBytes, EntryID IDEntry, int indexID,
boolean insert) {
- byte[] idBytes = JebFormat.entryIDToDatabase(IDEntry.longValue());
- recordOffset -= keyBytes.length + REC_OVERHEAD;
+ recordOffset = addRecord(keyBytes, IDEntry.longValue(), indexID, insert);
System.arraycopy(getIntBytes(recordOffset), 0, buffer, keyOffset, 4);
keyOffset += 4;
- System.arraycopy(getIntBytes(indexID), 0, buffer, recordOffset, 4);
- System.arraycopy(getIntBytes(keyBytes.length), 0, buffer,
- (recordOffset + 4), 4);
- System.arraycopy(keyBytes, 0, buffer, (recordOffset + 8), keyBytes.length);
- if(insert)
- {
- System.arraycopy(getIntBytes(INSERT), 0, buffer,
- (recordOffset + 8 + keyBytes.length), 4);
- }
- else
- {
- System.arraycopy(getIntBytes(DELETE), 0, buffer,
- (recordOffset + 8 + keyBytes.length), 4);
- }
- System.arraycopy(idBytes, 0, buffer,
- (recordOffset + 12 + keyBytes.length), 8);
bytesLeft = recordOffset - keyOffset;
keys++;
}
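
With the packed format, the four byte record offsets grow from the front of the buffer while records grow from the back; a record written by addRecord() below is laid out as follows (derived from the code, shown here as an orientation sketch):

    //Buffer layout after add():
    //
    //  [ 4-byte record offsets, growing up from index 0 ]
    //  [ free space (bytesLeft)                         ]
    //  [ records, growing down from the end:            ]
    //  [   op (1) | indexID (4) | entryID (packed)      ]
    //  [   | keyLen (packed) | key bytes                ]
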
- /**
- * Return the byte array representing the entry ID
- * at the specified index value.
- *
- * @param index The index value to retrieve.
- * @return The byte array at the index value.
- */
- public byte[] getIDBytes(int index)
+  private int addRecord(byte[] key, long id, int indexID, boolean insert)
{
- int recOffset = getIntValue(index * 4);
- int keyLen = getIntValue(recOffset + 4);
- System.arraycopy(buffer, recOffset + 12 + keyLen, idBytes, 0, 8);
- return idBytes;
+ byte opType = INS;
+ int retOffset = recordOffset - getRecordSize(key.length, id);
+ int offSet = retOffset;
+ if(!insert)
+ {
+ opType = DEL;
+ }
+ buffer[offSet++] = opType;
+ System.arraycopy(getIntBytes(indexID), 0, buffer, offSet, 4);
+ offSet += 4;
+ offSet = PackedInteger.writeLong(buffer, offSet, id);
+ offSet = PackedInteger.writeInt(buffer, offSet, key.length);
+ System.arraycopy(key, 0, buffer, offSet, key.length);
+ return retOffset;
+ }
+
+
+ private int getRecordSize(int keyLen, long id)
+ {
+ return PackedInteger.getWriteIntLength(keyLen) + keyLen +
+ PackedInteger.getWriteLongLength(id) + REC_OVERHEAD;
}
/**
- * Return if the record specified by the index is an insert or not.
+   * Write the entry ID of the record at the specified index to the specified
+   * output stream. Used when writing the index scratch files.
+   *
+   * @param stream The stream to write the record's entry ID to.
+   * @param index The index of the record to write.
+ */
+ public void writeID(ByteArrayOutputStream stream, int index)
+ {
+ int offSet = getIntegerValue(index * 4);
+ int len = PackedInteger.getReadLongLength(buffer, offSet + REC_OVERHEAD);
+ stream.write(buffer, offSet + REC_OVERHEAD, len);
+ }
+
+
+ /**
+ * Return {@code true} if the record specified by the index is an insert
+   * record, or {@code false} if it is a delete record.
+ *
* @param index The index of the record.
*
- * @return <CODE>True</CODE> if the record is an insert, false otherwise.
+ * @return {@code true} if the record is an insert record, or {@code false}
+ * if it is a delete record.
*/
public boolean isInsert(int index)
{
boolean returnCode = true;
- int recOffset = getIntValue(index * 4);
- int keyLen = getIntValue(recOffset + 4);
- if(getIntValue(recOffset + 8 + keyLen) == DELETE)
+ int recOffset = getIntegerValue(index * 4);
+ if(buffer[recOffset] == DEL)
{
returnCode = false;
}
-
return returnCode;
}
@@ -282,105 +345,39 @@
*/
public int getKeySize()
{
- int recOffset = getIntValue(position * 4);
- return getIntValue(recOffset + 4);
- }
-
-
- private int getIndexID(int x)
- {
- return getIntValue(getIntValue(x * 4));
+ int offSet = getIntegerValue(position * 4) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ return PackedInteger.readInt(buffer, offSet);
}
/**
- * Return index id associated with the current position's record.
- *
- * @return The index id.
- */
- public int getIndexID()
- {
- return getIntValue(getIntValue(position * 4));
- }
-
- /**
- * Write a record to the specified data output stream using the specified
- * parameters.
- *
- * @param key The key byte array.
- * @param indexID The index ID.
- * @param insertByteStream The byte stream containing insert ids.
- * @param deleteByteStream The byte stream containing delete ids.
- * @param dataStream The data output stream to write to.
- * @return The record size written.
- * @throws IOException If an I/O error occurs writing the record.
- */
- public static int writeRecord(byte[] key, int indexID,
- ByteArrayOutputStream insertByteStream,
- ByteArrayOutputStream deleteByteStream,
- DataOutputStream dataStream)
- throws IOException
- {
- dataStream.writeInt(indexID);
- dataStream.writeInt(key.length);
- dataStream.write(key);
- dataStream.writeInt(insertByteStream.size());
- if(insertByteStream.size() > 0)
- {
- insertByteStream.writeTo(dataStream);
- }
- dataStream.writeInt(deleteByteStream.size());
- if(deleteByteStream.size() > 0)
- {
- deleteByteStream.writeTo(dataStream);
- }
- return (key.length + insertByteStream.size() +
- deleteByteStream.size() + (REC_OVERHEAD - 4));
- }
-
- /**
- * Write a record to specified output stream using the record pointed to by
- * the current position and the specified byte stream of ids.
- *
- * @param insertByteStream The byte stream containing the ids.
- * @param deleteByteStream The byte stream containing the ids.
- * @param dataStream The data output stream to write to.
- * @return The record size written.
- *
- * @throws IOException If an I/O error occurs writing the record.
- */
- public int writeRecord(ByteArrayOutputStream insertByteStream,
- ByteArrayOutputStream deleteByteStream,
- DataOutputStream dataStream) throws IOException
- {
- int recOffset = getIntValue(position * 4);
- int indexID = getIntValue(recOffset);
- int keyLen = getIntValue(recOffset + 4);
- dataStream.writeInt(indexID);
- dataStream.writeInt(keyLen);
- dataStream.write(buffer, recOffset + 8, keyLen);
- dataStream.writeInt(insertByteStream.size());
- if(insertByteStream.size() > 0)
- {
- insertByteStream.writeTo(dataStream);
- }
- dataStream.writeInt(deleteByteStream.size());
- if(deleteByteStream.size() > 0)
- {
- deleteByteStream.writeTo(dataStream);
- }
- return (getKeySize() + insertByteStream.size() +
- deleteByteStream.size() + (REC_OVERHEAD - 4));
- }
-
- /**
- * Return the key value part of a record specified by the buffer position.
+ * Return the key value part of a record indicated by the current buffer
+ * position.
*
* @return byte array containing the key value.
*/
- public byte[] getKeyBytes()
+ public byte[] getKey()
{
- return getKeyBytes(position);
+ return getKey(position);
+ }
+
+  //Used to minimize memory usage when comparing keys.
+ private ByteBuffer getKeyBuf(int x)
+ {
+ keyBuffer.clear();
+ int offSet = getIntegerValue(x * 4) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ int keyLen = PackedInteger.readInt(buffer, offSet);
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ //Re-allocate if the key is bigger than the capacity.
+ if(keyLen > keyBuffer.capacity())
+ {
+ keyBuffer = ByteBuffer.allocate(keyLen);
+ }
+ keyBuffer.put(buffer, offSet, keyLen);
+ keyBuffer.flip();
+ return keyBuffer;
}
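
The clear/put/flip cycle above is the standard way to recycle a ByteBuffer. A standalone sketch of the same grow-on-demand pattern, with illustrative names:

    import java.nio.ByteBuffer;

    final class KeyScratch
    {
      //Recycled buffer; reallocated only when a key outgrows it.
      private ByteBuffer scratch = ByteBuffer.allocate(32);

      ByteBuffer fill(byte[] src, int off, int len)
      {
        scratch.clear();                      //position = 0, limit = capacity
        if (len > scratch.capacity())
        {
          scratch = ByteBuffer.allocate(len); //grow only when required
        }
        scratch.put(src, off, len);
        scratch.flip();                       //expose [0, len) for reading
        return scratch;
      }
    }
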
@@ -390,98 +387,126 @@
* @param x index to return.
* @return byte array containing the key value.
*/
- private byte[] getKeyBytes(int x)
+ private byte[] getKey(int x)
{
- int recOffset = getIntValue(x * 4);
- int keyLen = getIntValue(recOffset + 4);
- byte[] keyBytes = new byte[keyLen];
- System.arraycopy(buffer, recOffset + 8, keyBytes, 0, keyLen);
- return keyBytes;
+ int offSet = getIntegerValue(x * 4) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ int keyLen = PackedInteger.readInt(buffer, offSet);
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ byte[] key = new byte[keyLen];
+ System.arraycopy(buffer, offSet, key, 0, keyLen);
+ return key;
}
+
+ private int getIndexID(int x)
+ {
+ return getIntegerValue(getIntegerValue(x * 4) + 1);
+ }
+
+
+ /**
+ * Return index id associated with the current position's record.
+ *
+ * @return The index id.
+ */
+ public int getIndexID()
+ {
+ return getIntegerValue(getIntegerValue(position * 4) + 1);
+ }
+
+
private boolean is(int x, int y, CompareOp op)
{
- int xRecOffset = getIntValue(x * 4);
- int xIndexID = getIntValue(xRecOffset);
- int xKeyLen = getIntValue(xRecOffset + 4);
- int xKey = xRecOffset + 8;
- int yRecOffset = getIntValue(y * 4);
- int yIndexID = getIntValue(yRecOffset);
- int yKeyLen = getIntValue(yRecOffset + 4);
- int yKey = yRecOffset + 8;
+ int xoffSet = getIntegerValue(x * 4);
+ int xIndexID = getIntegerValue(xoffSet + 1);
+ xoffSet += REC_OVERHEAD;
+ xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
+ int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
+ int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
+ int yoffSet = getIntegerValue(y * 4);
+ int yIndexID = getIntegerValue(yoffSet + 1);
+ yoffSet += REC_OVERHEAD;
+ yoffSet += PackedInteger.getReadIntLength(buffer, yoffSet);
+ int yKeyLen = PackedInteger.readInt(buffer, yoffSet);
+ int yKey = PackedInteger.getReadIntLength(buffer, yoffSet) + yoffSet;
return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen,
xIndexID, yKey, yKeyLen, yIndexID), op);
}
- private boolean is(int x, byte[] yBytes, CompareOp op, int yIndexID)
+ private boolean is(int x, byte[] yKey, CompareOp op, int yIndexID)
{
- int xRecOffset = getIntValue(x * 4);
- int xIndexID = getIntValue(xRecOffset);
- int xKeyLen = getIntValue(xRecOffset + 4);
- int xKey = xRecOffset + 8;
+ int xoffSet = getIntegerValue(x * 4);
+ int xIndexID = getIntegerValue(xoffSet + 1);
+ xoffSet += REC_OVERHEAD;
+ xoffSet += PackedInteger.getReadIntLength(buffer, xoffSet);
+ int xKeyLen = PackedInteger.readInt(buffer, xoffSet);
+ int xKey = PackedInteger.getReadIntLength(buffer, xoffSet) + xoffSet;
return evaluateReturnCode(comparator.compare(buffer, xKey, xKeyLen,
- xIndexID, yBytes, yIndexID), op);
+ xIndexID, yKey, yKey.length, yIndexID), op);
}
/**
* Compare the byte array at the current position with the specified one and
- * using the specified index id.
+   * using the specified index id. It will return {@code true} if the byte
+   * array at the current position is equal to the specified byte array as
+   * determined by the comparator and the index ID is equal. It will
+   * return {@code false} otherwise.
*
* @param b The byte array to compare.
* @param bIndexID The index key.
   * @return {@code true} if the byte arrays and index IDs are equal.
*/
- public boolean compare(byte[] b, int bIndexID)
+  public boolean compare(byte[] b, int bIndexID)
{
- boolean returnCode = false;
- int xRecOffset = getIntValue(position * 4);
- int xIndexID = getIntValue(xRecOffset);
- int xKeyLen = getIntValue(xRecOffset + 4);
- if( comparator.compare(buffer, xRecOffset + 8, xKeyLen, b) == 0)
+ int offset = getIntegerValue(position * 4);
+ int indexID = getIntegerValue(offset + 1);
+ offset += REC_OVERHEAD;
+ offset += PackedInteger.getReadIntLength(buffer, offset);
+ int keyLen = PackedInteger.readInt(buffer, offset);
+ int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
+ if( comparator.compare(buffer, key, keyLen, b, b.length) == 0)
{
- if(xIndexID == bIndexID)
+ if(indexID == bIndexID)
{
- returnCode = true;
+ return true;
}
}
- return returnCode;
+ return false;
}
- /**
- * Compare the byte array at the current position with the byte array at the
- * specified index.
- *
- * @param i The index pointing to the byte array to compare.
- * @return <CODE>True</CODE> if the byte arrays are equal.
- */
- public boolean compare(int i)
- {
- return is(i, position, CompareOp.EQ);
- }
/**
- * Compare current IndexBuffer to the one in the specified argument. The key
- * at the value of position in both buffers are used in the compare.
+ * Compare current IndexBuffer to the specified index buffer using both the
+ * comparator and index ID of both buffers.
+ *
+   * The key at the current position of each buffer is used in the compare.
*
* @param b The IndexBuffer to compare to.
* @return 0 if the buffers are equal, -1 if the current buffer is less
* than the specified buffer, or 1 if it is greater.
*/
- public int compareTo(IndexBuffer b) {
- byte[] key2 = b.getKeyBytes(b.getPosition());
- int xRecOffset = getIntValue(position * 4);
- int xIndexID = getIntValue(xRecOffset);
- int xLen = getIntValue(xRecOffset + 4);
- int returnCode = comparator.compare(buffer, xRecOffset + 8, xLen, key2);
+ public int compareTo(IndexBuffer b)
+ {
+ ByteBuffer keyBuf = b.getKeyBuf(b.position);
+ int offset = getIntegerValue(position * 4);
+ int indexID = getIntegerValue(offset + 1);
+ offset += REC_OVERHEAD;
+ offset += PackedInteger.getReadIntLength(buffer, offset);
+ int keyLen = PackedInteger.readInt(buffer, offset);
+ int key = PackedInteger.getReadIntLength(buffer, offset) + offset;
+ int returnCode = comparator.compare(buffer, key, keyLen, keyBuf.array(),
+ keyBuf.limit());
if(returnCode == 0)
{
int bIndexID = b.getIndexID();
- if(xIndexID == bIndexID)
+ if(indexID == bIndexID)
{
long otherBufferID = b.getBufferID();
- //Used in Remove.
+          //Tie-break on the buffer ID so that a TreeSet remove can identify
+          //the exact buffer instance.
if(this.id == otherBufferID)
{
returnCode = 0;
@@ -495,7 +520,7 @@
returnCode = 1;
}
}
- else if(xIndexID < bIndexID)
+ else if(indexID < bIndexID)
{
returnCode = -1;
}
@@ -509,7 +534,39 @@
/**
- * Return the number of keys in an index buffer.
+   * Write the key of the record pointed to by the current position to the
+   * specified output stream.
+ *
+ * @param dataStream The data output stream to write to.
+ *
+ * @throws IOException If an I/O error occurs writing the record.
+ */
+ public void writeKey(DataOutputStream dataStream) throws IOException
+ {
+ int offSet = getIntegerValue(position * 4) + REC_OVERHEAD;
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ int keyLen = PackedInteger.readInt(buffer, offSet);
+ offSet += PackedInteger.getReadIntLength(buffer, offSet);
+ dataStream.write(buffer, offSet, keyLen);
+ }
+
+
+ /**
+ * Compare the byte array at the current position with the byte array at the
+ * specified index.
+ *
+ * @param i The index pointing to the byte array to compare.
+ * @return {@code true} if the byte arrays are equal, or {@code false}
+ * otherwise.
+ */
+ public boolean compare(int i)
+ {
+ return is(i, position, CompareOp.EQ);
+ }
+
+
+ /**
+ * Return the current number of keys.
*
* @return The number of keys currently in an index buffer.
*/
@@ -520,25 +577,29 @@
/**
- * Return if the buffer has more data. Used when iterating over the
- * buffer examining keys.
+ * Return {@code true} if the buffer has more data to process, or
+ * {@code false} otherwise. Used when iterating over the buffer writing the
+ * scratch index file.
*
- * @return <CODE>True</CODE> if the buffer has more data to process.
+ * @return {@code true} if a buffer has more data to process, or
+ * {@code false} otherwise.
*/
public boolean hasMoreData()
{
return (position + 1) < keys;
}
+
/**
- * Move to the next record in the buffer. Used when iterating over the
- * buffer examining keys.
+ * Advance the position pointer to the next record in the buffer. Used when
+ * iterating over the buffer examining keys.
*/
public void getNextRecord()
{
position++;
}
+
private byte[] getIntBytes(int val)
{
for (int i = 3; i >= 0; i--) {
@@ -548,7 +609,8 @@
return intBytes;
}
- private int getIntValue(int index)
+
+ private int getIntegerValue(int index)
{
int answer = 0;
for (int i = 0; i < 4; i++) {
@@ -560,7 +622,6 @@
}
-
private int med3(int a, int b, int c)
{
return (is(a,b, CompareOp.LT) ?
@@ -591,13 +652,13 @@
m = med3(l, m, n);
}
- byte[] mKey = getKeyBytes(m);
+ byte[] mKey = getKey(m);
int mIndexID = getIndexID(m);
int a = off, b = a, c = off + len - 1, d = c;
while(true)
{
- while (b <= c && is(b, mKey, CompareOp.LE, mIndexID))
+ while ((b <= c) && is(b, mKey, CompareOp.LE, mIndexID))
{
if (is(b, mKey, CompareOp.EQ, mIndexID))
swap(a++, b);
@@ -632,18 +693,20 @@
private void swap(int a, int b)
{
int aOffset = a * 4;
- int bOffset = b * 4;
- int bVal = getIntValue(bOffset);
- System.arraycopy(buffer, aOffset, buffer, bOffset, 4);
- System.arraycopy(getIntBytes(bVal), 0, buffer, aOffset, 4);
+ int bOffset = b * 4;
+ int bVal = getIntegerValue(bOffset);
+ System.arraycopy(buffer, aOffset, buffer, bOffset, 4);
+ System.arraycopy(getIntBytes(bVal), 0, buffer, aOffset, 4);
}
+
private void vectorSwap(int a, int b, int n)
{
for (int i=0; i<n; i++, a++, b++)
swap(a, b);
}
+
private boolean evaluateReturnCode(int rc, CompareOp op)
{
boolean returnCode = false;
@@ -667,6 +730,7 @@
return returnCode;
}
+
/**
   * Interface that defines three methods used to compare keys used in this
* class. The Comparator interface cannot be used in this class, so this
@@ -675,6 +739,8 @@
* @param <T> object to use in the compare
*/
public static interface ComparatorBuffer<T> {
+
+
/**
* Compare two offsets in an object, usually a byte array.
*
@@ -690,7 +756,9 @@
*/
int compare(T o, int offset, int length, int indexID, int otherOffset,
int otherLength, int otherIndexID);
- /**
+
+
+ /**
* Compare an offset in an object with the specified object.
*
* @param o The first object.
@@ -698,13 +766,15 @@
* @param length The first length.
* @param indexID The first index id.
* @param other The second object.
+ * @param otherLength The length of the second object.
* @param otherIndexID The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
* object.
*/
int compare(T o, int offset, int length, int indexID, T other,
- int otherIndexID);
+ int otherLength, int otherIndexID);
+
/**
* Compare an offset in an object with the specified object.
@@ -713,23 +783,29 @@
* @param offset The first offset.
* @param length The first length.
* @param other The second object.
+ * @param otherLen The length of the second object.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
* object.
*/
- int compare(T o, int offset, int length, T other);
+ int compare(T o, int offset, int length, T other,
+ int otherLen);
}
+
/**
* Implementation of ComparatorBuffer interface. Used to compare keys when
- * they are DNs.
+   * the DN index is being processed.
*/
public static
class DNComparator implements IndexBuffer.ComparatorBuffer<byte[]>
{
+
/**
* Compare two offsets in an byte array using the DN compare algorithm.
+     * The specified index ID is used in the comparison if the byte arrays
+ * are equal.
*
* @param b The byte array.
* @param offset The first offset.
@@ -745,7 +821,7 @@
int otherOffset, int otherLength, int otherIndexID)
{
for (int i = length - 1, j = otherLength - 1;
- i >= 0 && j >= 0; i--, j--) {
+ i >= 0 && j >= 0; i--, j--) {
if (b[offset + i] > b[otherOffset + j])
{
return 1;
@@ -755,6 +831,8 @@
return -1;
}
}
+ //The arrays are equal, make sure they are in the same index since
+ //multiple suffixes might have the same key.
if(length == otherLength)
{
if(indexID == otherIndexID)
@@ -780,6 +858,65 @@
}
}
+
+ /**
+   * Compare an offset in a byte array with the specified byte array,
+ * using the DN compare algorithm. The specified index ID is used in the
+   * comparison if the byte arrays are equal.
+ *
+ * @param b The byte array.
+ * @param offset The first offset.
+ * @param length The first length.
+ * @param indexID The first index id.
+ * @param other The second byte array to compare to.
+ * @param otherLength The second object's length.
+ * @param otherIndexID The second index id.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * byte array.
+ */
+ public int compare(byte[] b, int offset, int length, int indexID,
+      byte[] other, int otherLength, int otherIndexID)
+ {
+ for (int i = length - 1, j = otherLength - 1;
+ i >= 0 && j >= 0; i--, j--) {
+ if (b[offset + i] > other[j])
+ {
+ return 1;
+ }
+ else if (b[offset + i] < other[j])
+ {
+ return -1;
+ }
+ }
+ //The arrays are equal, make sure they are in the same index since
+ //multiple suffixes might have the same key.
+ if(length == otherLength)
+ {
+ if(indexID == otherIndexID)
+ {
+ return 0;
+ }
+ else if(indexID > otherIndexID)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ if(length > otherLength)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+
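
Because the bytes are walked from the tail, keys that share a suffix sort next to each other, and a shorter key sorts before any longer key ending in it; for example (plain strings used purely for illustration, real keys are normalized DN bytes):

    byte[] parent = "dc=example,dc=com".getBytes();
    byte[] child  = "ou=people,dc=example,dc=com".getBytes();
    //All 17 trailing bytes of parent match the tail of child, so the
    //shorter array wins the length tie-break and parent sorts first (-1).
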
/**
* Compare an offset in an byte array with the specified byte array,
* using the DN compare algorithm.
@@ -787,19 +924,17 @@
* @param b The byte array.
* @param offset The first offset.
* @param length The first length.
- * @param indexID The first index id.
* @param other The second byte array to compare to.
- * @param otherIndexID The second index id.
+ * @param otherLength The second object's length.
* @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the second
- * byte array.
+ * offset value is less than, equal to, or greater than the
+ * second byte array.
*/
- public int compare(byte[] b, int offset, int length, int indexID,
- byte[]other, int otherIndexID)
+ public int compare(byte[] b, int offset, int length, byte[] other,
+ int otherLength)
{
- int otherLength = other.length;
for (int i = length - 1, j = otherLength - 1;
- i >= 0 && j >= 0; i--, j--) {
+ i >= 0 && j >= 0; i--, j--) {
if (b[offset + i] > other[j])
{
return 1;
@@ -811,18 +946,7 @@
}
if(length == otherLength)
{
- if(indexID == otherIndexID)
- {
- return 0;
- }
- else if(indexID > otherIndexID)
- {
- return 1;
- }
- else
- {
- return -1;
- }
+ return 0;
}
if(length > otherLength)
{
@@ -833,58 +957,21 @@
return -1;
}
}
-
- /**
- * Compare an offset in an byte array with the specified byte array,
- * using the DN compare algorithm.
- *
- * @param b The byte array.
- * @param offset The first offset.
- * @param length The first length.
- * @param other The second byte array to compare to.
- * @return a negative integer, zero, or a positive integer as the first
- * offset value is less than, equal to, or greater than the
- * second byte array.
- */
- public int compare(byte[] b, int offset, int length, byte[] other)
- {
- int otherLength = other.length;
- for (int i = length - 1, j = otherLength - 1;
- i >= 0 && j >= 0; i--, j--) {
- if (b[offset + i] > other[j])
- {
- return 1;
- }
- else if (b[offset + i] < other[j])
- {
- return -1;
- }
- }
- if(length == otherLength)
- {
- return 0;
- }
- if(length > otherLength)
- {
- return 1;
- }
- else
- {
- return -1;
- }
- }
}
-/**
+
+ /**
* Implementation of ComparatorBuffer interface. Used to compare keys when
- * they are regular indexes.
+ * they are non-DN indexes.
*/
public static
class IndexComparator implements IndexBuffer.ComparatorBuffer<byte[]>
{
- /**
+
+ /**
* Compare two offsets in an byte array using the index compare
- * algorithm.
+     * algorithm. The specified index ID is used in the comparison if the
+     * byte arrays are equal.
*
* @param b The byte array.
* @param offset The first offset.
@@ -910,6 +997,8 @@
return -1;
}
}
+ //The arrays are equal, make sure they are in the same index since
+ //multiple suffixes might have the same key.
if(length == otherLength)
{
if(indexID == otherIndexID)
@@ -935,24 +1024,26 @@
}
}
+
/**
* Compare an offset in an byte array with the specified byte array,
- * using the DN compare algorithm.
+     * using the index compare algorithm. The specified index ID is used in
+     * the comparison if the byte arrays are equal.
*
* @param b The byte array.
* @param offset The first offset.
* @param length The first length.
* @param indexID The first index id.
* @param other The second byte array to compare to.
+ * @param otherLength The second byte array's length.
* @param otherIndexID The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
* byte array.
*/
public int compare(byte[] b, int offset, int length, int indexID,
- byte[] other, int otherIndexID)
+ byte[] other, int otherLength, int otherIndexID)
{
- int otherLength = other.length;
for(int i = 0; i < length && i < otherLength; i++)
{
if(b[offset + i] > other[i])
@@ -964,6 +1055,8 @@
return -1;
}
}
+ //The arrays are equal, make sure they are in the same index since
+ //multiple suffixes might have the same key.
if(length == otherLength)
{
if(indexID == otherIndexID)
@@ -989,7 +1082,8 @@
}
}
- /**
+
+ /**
     * Compare an offset in a byte array with the specified byte array,
     * using the index compare algorithm.
*
@@ -997,13 +1091,14 @@
* @param offset The first offset.
* @param length The first length.
* @param other The second byte array to compare to.
+ * @param otherLength The second byte array's length.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
* byte array.
*/
- public int compare(byte[] b, int offset, int length, byte[] other)
+ public int compare(byte[] b, int offset, int length, byte[] other,
+ int otherLength)
{
- int otherLength = other.length;
for(int i = 0; i < length && i < otherLength; i++)
{
if(b[offset + i] > other[i])
@@ -1030,6 +1125,7 @@
}
}
+
/**
* Set the index key associated with an index buffer.
*
@@ -1040,6 +1136,7 @@
this.indexKey = indexKey;
}
+
/**
* Return the index key of an index buffer.
* @return The index buffer's index key.
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
index 1186916..5268ce9 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
@@ -30,39 +30,38 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.types.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.backends.jeb.importLDIF.Importer.*;
import org.opends.messages.Message;
-import org.opends.messages.Category;
-import org.opends.messages.Severity;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
-
+import static org.opends.messages.JebMessages.*;
/**
- * The class represents a suffix. OpenDS backends can have multiple suffixes.
+ * The class represents a suffix that is to be loaded during an import, or
+ * rebuild index process. Multiple instances of this class can be instantiated
+ * during an import to support multiple suffixes in a backend. A rebuild
+ * index process has only one of these instances.
*/
public class Suffix
{
- private final List<DN> includeBranches;
- private final List<DN> excludeBranches;
+ private final List<DN> includeBranches, excludeBranches;
private final DN baseDN;
private final EntryContainer srcEntryContainer;
private EntryContainer entryContainer;
private final Object synchObject = new Object();
- private static final int PARENT_ID_MAP_SIZE = 4096;
+ private static final int PARENT_ID_SET_SIZE = 16 * KB;
private ConcurrentHashMap<DN, CountDownLatch> pendingMap =
- new ConcurrentHashMap<DN, CountDownLatch>();
- private HashMap<DN,EntryID> parentIDMap =
- new HashMap<DN,EntryID>(PARENT_ID_MAP_SIZE);
-
+ new ConcurrentHashMap<DN, CountDownLatch>();
+ private Set<DN> parentSet = new HashSet<DN>(PARENT_ID_SET_SIZE);
private DN parentDN;
private ArrayList<EntryID> IDs;
+
private
Suffix(EntryContainer entryContainer, EntryContainer srcEntryContainer,
List<DN> includeBranches, List<DN> excludeBranches)
@@ -89,6 +88,7 @@
}
}
+
/**
* Creates a suffix instance using the specified parameters.
*
@@ -104,13 +104,14 @@
public static Suffix
createSuffixContext(EntryContainer entryContainer,
EntryContainer srcEntryContainer,
- List<DN> includeBranches, List<DN> excludeBranches)
- throws InitializationException, ConfigException
+ List<DN> includeBranches, List<DN> excludeBranches)
+ throws InitializationException, ConfigException
{
return new Suffix(entryContainer, srcEntryContainer,
- includeBranches, excludeBranches);
+ includeBranches, excludeBranches);
}
+
/**
* Returns the DN2ID instance pertaining to a suffix instance.
*
@@ -122,11 +123,11 @@
}
- /**
+ /**
* Returns the ID2Entry instance pertaining to a suffix instance.
*
* @return A ID2Entry instance that can be used to manipulate the ID2Entry
- * database.
+ * database.
*/
public ID2Entry getID2Entry()
{
@@ -134,11 +135,11 @@
}
- /**
+ /**
* Returns the DN2URI instance pertaining to a suffix instance.
*
* @return A DN2URI instance that can be used to manipulate the DN2URI
- * database.
+ * database.
*/
public DN2URI getDN2URI()
{
@@ -146,7 +147,7 @@
}
- /**
+ /**
* Returns the entry container pertaining to a suffix instance.
*
* @return The entry container used to create a suffix instance.
@@ -170,11 +171,11 @@
/**
- * Check if the parent DN is in the pending map.
+   * Wait until the specified parent DN is no longer in the pending map.
*
* @param parentDN The DN of the parent.
*/
- private void checkPending(DN parentDN) throws InterruptedException
+ private void assureNotPending(DN parentDN) throws InterruptedException
{
CountDownLatch l;
if((l=pendingMap.get(parentDN)) != null)
@@ -183,6 +184,7 @@
}
}
+
/**
* Add specified DN to the pending map.
*
@@ -193,6 +195,7 @@
pendingMap.putIfAbsent(dn, new CountDownLatch(1));
}
+
/**
   * Remove the specified DN from the pending map. It may not exist if the
   * entries are being migrated, so just return.
@@ -210,46 +213,65 @@
/**
- * Return the entry ID related to the specified entry DN. First the instance's
- * cache of parent IDs is checked, if it isn't found then the DN2ID is
- * searched.
+   * Return {@code true} if the specified DN is contained in the parent set, or
+   * in the specified DN cache. This would indicate that the parent has already
+   * been processed. It returns {@code false} otherwise.
*
- * @param parentDN The DN to get the id for.
- * @return The entry ID related to the parent DN, or null if the id wasn't
- * found in the cache or dn2id database.
+   * It will also check the dn2id database for the DN if the specified
+   * cleared backend boolean is {@code false}.
*
- * @throws DatabaseException If an error occurred search the dn2id database.
+ * @param dn The DN to check for.
+ * @param dnCache The importer DN cache.
+ * @param clearedBackend Set to {@code true} if the import process cleared the
+ * backend before processing.
+   * @return {@code true} if the DN is contained in the parent set or the DN
+   *         cache, or {@code false} otherwise.
+ *
+ * @throws DatabaseException If an error occurred searching the DN cache, or
+ * dn2id database.
+ * @throws InterruptedException If an error occurred processing the pending
+ * map.
*/
public
- EntryID getParentID(DN parentDN) throws DatabaseException {
- EntryID parentID;
+ boolean isParentProcessed(DN dn, DNCache dnCache, boolean clearedBackend)
+ throws DatabaseException, InterruptedException {
synchronized(synchObject) {
- parentID = parentIDMap.get(parentDN);
- if (parentID != null) {
- return parentID;
+ if(parentSet.contains(dn))
+ {
+ return true;
}
}
+ //The DN was not in the parent set. Make sure it isn't pending.
try {
- checkPending(parentDN);
- } catch (Exception e) {
- Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
- "Exception thrown in parentID check");
+ assureNotPending(dn);
+ } catch (InterruptedException e) {
+ Message message = ERR_JEB_IMPORT_LDIF_PENDING_ERR.get(e.getMessage());
logError(message);
- return null;
+ throw e;
}
- parentID = entryContainer.getDN2ID().get(null, parentDN, LockMode.DEFAULT);
- //If the parent is in dn2id, add it to the cache.
- if (parentID != null) {
+ //Check the DN cache.
+ boolean parentThere = dnCache.contains(dn);
+ //If the parent isn't found in the DN cache, then check the dn2id database
+ //for the DN only if the backend wasn't cleared.
+ if(!parentThere && !clearedBackend)
+ {
+ if(getDN2ID().get(null, dn, LockMode.DEFAULT) != null)
+ {
+ parentThere = true;
+ }
+ }
+ //Add the DN to the parent set if needed.
+ if (parentThere) {
synchronized(synchObject) {
- if (parentIDMap.size() >= PARENT_ID_MAP_SIZE) {
- Iterator<DN> iterator = parentIDMap.keySet().iterator();
+ if (parentSet.size() >= PARENT_ID_SET_SIZE) {
+ Iterator<DN> iterator = parentSet.iterator();
iterator.next();
iterator.remove();
}
- parentIDMap.put(parentDN, parentID);
+ parentSet.add(dn);
}
}
- return parentID;
+ return parentThere;
}
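
A sketch of how a phase one worker would presumably gate an entry on this check (variable names illustrative, not taken from this patch):

    //Illustrative: only process an entry once its parent is known.
    DN parentDN = entryDN.getParent();
    if (parentDN != null &&
        !suffix.isParentProcessed(parentDN, dnCache, clearedBackend))
    {
      //Parent absent from the parent set, DN cache and dn2id: reject.
    }
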
@@ -307,6 +329,7 @@
}
}
+
/**
* Get the parent DN of the last entry added to a suffix.
*
@@ -328,6 +351,7 @@
this.parentDN = parentDN;
}
+
/**
* Get the entry ID list of the last entry added to a suffix.
*
@@ -349,6 +373,7 @@
this.IDs = IDs;
}
+
/**
* Return a src entry container.
*
@@ -359,6 +384,7 @@
return this.srcEntryContainer;
}
+
/**
* Return include branches.
*
@@ -369,6 +395,7 @@
return this.includeBranches;
}
+
/**
* Return exclude branches.
*
@@ -379,6 +406,7 @@
return this.excludeBranches;
}
+
/**
* Return base DN.
*
--
Gitblit v1.10.0