From e73561d3b0db47696c578736a50489a454ad6f9c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Mar 2015 13:08:57 +0000
Subject: [PATCH] OPENDJ-1708 Persistit: no rebuild-index support
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 233 +++++++++++++++++++++++++++++++++++-----------------------
1 files changed, 141 insertions(+), 92 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index e685a1d..757b048 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -859,22 +859,17 @@
InterruptedException, ExecutionException
{
this.rootContainer = rootContainer;
- final long startTime = System.currentTimeMillis();
try
{
- rootContainer.getStorage().write(new WriteOperation()
+ if (rebuildManager.rebuildConfig.isClearDegradedState())
{
- @Override
- public void run(WriteableStorage txn) throws Exception
- {
- rebuildManager.initialize();
- rebuildManager.printStartMessage(txn);
- rebuildManager.rebuildIndexes(txn);
- recursiveDelete(tempDir);
- rebuildManager.printStopMessage(startTime);
- }
- });
+ clearDegradedState();
+ }
+ else
+ {
+ rebuildIndexes();
+ }
}
catch (Exception e)
{
@@ -882,6 +877,54 @@
}
}
+ private void clearDegradedState() throws Exception
+ {
+ rootContainer.getStorage().write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ rebuildManager.initialize();
+ rebuildManager.printStartMessage(txn);
+ rebuildManager.clearDegradedState(txn);
+ recursiveDelete(tempDir);
+ rebuildManager.printStopMessage(startTime);
+ }
+ });
+ }
+
+ private void rebuildIndexes() throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ rebuildManager.initialize();
+ rebuildManager.printStartMessage(txn);
+ rebuildManager.preRebuildIndexes(txn);
+ }
+ });
+
+ rebuildManager.rebuildIndexesPhaseOne();
+ rebuildManager.throwIfCancelled();
+ rebuildManager.rebuildIndexesPhaseTwo();
+
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ rebuildManager.postRebuildIndexes(txn);
+ }
+ });
+ recursiveDelete(tempDir);
+ rebuildManager.printStopMessage(startTime);
+ }
+
/**
* Import a LDIF using the specified root container.
*
@@ -924,7 +967,7 @@
setIndexesTrusted(false);
final long startTime = System.currentTimeMillis();
- phaseOne(txn);
+ importPhaseOne(txn);
isPhaseOneDone = true;
final long phaseOneFinishTime = System.currentTimeMillis();
@@ -938,7 +981,7 @@
}
final long phaseTwoTime = System.currentTimeMillis();
- phaseTwo(txn);
+ importPhaseTwo();
if (isCanceled)
{
throw new InterruptedException("Import processing canceled.");
@@ -1035,7 +1078,16 @@
}
}
- private void phaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+ /**
+ * Reads all entries from id2entry, and:
+ * <ol>
+ * <li>compute how the entry is indexed for each index</li>
+ * <li>store the result of indexing entries into in-memory index buffers</li>
+ * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
+ * The scratch files will be read by phaseTwo to perform on-disk merge</li>
+ * </ol>
+ */
+ private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
{
initializeIndexBuffers();
@@ -1106,13 +1158,13 @@
}
}
- private void phaseTwo(WriteableStorage txn) throws InterruptedException, ExecutionException
+ private void importPhaseTwo() throws InterruptedException, ExecutionException
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
try
{
- processIndexFiles(txn);
+ processIndexFiles();
}
finally
{
@@ -1120,7 +1172,11 @@
}
}
- private void processIndexFiles(WriteableStorage txn) throws InterruptedException, ExecutionException
+ /**
+ * Performs on-disk merge by reading several scratch files at once
+ * and write their ordered content into the target indexes.
+ */
+ private void processIndexFiles() throws InterruptedException, ExecutionException
{
if (bufferCount.get() == 0)
{
@@ -1190,17 +1246,20 @@
Semaphore permits = new Semaphore(buffers);
// Start DN processing first.
- submitIndexDBWriteTasks(DNIndexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
- submitIndexDBWriteTasks(indexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
getAll(futures);
shutdownAll(dbService);
}
- private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, WriteableStorage txn, ExecutorService dbService,
+ private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
{
for (IndexManager indexMgr : indexMgrs)
{
+ // avoid threading issues by allocating one writeable storage per thread
+ // DB transactions are generally tied to a single thread
+ WriteableStorage txn = this.rootContainer.getStorage().getWriteableStorage();
futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, txn, permits, buffers, readAheadSize)));
}
}
@@ -1257,7 +1316,8 @@
while (success
&& ByteSequence.COMPARATOR.compare(key, end) < 0
- && !importConfiguration.isCancelled() && !isCanceled)
+ && !importConfiguration.isCancelled()
+ && !isCanceled)
{
EntryID id = new EntryID(cursor.getValue());
Entry entry = entryContainer.getID2Entry().get(txn, id);
@@ -1908,7 +1968,7 @@
}
finally
{
- close(bufferFile, bufferIndexFile);
+ close(bufferFile, bufferIndexFile, txn);
indexMgr.getBufferFile().delete();
indexMgr.getBufferIndexFile().delete();
@@ -2767,8 +2827,7 @@
/**
* The rebuild index manager handles all rebuild index related processing.
*/
- private class RebuildIndexManager extends ImportTask implements
- DiskSpaceMonitorHandler
+ private class RebuildIndexManager extends ImportTask implements DiskSpaceMonitorHandler
{
/** Rebuild index configuration. */
@@ -2927,55 +2986,43 @@
}
}
- /**
- * Perform rebuild index processing.
- *
- * @param txn
- * The database transaction
- * @throws InterruptedException
- * If an interrupted error occurred.
- * @throws ExecutionException
- * If an Execution error occurred.
- * @throws StorageRuntimeException
- * If an JEB error occurred.
- */
- public void rebuildIndexes(WriteableStorage txn)
- throws InterruptedException, ExecutionException, StorageRuntimeException
+ private void clearDegradedState(WriteableStorage txn)
{
- this.txn = txn;
- // Sets only the needed indexes.
- setIndexesListsToBeRebuilt();
+ setIndexesListsToBeRebuilt(txn);
+ logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
+ postRebuildIndexes(txn);
+ }
- if (!rebuildConfig.isClearDegradedState())
- {
- // If not in a 'clear degraded state' operation,
- // need to rebuild the indexes.
- setRebuildListIndexesTrusted(false);
- clearIndexes(txn, true);
- phaseOne();
- if (isCanceled)
- {
- throw new InterruptedException("Rebuild Index canceled.");
- }
- phaseTwo();
- }
- else
- {
- logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
- }
- setRebuildListIndexesTrusted(true);
+ private void preRebuildIndexes(WriteableStorage txn)
+ {
+ setIndexesListsToBeRebuilt(txn);
+ setRebuildListIndexesTrusted(txn, false);
+ clearIndexes(txn, true);
+ }
+
+ private void throwIfCancelled() throws InterruptedException
+ {
+ if (isCanceled)
+ {
+ throw new InterruptedException("Rebuild Index canceled.");
+ }
+ }
+
+ private void postRebuildIndexes(WriteableStorage txn)
+ {
+ setRebuildListIndexesTrusted(txn, true);
}
@SuppressWarnings("fallthrough")
- private void setIndexesListsToBeRebuilt() throws StorageRuntimeException
+ private void setIndexesListsToBeRebuilt(WriteableStorage txn) throws StorageRuntimeException
{
// Depends on rebuild mode, (re)building indexes' lists.
final RebuildMode mode = rebuildConfig.getRebuildMode();
switch (mode)
{
case ALL:
- rebuildIndexMap(false);
+ rebuildIndexMap(txn, false);
// falls through
case DEGRADED:
if (mode == RebuildMode.ALL
@@ -2991,7 +3038,7 @@
if (mode == RebuildMode.DEGRADED
|| entryContainer.getAttributeIndexes().isEmpty())
{
- rebuildIndexMap(true); // only degraded.
+ rebuildIndexMap(txn, true); // only degraded.
}
if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
{
@@ -3001,14 +3048,14 @@
case USER_DEFINED:
// false may be required if the user wants to rebuild specific index.
- rebuildIndexMap(false);
+ rebuildIndexMap(txn, false);
break;
default:
break;
}
}
- private void rebuildIndexMap(final boolean onlyDegraded)
+ private void rebuildIndexMap(WriteableStorage txn, boolean onlyDegraded)
{
// rebuildList contains the user-selected index(in USER_DEFINED mode).
final List<String> rebuildList = rebuildConfig.getRebuildList();
@@ -3020,7 +3067,7 @@
|| rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
{
// Get all existing indexes for all && degraded mode.
- rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+ rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
}
else if (!rebuildList.isEmpty())
{
@@ -3029,46 +3076,46 @@
{
if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
{
- rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+ rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
}
}
}
}
}
- private void rebuildAttributeIndexes(final AttributeIndex attrIndex, final AttributeType attrType,
- final boolean onlyDegraded) throws StorageRuntimeException
+ private void rebuildAttributeIndexes(WriteableStorage txn, AttributeIndex attrIndex, AttributeType attrType,
+ boolean onlyDegraded) throws StorageRuntimeException
{
- fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
if (!extensibleMap.isEmpty())
{
final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
- fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
+ fillIndexMap(txn, attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
- fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
+ fillIndexMap(txn, attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
}
}
- private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
- final ImportIndexType importIndexType, final boolean onlyDegraded)
+ private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Collection<Index> indexes,
+ ImportIndexType importIndexType, boolean onlyDegraded)
{
if (indexes != null && !indexes.isEmpty())
{
final List<Index> mutableCopy = new LinkedList<Index>(indexes);
for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
{
- final Index sharedIndex = it.next();
- if (!onlyDegraded || !sharedIndex.isTrusted())
+ final Index index = it.next();
+ if (!onlyDegraded || !index.isTrusted())
{
- if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount(txn) == 0)
+ if (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0)
{
- putInIdContainerMap(sharedIndex);
+ putInIdContainerMap(index);
}
}
else
@@ -3084,10 +3131,11 @@
}
}
- private void fillIndexMap(final AttributeType attrType, final Index index,
- final ImportIndexType importIndexType, final boolean onlyDegraded)
+ private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Index index,
+ ImportIndexType importIndexType, boolean onlyDegraded)
{
- if (index != null && (!onlyDegraded || !index.isTrusted())
+ if (index != null
+ && (!onlyDegraded || !index.isTrusted())
&& (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0))
{
putInIdContainerMap(index);
@@ -3148,7 +3196,7 @@
}
}
- private void setRebuildListIndexesTrusted(boolean trusted) throws StorageRuntimeException
+ private void setRebuildListIndexesTrusted(WriteableStorage txn, boolean trusted) throws StorageRuntimeException
{
try
{
@@ -3158,7 +3206,7 @@
ec.getID2Children().setTrusted(txn, trusted);
ec.getID2Subtree().setTrusted(txn, trusted);
}
- setTrusted(indexMap.values(), trusted);
+ setTrusted(txn, indexMap.values(), trusted);
if (!vlvIndexes.isEmpty())
{
for (VLVIndex vlvIndex : vlvIndexes)
@@ -3170,7 +3218,7 @@
{
for (Collection<Index> subIndexes : extensibleIndexMap.values())
{
- setTrusted(subIndexes, trusted);
+ setTrusted(txn, subIndexes, trusted);
}
}
}
@@ -3180,7 +3228,7 @@
}
}
- private void setTrusted(final Collection<Index> indexes, boolean trusted)
+ private void setTrusted(WriteableStorage txn, final Collection<Index> indexes, boolean trusted)
{
if (indexes != null && !indexes.isEmpty())
{
@@ -3191,7 +3239,8 @@
}
}
- private void phaseOne() throws StorageRuntimeException, InterruptedException,
+ /** @see Importer#importPhaseOne(WriteableStorage) */
+ private void rebuildIndexesPhaseOne() throws StorageRuntimeException, InterruptedException,
ExecutionException
{
initializeIndexBuffers();
@@ -3217,12 +3266,12 @@
indexKeyQueueMap.clear();
}
- private void phaseTwo() throws InterruptedException, ExecutionException
+ private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
{
final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
try
{
- processIndexFiles(txn);
+ processIndexFiles();
}
finally
{
--
Gitblit v1.10.0