From 24b408fb4260a1609b4cf16de07fce4f7af23ca4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 06 Nov 2014 11:59:16 +0000
Subject: [PATCH] OPENDJ-1625 setup fail when importing automatically-generated data
---
opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 595 +++++++++++++++++++++-------------------------------------
1 files changed, 218 insertions(+), 377 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index c469e76..7eb7414 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -186,7 +186,7 @@
* Futures used to indicate when the index file writers are done flushing
* their work queues and have exited. End of phase one.
*/
- private final List<Future<?>> scratchFileWriterFutures;
+ private final List<Future<Void>> scratchFileWriterFutures;
/**
* List of index file writer tasks. Used to signal stopScratchFileWriters to
@@ -258,7 +258,7 @@
this.clearedBackend = false;
this.scratchFileWriterList =
new ArrayList<ScratchFileWriterTask>(indexCount);
- this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
+ this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
File parentDir;
if (rebuildConfig.getTmpDirectory() != null)
@@ -321,7 +321,7 @@
&& (importConfiguration.clearBackend() || localDBBackendCfg.getBaseDN().size() <= 1);
this.scratchFileWriterList =
new ArrayList<ScratchFileWriterTask>(indexCount);
- this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();
+ this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<Void>>();
File parentDir;
if (importConfiguration.getTmpDirectory() != null)
{
@@ -363,8 +363,7 @@
SortedSet<IndexType> types = index.getIndexType();
if (types.contains(IndexType.EXTENSIBLE))
{
- indexes +=
- types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
+ indexes += types.size() - 1 + index.getIndexExtensibleMatchingRule().size();
}
else
{
@@ -409,16 +408,14 @@
* @throws InitializationException
* If a problem occurs during calculation.
*/
- private void initializeDBEnv(EnvironmentConfig envConfig)
- throws InitializationException
+ private void initializeDBEnv(EnvironmentConfig envConfig) throws InitializationException
{
// Calculate amount of usable memory. This will need to take into account
// various fudge factors, including the number of IO buffers used by the
// scratch writers (1 per index).
calculateAvailableMemory();
- final long usableMemory =
- availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
+ final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
// We need caching when doing DN validation or rebuilding indexes.
if (!skipDNValidation || rebuildManager != null)
@@ -471,8 +468,7 @@
}
}
- final long phaseOneBufferMemory =
- usableMemory - dbCacheSize - tmpEnvCacheSize;
+ final long phaseOneBufferMemory = usableMemory - dbCacheSize - tmpEnvCacheSize;
final int oldThreadCount = threadCount;
if (indexCount != 0) // Avoid / by zero
{
@@ -481,8 +477,7 @@
phaseOneBufferCount = 2 * indexCount * threadCount;
// Scratch writers allocate 4 buffers per index as well.
- final int totalPhaseOneBufferCount =
- phaseOneBufferCount + (4 * indexCount);
+ final int totalPhaseOneBufferCount = phaseOneBufferCount + (4 * indexCount);
long longBufferSize = phaseOneBufferMemory / totalPhaseOneBufferCount;
// We need (2 * bufferSize) to fit in an int for the insertByteStream
// and deleteByteStream constructors.
@@ -492,12 +487,10 @@
{
if (!skipDNValidation)
{
- // The buffers are big enough: the memory is best used for the DN2ID
- // temp DB.
+ // The buffers are big enough: the memory is best used for the DN2ID temp DB
bufferSize = MAX_BUFFER_SIZE;
- final long extraMemory =
- phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
+ final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
if (!clearedBackend)
{
dbCacheSize += extraMemory / 2;
@@ -524,8 +517,7 @@
else
{
// Not enough memory.
- final long minimumPhaseOneBufferMemory =
- totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
+ final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
LocalizableMessage message =
ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
minimumPhaseOneBufferMemory + dbCacheSize + tmpEnvCacheSize);
@@ -560,6 +552,8 @@
{
// Online import/rebuild.
Runtime runTime = Runtime.getRuntime();
+ // call twice gc to ensure finalizers are called
+ // and young to old gen references are properly gc'd
runTime.gc();
runTime.gc();
final long usedMemory = runTime.totalMemory() - runTime.freeMemory();
@@ -573,14 +567,11 @@
}
else
{
- configuredMemory =
- backendConfiguration.getDBCachePercent()
- * Runtime.getRuntime().maxMemory() / 100;
+ configuredMemory = backendConfiguration.getDBCachePercent() * Runtime.getRuntime().maxMemory() / 100;
}
// Round up to minimum of 16MB (e.g. unit tests only use 2% cache).
- totalAvailableMemory =
- Math.max(Math.min(usableMemory, configuredMemory), 16 * MB);
+ totalAvailableMemory = Math.max(Math.min(usableMemory, configuredMemory), 16 * MB);
}
else
{
@@ -633,66 +624,43 @@
*/
private void generateIndexID(Suffix suffix)
{
- for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
- .getAttrIndexMap().entrySet())
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
{
AttributeIndex attributeIndex = mapEntry.getValue();
- DatabaseContainer container;
- if ((container = attributeIndex.getEqualityIndex()) != null)
- {
- int id = System.identityHashCode(container);
- idContainerMap.putIfAbsent(id, container);
- }
- if ((container = attributeIndex.getPresenceIndex()) != null)
- {
- int id = System.identityHashCode(container);
- idContainerMap.putIfAbsent(id, container);
- }
- if ((container = attributeIndex.getSubstringIndex()) != null)
- {
- int id = System.identityHashCode(container);
- idContainerMap.putIfAbsent(id, container);
- }
- if ((container = attributeIndex.getOrderingIndex()) != null)
- {
- int id = System.identityHashCode(container);
- idContainerMap.putIfAbsent(id, container);
- }
- if ((container = attributeIndex.getApproximateIndex()) != null)
- {
- int id = System.identityHashCode(container);
- idContainerMap.putIfAbsent(id, container);
- }
- Map<String, Collection<Index>> extensibleMap =
- attributeIndex.getExtensibleIndexes();
+ putInIdContainerMap(attributeIndex.getEqualityIndex());
+ putInIdContainerMap(attributeIndex.getPresenceIndex());
+ putInIdContainerMap(attributeIndex.getSubstringIndex());
+ putInIdContainerMap(attributeIndex.getOrderingIndex());
+ putInIdContainerMap(attributeIndex.getApproximateIndex());
+ Map<String, Collection<Index>> extensibleMap = attributeIndex.getExtensibleIndexes();
if (!extensibleMap.isEmpty())
{
- Collection<Index> subIndexes =
- attributeIndex.getExtensibleIndexes().get(
- EXTENSIBLE_INDEXER_ID_SUBSTRING);
- if (subIndexes != null)
- {
- for (DatabaseContainer subIndex : subIndexes)
- {
- int id = System.identityHashCode(subIndex);
- idContainerMap.putIfAbsent(id, subIndex);
- }
- }
- Collection<Index> sharedIndexes =
- attributeIndex.getExtensibleIndexes().get(
- EXTENSIBLE_INDEXER_ID_SHARED);
- if (sharedIndexes != null)
- {
- for (DatabaseContainer sharedIndex : sharedIndexes)
- {
- int id = System.identityHashCode(sharedIndex);
- idContainerMap.putIfAbsent(id, sharedIndex);
- }
- }
+ putInIdContainerMap(extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING));
+ putInIdContainerMap(extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED));
}
}
}
+ private void putInIdContainerMap(Collection<Index> indexes)
+ {
+ if (indexes != null)
+ {
+ for (DatabaseContainer index : indexes)
+ {
+ putInIdContainerMap(index);
+ }
+ }
+ }
+
+ private void putInIdContainerMap(DatabaseContainer container)
+ {
+ if (container != null)
+ {
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ }
+ }
+
private Suffix getSuffix(EntryContainer entryContainer)
throws ConfigException, InitializationException
{
@@ -741,16 +709,7 @@
while (includeBranchIterator.hasNext())
{
DN includeDN = includeBranchIterator.next();
- boolean keep = true;
- for (DN dn : includeBranches)
- {
- if (!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
- {
- keep = false;
- break;
- }
- }
- if (!keep)
+ if (!isAnyNotEqualAndAncestorOf(includeBranches, includeDN))
{
includeBranchIterator.remove();
}
@@ -763,16 +722,7 @@
while (excludeBranchIterator.hasNext())
{
DN excludeDN = excludeBranchIterator.next();
- boolean keep = false;
- for (DN includeDN : includeBranches)
- {
- if (includeDN.isAncestorOf(excludeDN))
- {
- keep = true;
- break;
- }
- }
- if (!keep)
+ if (!isAnyAncestorOf(includeBranches, excludeDN))
{
excludeBranchIterator.remove();
}
@@ -793,10 +743,7 @@
{
// Create a temp entry container
sourceEntryContainer = entryContainer;
- entryContainer =
- rootContainer.openEntryContainer(baseDN, baseDN
- .toNormalizedString()
- + "_importTmp");
+ entryContainer = rootContainer.openEntryContainer(baseDN, baseDN.toNormalizedString() + "_importTmp");
}
}
}
@@ -804,6 +751,30 @@
includeBranches, excludeBranches);
}
+ private boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
+ {
+ for (DN dn : dns)
+ {
+ if (!dn.equals(childDN) && dn.isAncestorOf(childDN))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isAnyAncestorOf(List<DN> dns, DN childDN)
+ {
+ for (DN dn : dns)
+ {
+ if (dn.isAncestorOf(childDN))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Rebuild the indexes using the specified rootcontainer.
*
@@ -827,22 +798,12 @@
this.rootContainer = rootContainer;
long startTime = System.currentTimeMillis();
- DiskSpaceMonitor tmpMonitor =
- new DiskSpaceMonitor(backendConfiguration.getBackendId()
- + " backend index rebuild tmp directory", tempDir,
- backendConfiguration.getDiskLowThreshold(), backendConfiguration
- .getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
+ DiskSpaceMonitor tmpMonitor = createDiskSpaceMonitor(tempDir, "backend index rebuild tmp directory");
tmpMonitor.initializeMonitorProvider(null);
DirectoryServer.registerMonitorProvider(tmpMonitor);
- File parentDirectory =
- getFileForPath(backendConfiguration.getDBDirectory());
- File backendDirectory =
- new File(parentDirectory, backendConfiguration.getBackendId());
- DiskSpaceMonitor dbMonitor =
- new DiskSpaceMonitor(backendConfiguration.getBackendId()
- + " backend index rebuild DB directory", backendDirectory,
- backendConfiguration.getDiskLowThreshold(), backendConfiguration
- .getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
+ File parentDirectory = getFileForPath(backendConfiguration.getDBDirectory());
+ File backendDirectory = new File(parentDirectory, backendConfiguration.getBackendId());
+ DiskSpaceMonitor dbMonitor = createDiskSpaceMonitor(backendDirectory, "backend index rebuild DB directory");
dbMonitor.initializeMonitorProvider(null);
DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -898,23 +859,12 @@
throw new InitializationException(message, ioe);
}
- tmpMonitor =
- new DiskSpaceMonitor(backendConfiguration.getBackendId()
- + " backend import tmp directory", tempDir, backendConfiguration
- .getDiskLowThreshold(),
- backendConfiguration.getDiskFullThreshold(), 5, TimeUnit.SECONDS,
- this);
+ tmpMonitor = createDiskSpaceMonitor(tempDir, "backend import tmp directory");
tmpMonitor.initializeMonitorProvider(null);
DirectoryServer.registerMonitorProvider(tmpMonitor);
- File parentDirectory =
- getFileForPath(backendConfiguration.getDBDirectory());
- File backendDirectory =
- new File(parentDirectory, backendConfiguration.getBackendId());
- dbMonitor =
- new DiskSpaceMonitor(backendConfiguration.getBackendId()
- + " backend import DB directory", backendDirectory,
- backendConfiguration.getDiskLowThreshold(), backendConfiguration
- .getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
+ File parentDirectory = getFileForPath(backendConfiguration.getDBDirectory());
+ File backendDirectory = new File(parentDirectory, backendConfiguration.getBackendId());
+ dbMonitor = createDiskSpaceMonitor(backendDirectory, "backend import DB directory");
dbMonitor.initializeMonitorProvider(null);
DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -955,11 +905,11 @@
{
rate = 1000f * reader.getEntriesRead() / importTime;
}
- logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount
- .get(), reader.getEntriesIgnored(), reader.getEntriesRejected(),
- migratedCount, importTime / 1000, rate);
- return new LDIFImportResult(reader.getEntriesRead(), reader
- .getEntriesRejected(), reader.getEntriesIgnored());
+ logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount.get(),
+ reader.getEntriesIgnored(), reader.getEntriesRejected(),
+ migratedCount, importTime / 1000, rate);
+ return new LDIFImportResult(reader.getEntriesRead(),
+ reader.getEntriesRejected(), reader.getEntriesIgnored());
}
finally
{
@@ -988,6 +938,13 @@
}
}
+ private DiskSpaceMonitor createDiskSpaceMonitor(File dir, String backendSuffix)
+ {
+ final LocalDBBackendCfg cfg = backendConfiguration;
+ return new DiskSpaceMonitor(cfg.getBackendId() + " " + backendSuffix, dir,
+ cfg.getDiskLowThreshold(), cfg.getDiskFullThreshold(), 5, TimeUnit.SECONDS, this);
+ }
+
private void recursiveDelete(File dir)
{
if (dir.listFiles() != null)
@@ -1041,9 +998,7 @@
}
catch (DatabaseException ex)
{
- LocalizableMessage message =
- NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
- throw new JebException(message);
+ throw new JebException(NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()));
}
}
@@ -1051,25 +1006,15 @@
{
initializeIndexBuffers();
FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
- ScheduledThreadPoolExecutor timerService =
- new ScheduledThreadPoolExecutor(1);
- timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL,
- TIMER_INTERVAL, TimeUnit.MILLISECONDS);
+ ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
+ timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
bufferSortService = Executors.newFixedThreadPool(threadCount);
ExecutorService execService = Executors.newFixedThreadPool(threadCount);
List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
tasks.add(new MigrateExistingTask());
- List<Future<Void>> results = execService.invokeAll(tasks);
- for (Future<Void> result : results)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(execService.invokeAll(tasks));
tasks.clear();
- results.clear();
if (importConfiguration.appendToExistingData()
&& importConfiguration.replaceExistingEntries())
{
@@ -1085,33 +1030,12 @@
tasks.add(new ImportTask());
}
}
- results = execService.invokeAll(tasks);
- for (Future<Void> result : results)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(execService.invokeAll(tasks));
tasks.clear();
- results.clear();
tasks.add(new MigrateExcludedTask());
- results = execService.invokeAll(tasks);
- for (Future<Void> result : results)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(execService.invokeAll(tasks));
stopScratchFileWriters();
- for (Future<?> result : scratchFileWriterFutures)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(scratchFileWriterFutures);
// Shutdown the executor services
timerService.shutdown();
@@ -1168,8 +1092,7 @@
int buffers;
while (true)
{
- final List<IndexManager> totList =
- new ArrayList<IndexManager>(DNIndexMgrList);
+ final List<IndexManager> totList = new ArrayList<IndexManager>(DNIndexMgrList);
totList.addAll(indexMgrList);
Collections.sort(totList, Collections.reverseOrder());
@@ -1212,8 +1135,7 @@
// processing of smaller indexes.
dbThreads = Math.max(2, dbThreads);
- logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT, availableMemory,
- readAheadSize, buffers);
+ logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT, availableMemory, readAheadSize, buffers);
// Start indexing tasks.
List<Future<Void>> futures = new LinkedList<Future<Void>>();
@@ -1223,26 +1145,25 @@
// Start DN processing first.
for (IndexManager dnMgr : DNIndexMgrList)
{
- futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits,
- buffers, readAheadSize)));
+ futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, permits, buffers, readAheadSize)));
}
for (IndexManager mgr : indexMgrList)
{
- futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers,
- readAheadSize)));
+ futures.add(dbService.submit(new IndexDBWriteTask(mgr, permits, buffers, readAheadSize)));
}
-
- for (Future<Void> result : futures)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(futures);
dbService.shutdown();
}
+ private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
+ {
+ for (Future<?> result : futures)
+ {
+ result.get();
+ }
+ }
+
private void stopScratchFileWriters()
{
IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
@@ -1270,22 +1191,16 @@
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
LockMode lockMode = LockMode.DEFAULT;
- OperationStatus status;
logger.info(NOTE_JEB_IMPORT_MIGRATION_START, "excluded", suffix.getBaseDN());
- Cursor cursor =
- entryContainer.getDN2ID().openCursor(null,
- CursorConfig.READ_COMMITTED);
- Comparator<byte[]> comparator =
- entryContainer.getDN2ID().getComparator();
+ Cursor cursor = entryContainer.getDN2ID().openCursor(null, CursorConfig.READ_COMMITTED);
+ Comparator<byte[]> comparator = entryContainer.getDN2ID().getComparator();
try
{
for (DN excludedDN : suffix.getExcludeBranches())
{
- byte[] bytes =
- JebFormat.dnToDNKey(excludedDN, suffix.getBaseDN()
- .size());
+ byte[] bytes = JebFormat.dnToDNKey(excludedDN, suffix.getBaseDN().size());
key.setData(bytes);
- status = cursor.getSearchKeyRange(key, data, lockMode);
+ OperationStatus status = cursor.getSearchKeyRange(key, data, lockMode);
if (status == OperationStatus.SUCCESS
&& Arrays.equals(key.getData(), bytes))
{
@@ -1300,9 +1215,7 @@
&& !importConfiguration.isCancelled() && !isCanceled)
{
EntryID id = new EntryID(data);
- Entry entry =
- entryContainer.getID2Entry().get(null, id,
- LockMode.DEFAULT);
+ Entry entry = entryContainer.getID2Entry().get(null, id, LockMode.DEFAULT);
processEntry(entry, rootContainer.getNextEntryID(), suffix);
migratedCount++;
status = cursor.getNext(key, data, lockMode);
@@ -1313,8 +1226,7 @@
}
catch (Exception e)
{
- logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e
- .getMessage());
+ logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXCLUDED_TASK_ERR, e.getMessage());
isCanceled = true;
throw e;
}
@@ -1340,14 +1252,12 @@
{
for (Suffix suffix : dnSuffixMap.values())
{
- List<byte[]> includeBranches =
- new ArrayList<byte[]>(suffix.getIncludeBranches().size());
+ List<byte[]> includeBranches = new ArrayList<byte[]>(suffix.getIncludeBranches().size());
for (DN includeBranch : suffix.getIncludeBranches())
{
if (includeBranch.isDescendantOf(suffix.getBaseDN()))
{
- includeBranches.add(JebFormat.dnToDNKey(includeBranch, suffix
- .getBaseDN().size()));
+ includeBranches.add(JebFormat.dnToDNKey(includeBranch, suffix.getBaseDN().size()));
}
}
@@ -1357,30 +1267,18 @@
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
LockMode lockMode = LockMode.DEFAULT;
- OperationStatus status;
logger.info(NOTE_JEB_IMPORT_MIGRATION_START, "existing", suffix.getBaseDN());
Cursor cursor = entryContainer.getDN2ID().openCursor(null, null);
try
{
- status = cursor.getFirst(key, data, lockMode);
+ OperationStatus status = cursor.getFirst(key, data, lockMode);
while (status == OperationStatus.SUCCESS
&& !importConfiguration.isCancelled() && !isCanceled)
{
- boolean found = false;
- for (byte[] includeBranch : includeBranches)
- {
- if (Arrays.equals(includeBranch, key.getData()))
- {
- found = true;
- break;
- }
- }
- if (!found)
+ if (!find(includeBranches, key.getData()))
{
EntryID id = new EntryID(data);
- Entry entry =
- entryContainer.getID2Entry()
- .get(null, id, LockMode.DEFAULT);
+ Entry entry = entryContainer.getID2Entry().get(null, id, LockMode.DEFAULT);
processEntry(entry, rootContainer.getNextEntryID(), suffix);
migratedCount++;
status = cursor.getNext(key, data, lockMode);
@@ -1409,8 +1307,7 @@
}
catch (Exception e)
{
- logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e
- .getMessage());
+ logger.error(ERR_JEB_IMPORT_LDIF_MIGRATE_EXISTING_TASK_ERR, e.getMessage());
isCanceled = true;
throw e;
}
@@ -1422,6 +1319,18 @@
}
return null;
}
+
+ private boolean find(List<byte[]> arrays, byte[] arrayToFind)
+ {
+ for (byte[] array : arrays)
+ {
+ if (Arrays.equals(array, arrayToFind))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
}
/**
@@ -1470,9 +1379,8 @@
return null;
}
- void processEntry(Entry entry, Suffix suffix) throws DatabaseException,
- DirectoryException, JebException, InterruptedException
-
+ void processEntry(Entry entry, Suffix suffix)
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
{
DN entryDN = entry.getName();
DN2ID dn2id = suffix.getDN2ID();
@@ -1510,11 +1418,9 @@
}
void processAllIndexes(Suffix suffix, Entry entry, EntryID entryID)
- throws DatabaseException, DirectoryException, JebException,
- InterruptedException
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
{
- for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
- .getAttrIndexMap().entrySet())
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
{
AttributeType attributeType = mapEntry.getKey();
fillIndexKey(suffix, mapEntry, entry, attributeType, entryID);
@@ -1549,12 +1455,11 @@
*/
private class ImportTask implements Callable<Void>
{
- private final Map<IndexKey, IndexOutputBuffer> indexBufferMap =
- new HashMap<IndexKey, IndexOutputBuffer>();
+ private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
private final EntryInformation entryInfo = new EntryInformation();
- private DatabaseEntry keyEntry = new DatabaseEntry(),
- valEntry = new DatabaseEntry();
+ private DatabaseEntry keyEntry = new DatabaseEntry();
+ private DatabaseEntry valEntry = new DatabaseEntry();
/** {@inheritDoc} */
@Override
@@ -1591,9 +1496,7 @@
}
void processEntry(Entry entry, EntryID entryID, Suffix suffix)
- throws DatabaseException, DirectoryException, JebException,
- InterruptedException
-
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
{
DN entryDN = entry.getName();
if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
@@ -1628,26 +1531,22 @@
EntryID id = suffix.getDN2ID().get(null, entryDN, LockMode.DEFAULT);
if (id != null || !tmpEnv.insert(entryDN, keyEntry, valEntry))
{
- LocalizableMessage message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- reader.rejectEntry(entry, message);
+ reader.rejectEntry(entry, WARN_JEB_IMPORT_ENTRY_EXISTS.get());
return false;
}
}
else if (!tmpEnv.insert(entryDN, keyEntry, valEntry))
{
- LocalizableMessage message = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- reader.rejectEntry(entry, message);
+ reader.rejectEntry(entry, WARN_JEB_IMPORT_ENTRY_EXISTS.get());
return false;
}
return true;
}
void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
- throws DatabaseException, DirectoryException, JebException,
- InterruptedException
+ throws DatabaseException, DirectoryException, JebException, InterruptedException
{
- for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
- .getAttrIndexMap().entrySet())
+ for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
{
AttributeType attributeType = mapEntry.getKey();
if (entry.hasAttribute(attributeType))
@@ -1664,28 +1563,32 @@
{
AttributeIndex attributeIndex = mapEntry.getValue();
IndexingOptions options = attributeIndex.getIndexingOptions();
- Index index;
- if ((index = attributeIndex.getEqualityIndex()) != null)
+ Index index = attributeIndex.getEqualityIndex();
+ if (index != null)
{
processAttribute(index, entry, entryID, options, new IndexKey(attributeType,
ImportIndexType.EQUALITY, index.getIndexEntryLimit()));
}
- if ((index = attributeIndex.getPresenceIndex()) != null)
+ index = attributeIndex.getPresenceIndex();
+ if (index != null)
{
processAttribute(index, entry, entryID, options, new IndexKey(attributeType,
ImportIndexType.PRESENCE, index.getIndexEntryLimit()));
}
- if ((index = attributeIndex.getSubstringIndex()) != null)
+ index = attributeIndex.getSubstringIndex();
+ if (index != null)
{
processAttribute(index, entry, entryID, options, new IndexKey(attributeType,
ImportIndexType.SUBSTRING, index.getIndexEntryLimit()));
}
- if ((index = attributeIndex.getOrderingIndex()) != null)
+ index = attributeIndex.getOrderingIndex();
+ if (index != null)
{
processAttribute(index, entry, entryID, options, new IndexKey(attributeType,
ImportIndexType.ORDERING, index.getIndexEntryLimit()));
}
- if ((index = attributeIndex.getApproximateIndex()) != null)
+ index = attributeIndex.getApproximateIndex();
+ if (index != null)
{
processAttribute(index, entry, entryID, options, new IndexKey(attributeType,
ImportIndexType.APPROXIMATE, index.getIndexEntryLimit()));
@@ -1695,33 +1598,25 @@
Transaction transaction = null;
vlvIdx.addEntry(transaction, entryID, entry);
}
- Map<String, Collection<Index>> extensibleMap =
- attributeIndex.getExtensibleIndexes();
+ Map<String, Collection<Index>> extensibleMap = attributeIndex.getExtensibleIndexes();
if (!extensibleMap.isEmpty())
{
- Collection<Index> subIndexes =
- attributeIndex.getExtensibleIndexes().get(
- EXTENSIBLE_INDEXER_ID_SUBSTRING);
- if (subIndexes != null)
+ Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ processAttributes(entry, attributeType, entryID, options, subIndexes, ImportIndexType.EX_SUBSTRING);
+ Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
+ processAttributes(entry, attributeType, entryID, options, sharedIndexes, ImportIndexType.EX_SHARED);
+ }
+ }
+
+ private void processAttributes(Entry entry, AttributeType attributeType, EntryID entryID, IndexingOptions options,
+ Collection<Index> indexes, ImportIndexType indexType) throws InterruptedException
+ {
+ if (indexes != null)
+ {
+ for (Index index : indexes)
{
- for (Index subIndex : subIndexes)
- {
- processAttribute(subIndex, entry, entryID, options, new IndexKey(
- attributeType, ImportIndexType.EX_SUBSTRING, subIndex
- .getIndexEntryLimit()));
- }
- }
- Collection<Index> sharedIndexes =
- attributeIndex.getExtensibleIndexes().get(
- EXTENSIBLE_INDEXER_ID_SHARED);
- if (sharedIndexes != null)
- {
- for (Index sharedIndex : sharedIndexes)
- {
- processAttribute(sharedIndex, entry, entryID, options, new IndexKey(
- attributeType, ImportIndexType.EX_SHARED, sharedIndex
- .getIndexEntryLimit()));
- }
+ IndexKey indexKey = new IndexKey(attributeType, indexType, index.getIndexEntryLimit());
+ processAttribute(index, entry, entryID, options, indexKey);
}
}
}
@@ -1739,10 +1634,8 @@
void flushIndexBuffers() throws InterruptedException, ExecutionException
{
- Set<Map.Entry<IndexKey, IndexOutputBuffer>> set =
- indexBufferMap.entrySet();
- Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator =
- set.iterator();
+ Set<Map.Entry<IndexKey, IndexOutputBuffer>> set = indexBufferMap.entrySet();
+ Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator = set.iterator();
while (setIterator.hasNext())
{
Map.Entry<IndexKey, IndexOutputBuffer> e = setIterator.next();
@@ -1752,8 +1645,7 @@
indexBuffer.setComparator(indexComparator);
indexBuffer.setIndexKey(indexKey);
indexBuffer.setDiscard();
- Future<Void> future =
- bufferSortService.submit(new SortTask(indexBuffer));
+ Future<Void> future = bufferSortService.submit(new SortTask(indexBuffer));
future.get();
}
}
@@ -1762,8 +1654,7 @@
IndexOutputBuffer.ComparatorBuffer<byte[]> comparator,
IndexKey indexKey, boolean insert) throws InterruptedException
{
- int sizeNeeded = IndexOutputBuffer.getRequiredSize(
- key.length, entryID.longValue());
+ int sizeNeeded = IndexOutputBuffer.getRequiredSize(key.length, entryID.longValue());
IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
if (indexBuffer == null)
{
@@ -1785,8 +1676,7 @@
return id;
}
- IndexOutputBuffer getNewIndexBuffer(int size)
- throws InterruptedException
+ IndexOutputBuffer getNewIndexBuffer(int size) throws InterruptedException
{
IndexOutputBuffer indexBuffer;
if (size > bufferSize)
@@ -1799,16 +1689,12 @@
indexBuffer = freeBufferQueue.take();
if (indexBuffer == null)
{
- LocalizableMessage message =
- LocalizableMessage.raw("Index buffer processing error.");
- throw new InterruptedException(message.toString());
+ throw new InterruptedException("Index buffer processing error.");
}
}
if (indexBuffer.isPoison())
{
- LocalizableMessage message =
- LocalizableMessage.raw("Cancel processing received.");
- throw new InterruptedException(message.toString());
+ throw new InterruptedException("Cancel processing received.");
}
return indexBuffer;
}
@@ -1817,11 +1703,9 @@
throws InterruptedException
{
DN2ID dn2id = suffix.getDN2ID();
- byte[] dnBytes =
- JebFormat.dnToDNKey(dn, suffix.getBaseDN().size());
- int id =
- processKey(dn2id, dnBytes, entryID, indexComparator, new IndexKey(
- dnType, ImportIndexType.DN, 1), true);
+ byte[] dnBytes = JebFormat.dnToDNKey(dn, suffix.getBaseDN().size());
+ IndexKey indexKey = new IndexKey(dnType, ImportIndexType.DN, 1);
+ int id = processKey(dn2id, dnBytes, entryID, indexComparator, indexKey, true);
idECMap.putIfAbsent(id, suffix.getEntryContainer());
}
@@ -2036,9 +1920,8 @@
final long kiloBytesRate = bytesReadInterval / deltaTime;
final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
- logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT, indexMgr
- .getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
- kiloBytesRate, currentBatch, totalBatches);
+ logger.info(NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT, indexMgr.getBufferFileName(),
+ bytesReadPercent, kiloBytesRemaining, kiloBytesRate, currentBatch, totalBatches);
lastBytesRead = tmpBytesRead;
}
@@ -2155,8 +2038,7 @@
}
catch (Exception e)
{
- logger.error(ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR, indexMgr
- .getBufferFileName(), e.getMessage());
+ logger.error(ERR_JEB_IMPORT_LDIF_INDEX_WRITE_DB_ERR, indexMgr.getBufferFileName(), e.getMessage());
throw e;
}
finally
@@ -2212,11 +2094,10 @@
{
dnState = dnStateMap.get(indexID);
}
- if (!dnState.checkParent(record))
+ if (dnState.checkParent(record))
{
- return;
+ dnState.writeToDB();
}
- dnState.writeToDB();
}
private void addBytesRead(int bytesRead)
@@ -2263,8 +2144,7 @@
private ByteBuffer getParent(ByteBuffer buffer)
{
- int parentIndex =
- JebFormat.findDNKeyParent(buffer.array(), 0, buffer.limit());
+ int parentIndex = JebFormat.findDNKeyParent(buffer.array(), 0, buffer.limit());
if (parentIndex < 0)
{
// This is the root or base DN
@@ -2313,9 +2193,7 @@
DatabaseEntry key = new DatabaseEntry(parentDN.array(), 0, parentDN.limit());
DatabaseEntry value = new DatabaseEntry();
OperationStatus status;
- status =
- entryContainer.getDN2ID().read(null, key, value,
- LockMode.DEFAULT);
+ status = entryContainer.getDN2ID().read(null, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
parentID = new EntryID(value);
@@ -2402,32 +2280,20 @@
private EntryID getParentID(ByteBuffer dn) throws DatabaseException
{
- EntryID nodeID;
- //Bypass the cache for append data, lookup the parent DN in the DN2ID
- //db.
- if (importConfiguration != null
- && importConfiguration.appendToExistingData())
+ // Bypass the cache for append data, lookup the parent DN in the DN2ID db
+ if (importConfiguration == null || !importConfiguration.appendToExistingData())
{
- DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
- DatabaseEntry value = new DatabaseEntry();
- OperationStatus status;
- status =
- entryContainer.getDN2ID()
- .read(null, key, value, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- nodeID = new EntryID(value);
- }
- else
- {
- nodeID = null;
- }
+ return parentIDMap.get(dn);
}
- else
+ DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
+ DatabaseEntry value = new DatabaseEntry();
+ OperationStatus status;
+ status = entryContainer.getDN2ID().read(null, key, value, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
{
- nodeID = parentIDMap.get(dn);
+ return new EntryID(value);
}
- return nodeID;
+ return null;
}
private void id2SubTree(EntryID childID) throws DirectoryException
@@ -2435,8 +2301,7 @@
if (parentID != null)
{
ImportIDSet idSet;
- if (!id2subtreeTree
- .containsKey(parentID.getDatabaseEntry().getData()))
+ if (!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
{
idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
@@ -2449,8 +2314,7 @@
// TODO:
// Instead of doing this,
// we can just walk to parent cache if available
- for (ByteBuffer dn = getParent(parentDN); dn != null; dn =
- getParent(dn))
+ for (ByteBuffer dn = getParent(parentDN); dn != null; dn = getParent(dn))
{
EntryID nodeID = getParentID(dn);
if (nodeID == null)
@@ -2459,8 +2323,7 @@
// Just ignore.
break;
}
- if (!id2subtreeTree
- .containsKey(nodeID.getDatabaseEntry().getData()))
+ if (!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
{
idSet = new ImportIDSet(1, subTreeLimit, subTreeDoCount);
id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
@@ -2612,8 +2475,8 @@
}
catch (IOException e)
{
- logger.error(ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR, indexMgr
- .getBufferFile().getAbsolutePath(), e.getMessage());
+ logger.error(ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR,
+ indexMgr.getBufferFile().getAbsolutePath(), e.getMessage());
isCanceled = true;
throw e;
}
@@ -2858,8 +2721,7 @@
return null;
}
- private void createIndexWriterTask(IndexKey indexKey)
- throws FileNotFoundException
+ private void createIndexWriterTask(IndexKey indexKey) throws FileNotFoundException
{
synchronized (synObj)
{
@@ -2867,7 +2729,7 @@
{
return;
}
- boolean isDN = !indexKey.getIndexType().equals(ImportIndexType.DN);
+ boolean isDN = indexKey.getIndexType().equals(ImportIndexType.DN);
IndexManager indexMgr = new IndexManager(
indexKey.getName(), isDN, indexKey.getEntryLimit());
if (isDN)
@@ -3139,8 +3001,7 @@
if (!rebuildConfig.isClearDegradedState())
{
- logger.info(NOTE_JEB_REBUILD_FINAL_STATUS, entriesProcessed.get(),
- totalTime / 1000, rate);
+ logger.info(NOTE_JEB_REBUILD_FINAL_STATUS, entriesProcessed.get(), totalTime / 1000, rate);
}
}
@@ -3268,8 +3129,7 @@
{
// rebuildList contains the user-selected index(in USER_DEFINED mode).
final List<String> rebuildList = rebuildConfig.getRebuildList();
- for (final Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix
- .getAttrIndexMap().entrySet())
+ for (final Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
{
final AttributeType attributeType = mapEntry.getKey();
final AttributeIndex attributeIndex = mapEntry.getValue();
@@ -3359,16 +3219,15 @@
private void clearIndexes(boolean onlyDegraded) throws DatabaseException
{
- // Clears all the entry's container databases
- // which are containing the indexes.
-
+ // Clears all the entry's container databases which are containing the indexes
if (!onlyDegraded)
{
// dn2uri does not have a trusted status.
entryContainer.clearDatabase(entryContainer.getDN2URI());
}
- if (!onlyDegraded || !entryContainer.getID2Children().isTrusted()
+ if (!onlyDegraded
+ || !entryContainer.getID2Children().isTrusted()
|| !entryContainer.getID2Subtree().isTrusted())
{
entryContainer.clearDatabase(entryContainer.getDN2ID());
@@ -3452,9 +3311,7 @@
}
catch (DatabaseException ex)
{
- LocalizableMessage message =
- NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
- throw new JebException(message);
+ throw new JebException(NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage()));
}
}
@@ -3462,8 +3319,7 @@
ExecutionException
{
initializeIndexBuffers();
- RebuildFirstPhaseProgressTask progressTask =
- new RebuildFirstPhaseProgressTask();
+ RebuildFirstPhaseProgressTask progressTask = new RebuildFirstPhaseProgressTask();
Timer timer = new Timer();
timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
@@ -3476,21 +3332,9 @@
tasks.add(this);
}
List<Future<Void>> results = rebuildIndexService.invokeAll(tasks);
- for (Future<Void> result : results)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(results);
stopScratchFileWriters();
- for (Future<?> result : scratchFileWriterFutures)
- {
- if (!result.isDone())
- {
- result.get();
- }
- }
+ getAll(scratchFileWriterFutures);
// Try to clear as much memory as possible.
rebuildIndexService.shutdown();
@@ -3511,8 +3355,7 @@
private void phaseTwo() throws InterruptedException, ExecutionException
{
- SecondPhaseProgressTask progressTask =
- new SecondPhaseProgressTask(entriesProcessed.get());
+ SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask(entriesProcessed.get());
Timer timer2 = new Timer();
timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
processIndexFiles();
@@ -3577,8 +3420,7 @@
LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
throw new InitializationException(msg);
}
- AttributeType attrType =
- DirectoryServer.getAttributeType(attrIndexParts[0]);
+ AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]);
if (attrType == null)
{
LocalizableMessage msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index);
@@ -4365,7 +4207,6 @@
* Temporary environment used to check DN's when DN validation is performed
* during phase one processing. It is deleted after phase one processing.
*/
-
public final class TmpEnv implements DNCache
{
private String envPath;
--
Gitblit v1.10.0