From 5d07ec161328a94de355aa4bf93918a2da5a8602 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 30 Apr 2015 14:20:06 +0000
Subject: [PATCH] OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 122 +++++++++++++++++++++++-----------------
1 files changed, 70 insertions(+), 52 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 90271fc..143d7d5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1061,7 +1061,7 @@
}
}
- private void importPhaseTwo() throws InterruptedException, ExecutionException
+ private void importPhaseTwo() throws Exception
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ -1079,7 +1079,7 @@
* Performs on-disk merge by reading several scratch files at once
* and write their ordered content into the target indexes.
*/
- private void processIndexFiles() throws InterruptedException, ExecutionException
+ private void processIndexFiles() throws Exception
{
if (bufferCount.get() == 0)
{
@@ -1147,20 +1147,30 @@
Semaphore permits = new Semaphore(buffers);
// Start DN processing first.
- List<Future<Void>> futures = new LinkedList<>();
- submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
- submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
- getAll(futures);
+ Storage storage = rootContainer.getStorage();
+ storage.close();
+ try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
+ {
+ List<Future<Void>> futures = new LinkedList<>();
+ submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+ getAll(futures);
+ }
+ finally
+ {
+ storage.open();
+ }
+
shutdownAll(dbService);
}
- private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
- Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
+ private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
+ org.opends.server.backends.pluggable.spi.Importer importer,
+ ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
{
for (IndexManager indexMgr : indexMgrs)
{
- futures.add(dbService.submit(
- new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
+ futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
}
}
@@ -1706,7 +1716,7 @@
*/
private final class IndexDBWriteTask implements Callable<Void>
{
- private final Storage storage;
+ private final org.opends.server.backends.pluggable.spi.Importer importer;
private final IndexManager indexMgr;
private final int cacheSize;
/** indexID => DNState map */
@@ -1728,10 +1738,10 @@
/**
* Creates a new index DB writer.
*
+ * @param importer
+ * The importer
* @param indexMgr
* The index manager.
- * @param storage
- * Where to store data
* @param permits
* The semaphore used for restricting the number of buffer allocations.
* @param maxPermits
@@ -1739,9 +1749,10 @@
* @param cacheSize
* The buffer cache size.
*/
- public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
+ public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
+ Semaphore permits, int maxPermits, int cacheSize)
{
- this.storage = storage;
+ this.importer = importer;
this.indexMgr = indexMgr;
this.permits = permits;
this.maxPermits = maxPermits;
@@ -1822,7 +1833,7 @@
}
/** Finishes this task. */
- private void endWriteTask(WriteableTransaction txn)
+ private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
{
isRunning = false;
@@ -1839,8 +1850,9 @@
{
for (DNState dnState : dnStateMap.values())
{
- dnState.flush(txn);
+ dnState.finalFlush(importer);
}
+
if (!isCanceled)
{
logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ -1896,18 +1908,11 @@
@Override
public Void call() throws Exception
{
- storage.write(new WriteOperation()
- {
- @Override
- public void run(WriteableTransaction txn) throws Exception
- {
- call0(txn);
- }
- });
+ call0(importer);
return null;
}
- private void call0(WriteableTransaction txn) throws Exception
+ private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
{
if (isCanceled)
{
@@ -1936,7 +1941,7 @@
{
if (previousRecord != null)
{
- addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+ addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
}
// this is a new record
@@ -1960,7 +1965,7 @@
if (previousRecord != null)
{
- addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+ addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
}
}
}
@@ -1971,7 +1976,7 @@
}
finally
{
- endWriteTask(txn);
+ endWriteTask(importer);
}
}
@@ -1986,30 +1991,31 @@
return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
}
- private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
- throws DirectoryException
+ private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
+ ImportIDSet deleteSet) throws DirectoryException
{
keyCount.incrementAndGet();
if (indexMgr.isDN2ID())
{
- addDN2ID(txn, indexID, insertSet);
+ addDN2ID(importer, indexID, insertSet);
}
else
{
if (!deleteSet.isDefined() || deleteSet.size() > 0)
{
final Index index = indexIDToIndexMap.get(indexID);
- index.importRemove(txn, deleteSet);
+ index.importRemove(importer, deleteSet);
}
if (!insertSet.isDefined() || insertSet.size() > 0)
{
final Index index = indexIDToIndexMap.get(indexID);
- index.importPut(txn, insertSet);
+ index.importPut(importer, insertSet);
}
}
}
- private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
+ private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
+ throws DirectoryException
{
DNState dnState = dnStateMap.get(indexID);
if (dnState == null)
@@ -2017,9 +2023,9 @@
dnState = new DNState(indexIDToECMap.get(indexID));
dnStateMap.put(indexID, dnState);
}
- if (dnState.checkParent(txn, idSet))
+ if (dnState.checkParent(importer, idSet))
{
- dnState.writeToDN2ID(txn, idSet.getKey());
+ dnState.writeToDN2ID(importer, idSet.getKey());
}
}
@@ -2032,7 +2038,7 @@
* This class is used to by a index DB merge thread performing DN processing
* to keep track of the state of individual DN2ID index processing.
*/
- final class DNState
+ private final class DNState
{
private static final int DN_STATE_CACHE_SIZE = 64 * KB;
@@ -2043,8 +2049,9 @@
private ByteSequence parentDN;
private final ByteStringBuilder lastDN = new ByteStringBuilder();
private EntryID parentID, lastID, entryID;
+ private long totalNbEntries;
- DNState(EntryContainer entryContainer)
+ private DNState(EntryContainer entryContainer)
{
this.entryContainer = entryContainer;
dn2id = entryContainer.getDN2ID().getName();
@@ -2062,7 +2069,8 @@
}
/** Why do we still need this if we are checking parents in the first phase? */
- private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
+ boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
+ throws StorageRuntimeException
{
entryID = idSet.iterator().next();
parentDN = getParent(idSet.getKey());
@@ -2072,7 +2080,7 @@
// If null is returned then this is a suffix DN.
if (parentDN != null)
{
- parentID = get(txn, dn2id, parentDN);
+ parentID = get(importer, dn2id, parentDN);
if (parentID == null)
{
// We have a missing parent. Maybe parent checking was turned off?
@@ -2145,43 +2153,53 @@
return importCfg != null && importCfg.appendToExistingData();
}
- EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
+ private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
+ throws StorageRuntimeException
{
- ByteString value = txn.read(dn2id, dn);
+ ByteString value = importer.read(dn2id, dn);
return value != null ? new EntryID(value) : null;
}
- public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
+ void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
+ throws DirectoryException
{
- txn.put(dn2id, key, entryID.toByteString());
+ importer.put(dn2id, key, entryID.toByteString());
indexMgr.addTotDNCount(1);
if (parentID != null)
{
- incrementChildrenCounter(txn);
+ incrementChildrenCounter(importer);
}
}
- private void incrementChildrenCounter(WriteableTransaction txn)
+ private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
{
final AtomicLong counter = getId2childrenCounter();
counter.incrementAndGet();
if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
{
- flush(txn);
+ flush(importer);
}
}
- private void flush(WriteableTransaction txn)
+ private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
{
for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
{
- entryContainer.getID2ChildrenCount()
- .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
+ final EntryID entryID = childrenCounter.getKey();
+ final long totalForEntryID = childrenCounter.getValue().get();
+ totalNbEntries += totalForEntryID;
+ entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
}
id2childrenCountTree.clear();
}
+ void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
+ {
+ flush(importer);
+
+ entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
+ }
}
}
@@ -2957,7 +2975,7 @@
indexKeyQueueMap.clear();
}
- private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
+ private void rebuildIndexesPhaseTwo() throws Exception
{
final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
try
--
Gitblit v1.10.0