From e73561d3b0db47696c578736a50489a454ad6f9c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Mar 2015 13:08:57 +0000
Subject: [PATCH] OPENDJ-1708 Persistit: no rebuild-index support
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 233 ++++++++++++++++++++++++++++------------------
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java | 13 ++
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java | 24 ++--
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java | 7 +
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java | 23 ++++
5 files changed, 194 insertions(+), 106 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
index ef68ce8..4e66b49 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -27,13 +27,10 @@
import static com.persistit.Transaction.CommitPolicy.*;
import static java.util.Arrays.*;
-import static org.opends.messages.ConfigMessages.ERR_CONFIG_BACKEND_INSANE_MODE;
-import static org.opends.messages.ConfigMessages.ERR_CONFIG_BACKEND_MODE_INVALID;
+
+import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.JebMessages.*;
-import static org.opends.server.util.ServerConstants.ALERT_DESCRIPTION_DISK_FULL;
-import static org.opends.server.util.ServerConstants.ALERT_DESCRIPTION_DISK_SPACE_LOW;
-import static org.opends.server.util.ServerConstants.ALERT_TYPE_DISK_FULL;
-import static org.opends.server.util.ServerConstants.ALERT_TYPE_DISK_SPACE_LOW;
+import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
@@ -482,7 +479,8 @@
return exchange;
}
- private void release()
+ @Override
+ public void close()
{
for (final Exchange ex : exchanges.values())
{
@@ -632,7 +630,7 @@
}
finally
{
- storageImpl.release();
+ storageImpl.close();
}
}
catch (final RollbackException e)
@@ -704,7 +702,7 @@
}
finally
{
- storageImpl.release();
+ storageImpl.close();
}
}
catch (final RollbackException e)
@@ -724,6 +722,12 @@
}
@Override
+ public WriteableStorage getWriteableStorage()
+ {
+ return new StorageImpl();
+ }
+
+ @Override
public boolean supportsBackupAndRestore()
{
return true;
@@ -962,7 +966,7 @@
setDBDirPermissions(config, backendDirectory);
}
- /** {@inheritDoc} */
+ @Override
public void removeStorageFiles() throws StorageRuntimeException
{
if (!backendDirectory.isDirectory())
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index e685a1d..757b048 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -859,22 +859,17 @@
InterruptedException, ExecutionException
{
this.rootContainer = rootContainer;
- final long startTime = System.currentTimeMillis();
try
{
- rootContainer.getStorage().write(new WriteOperation()
+ if (rebuildManager.rebuildConfig.isClearDegradedState())
{
- @Override
- public void run(WriteableStorage txn) throws Exception
- {
- rebuildManager.initialize();
- rebuildManager.printStartMessage(txn);
- rebuildManager.rebuildIndexes(txn);
- recursiveDelete(tempDir);
- rebuildManager.printStopMessage(startTime);
- }
- });
+ clearDegradedState();
+ }
+ else
+ {
+ rebuildIndexes();
+ }
}
catch (Exception e)
{
@@ -882,6 +877,54 @@
}
}
+ private void clearDegradedState() throws Exception
+ {
+ rootContainer.getStorage().write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ rebuildManager.initialize();
+ rebuildManager.printStartMessage(txn);
+ rebuildManager.clearDegradedState(txn);
+ recursiveDelete(tempDir);
+ rebuildManager.printStopMessage(startTime);
+ }
+ });
+ }
+
+ private void rebuildIndexes() throws Exception
+ {
+ final long startTime = System.currentTimeMillis();
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ rebuildManager.initialize();
+ rebuildManager.printStartMessage(txn);
+ rebuildManager.preRebuildIndexes(txn);
+ }
+ });
+
+ rebuildManager.rebuildIndexesPhaseOne();
+ rebuildManager.throwIfCancelled();
+ rebuildManager.rebuildIndexesPhaseTwo();
+
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ rebuildManager.postRebuildIndexes(txn);
+ }
+ });
+ recursiveDelete(tempDir);
+ rebuildManager.printStopMessage(startTime);
+ }
+
/**
* Import a LDIF using the specified root container.
*
@@ -924,7 +967,7 @@
setIndexesTrusted(false);
final long startTime = System.currentTimeMillis();
- phaseOne(txn);
+ importPhaseOne(txn);
isPhaseOneDone = true;
final long phaseOneFinishTime = System.currentTimeMillis();
@@ -938,7 +981,7 @@
}
final long phaseTwoTime = System.currentTimeMillis();
- phaseTwo(txn);
+ importPhaseTwo();
if (isCanceled)
{
throw new InterruptedException("Import processing canceled.");
@@ -1035,7 +1078,16 @@
}
}
- private void phaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+ /**
+ * Reads all entries from id2entry, and:
+ * <ol>
+ * <li>compute how the entry is indexed for each index</li>
+ * <li>store the result of indexing entries into in-memory index buffers</li>
+ * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
+ * The scratch files will be read by phaseTwo to perform on-disk merge</li>
+ * </ol>
+ */
+ private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
{
initializeIndexBuffers();
@@ -1106,13 +1158,13 @@
}
}
- private void phaseTwo(WriteableStorage txn) throws InterruptedException, ExecutionException
+ private void importPhaseTwo() throws InterruptedException, ExecutionException
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
try
{
- processIndexFiles(txn);
+ processIndexFiles();
}
finally
{
@@ -1120,7 +1172,11 @@
}
}
- private void processIndexFiles(WriteableStorage txn) throws InterruptedException, ExecutionException
+ /**
+ * Performs on-disk merge by reading several scratch files at once
+ * and write their ordered content into the target indexes.
+ */
+ private void processIndexFiles() throws InterruptedException, ExecutionException
{
if (bufferCount.get() == 0)
{
@@ -1190,17 +1246,20 @@
Semaphore permits = new Semaphore(buffers);
// Start DN processing first.
- submitIndexDBWriteTasks(DNIndexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
- submitIndexDBWriteTasks(indexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
getAll(futures);
shutdownAll(dbService);
}
- private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, WriteableStorage txn, ExecutorService dbService,
+ private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
{
for (IndexManager indexMgr : indexMgrs)
{
+ // avoid threading issues by allocating one writeable storage per thread
+ // DB transactions are generally tied to a single thread
+ WriteableStorage txn = this.rootContainer.getStorage().getWriteableStorage();
futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, txn, permits, buffers, readAheadSize)));
}
}
@@ -1257,7 +1316,8 @@
while (success
&& ByteSequence.COMPARATOR.compare(key, end) < 0
- && !importConfiguration.isCancelled() && !isCanceled)
+ && !importConfiguration.isCancelled()
+ && !isCanceled)
{
EntryID id = new EntryID(cursor.getValue());
Entry entry = entryContainer.getID2Entry().get(txn, id);
@@ -1908,7 +1968,7 @@
}
finally
{
- close(bufferFile, bufferIndexFile);
+ close(bufferFile, bufferIndexFile, txn);
indexMgr.getBufferFile().delete();
indexMgr.getBufferIndexFile().delete();
@@ -2767,8 +2827,7 @@
/**
* The rebuild index manager handles all rebuild index related processing.
*/
- private class RebuildIndexManager extends ImportTask implements
- DiskSpaceMonitorHandler
+ private class RebuildIndexManager extends ImportTask implements DiskSpaceMonitorHandler
{
/** Rebuild index configuration. */
@@ -2927,55 +2986,43 @@
}
}
- /**
- * Perform rebuild index processing.
- *
- * @param txn
- * The database transaction
- * @throws InterruptedException
- * If an interrupted error occurred.
- * @throws ExecutionException
- * If an Execution error occurred.
- * @throws StorageRuntimeException
- * If an JEB error occurred.
- */
- public void rebuildIndexes(WriteableStorage txn)
- throws InterruptedException, ExecutionException, StorageRuntimeException
+ private void clearDegradedState(WriteableStorage txn)
{
- this.txn = txn;
- // Sets only the needed indexes.
- setIndexesListsToBeRebuilt();
+ setIndexesListsToBeRebuilt(txn);
+ logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
+ postRebuildIndexes(txn);
+ }
- if (!rebuildConfig.isClearDegradedState())
- {
- // If not in a 'clear degraded state' operation,
- // need to rebuild the indexes.
- setRebuildListIndexesTrusted(false);
- clearIndexes(txn, true);
- phaseOne();
- if (isCanceled)
- {
- throw new InterruptedException("Rebuild Index canceled.");
- }
- phaseTwo();
- }
- else
- {
- logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
- }
- setRebuildListIndexesTrusted(true);
+ private void preRebuildIndexes(WriteableStorage txn)
+ {
+ setIndexesListsToBeRebuilt(txn);
+ setRebuildListIndexesTrusted(txn, false);
+ clearIndexes(txn, true);
+ }
+
+ private void throwIfCancelled() throws InterruptedException
+ {
+ if (isCanceled)
+ {
+ throw new InterruptedException("Rebuild Index canceled.");
+ }
+ }
+
+ private void postRebuildIndexes(WriteableStorage txn)
+ {
+ setRebuildListIndexesTrusted(txn, true);
}
@SuppressWarnings("fallthrough")
- private void setIndexesListsToBeRebuilt() throws StorageRuntimeException
+ private void setIndexesListsToBeRebuilt(WriteableStorage txn) throws StorageRuntimeException
{
// Depends on rebuild mode, (re)building indexes' lists.
final RebuildMode mode = rebuildConfig.getRebuildMode();
switch (mode)
{
case ALL:
- rebuildIndexMap(false);
+ rebuildIndexMap(txn, false);
// falls through
case DEGRADED:
if (mode == RebuildMode.ALL
@@ -2991,7 +3038,7 @@
if (mode == RebuildMode.DEGRADED
|| entryContainer.getAttributeIndexes().isEmpty())
{
- rebuildIndexMap(true); // only degraded.
+ rebuildIndexMap(txn, true); // only degraded.
}
if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
{
@@ -3001,14 +3048,14 @@
case USER_DEFINED:
// false may be required if the user wants to rebuild specific index.
- rebuildIndexMap(false);
+ rebuildIndexMap(txn, false);
break;
default:
break;
}
}
- private void rebuildIndexMap(final boolean onlyDegraded)
+ private void rebuildIndexMap(WriteableStorage txn, boolean onlyDegraded)
{
// rebuildList contains the user-selected index(in USER_DEFINED mode).
final List<String> rebuildList = rebuildConfig.getRebuildList();
@@ -3020,7 +3067,7 @@
|| rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
{
// Get all existing indexes for all && degraded mode.
- rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+ rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
}
else if (!rebuildList.isEmpty())
{
@@ -3029,46 +3076,46 @@
{
if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
{
- rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+ rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
}
}
}
}
}
- private void rebuildAttributeIndexes(final AttributeIndex attrIndex, final AttributeType attrType,
- final boolean onlyDegraded) throws StorageRuntimeException
+ private void rebuildAttributeIndexes(WriteableStorage txn, AttributeIndex attrIndex, AttributeType attrType,
+ boolean onlyDegraded) throws StorageRuntimeException
{
- fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
- fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
+ fillIndexMap(txn, attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
if (!extensibleMap.isEmpty())
{
final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
- fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
+ fillIndexMap(txn, attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
- fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
+ fillIndexMap(txn, attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
}
}
- private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
- final ImportIndexType importIndexType, final boolean onlyDegraded)
+ private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Collection<Index> indexes,
+ ImportIndexType importIndexType, boolean onlyDegraded)
{
if (indexes != null && !indexes.isEmpty())
{
final List<Index> mutableCopy = new LinkedList<Index>(indexes);
for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
{
- final Index sharedIndex = it.next();
- if (!onlyDegraded || !sharedIndex.isTrusted())
+ final Index index = it.next();
+ if (!onlyDegraded || !index.isTrusted())
{
- if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount(txn) == 0)
+ if (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0)
{
- putInIdContainerMap(sharedIndex);
+ putInIdContainerMap(index);
}
}
else
@@ -3084,10 +3131,11 @@
}
}
- private void fillIndexMap(final AttributeType attrType, final Index index,
- final ImportIndexType importIndexType, final boolean onlyDegraded)
+ private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Index index,
+ ImportIndexType importIndexType, boolean onlyDegraded)
{
- if (index != null && (!onlyDegraded || !index.isTrusted())
+ if (index != null
+ && (!onlyDegraded || !index.isTrusted())
&& (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0))
{
putInIdContainerMap(index);
@@ -3148,7 +3196,7 @@
}
}
- private void setRebuildListIndexesTrusted(boolean trusted) throws StorageRuntimeException
+ private void setRebuildListIndexesTrusted(WriteableStorage txn, boolean trusted) throws StorageRuntimeException
{
try
{
@@ -3158,7 +3206,7 @@
ec.getID2Children().setTrusted(txn, trusted);
ec.getID2Subtree().setTrusted(txn, trusted);
}
- setTrusted(indexMap.values(), trusted);
+ setTrusted(txn, indexMap.values(), trusted);
if (!vlvIndexes.isEmpty())
{
for (VLVIndex vlvIndex : vlvIndexes)
@@ -3170,7 +3218,7 @@
{
for (Collection<Index> subIndexes : extensibleIndexMap.values())
{
- setTrusted(subIndexes, trusted);
+ setTrusted(txn, subIndexes, trusted);
}
}
}
@@ -3180,7 +3228,7 @@
}
}
- private void setTrusted(final Collection<Index> indexes, boolean trusted)
+ private void setTrusted(WriteableStorage txn, final Collection<Index> indexes, boolean trusted)
{
if (indexes != null && !indexes.isEmpty())
{
@@ -3191,7 +3239,8 @@
}
}
- private void phaseOne() throws StorageRuntimeException, InterruptedException,
+ /** @see Importer#importPhaseOne(WriteableStorage) */
+ private void rebuildIndexesPhaseOne() throws StorageRuntimeException, InterruptedException,
ExecutionException
{
initializeIndexBuffers();
@@ -3217,12 +3266,12 @@
indexKeyQueueMap.clear();
}
- private void phaseTwo() throws InterruptedException, ExecutionException
+ private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
{
final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
try
{
- processIndexFiles(txn);
+ processIndexFiles();
}
finally
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
index 8854d73..0d8217f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -138,6 +138,12 @@
return value;
}
+ @Override
+ public void close()
+ {
+ logger.trace("Storage@%s.ReadableStorage@%s.close()", storageId(), id());
+ }
+
private int id()
{
return System.identityHashCode(this);
@@ -251,6 +257,12 @@
return isUpdated;
}
+ @Override
+ public void close()
+ {
+ logger.trace("Storage@%s.WriteableStorage@%s.close()", storageId(), id());
+ }
+
private int id()
{
return System.identityHashCode(this);
@@ -370,6 +382,17 @@
storage.write(op);
}
+ @Override
+ public WriteableStorage getWriteableStorage()
+ {
+ final WriteableStorage writeableStorage = storage.getWriteableStorage();
+ if (logger.isTraceEnabled())
+ {
+ return new TracedWriteableStorage(writeableStorage);
+ }
+ return writeableStorage;
+ }
+
private String hex(final ByteSequence bytes)
{
return bytes != null ? bytes.toByteString().toHexString() : null;
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java
index 8731037..96c6a41 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java
@@ -25,13 +25,15 @@
*/
package org.opends.server.backends.pluggable.spi;
+import java.io.Closeable;
+
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
/**
* Represents a readable transaction on a storage engine.
*/
-public interface ReadableStorage
+public interface ReadableStorage extends Closeable
{
/**
* Reads the record's value associated to the provided key, in the tree whose name is provided.
@@ -75,4 +77,7 @@
* @return the number of key/value pairs in the provided tree.
*/
long getRecordCount(TreeName treeName);
+
+ @Override
+ public void close();
}
\ No newline at end of file
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
index 780e90f..4855f36 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -79,9 +79,12 @@
*/
void write(WriteOperation writeOperation) throws Exception;
- /** {@inheritDoc} */
- @Override
- void close();
+ /**
+ * Returns a new writeable storage.
+ *
+ * @return a new writeable storage
+ */
+ WriteableStorage getWriteableStorage();
/**
* Remove all files for a backend of this storage.
@@ -123,4 +126,8 @@
* If backup and restore is not supported by this storage.
*/
FilenameFilter getFilesToBackupFilter();
+
+ /** {@inheritDoc} */
+ @Override
+ void close();
}
--
Gitblit v1.10.0