From 5d07ec161328a94de355aa4bf93918a2da5a8602 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 30 Apr 2015 14:20:06 +0000
Subject: [PATCH] OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java | 23 +++
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java | 21 +-
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 122 ++++++++++-------
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java | 5
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java | 7
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java | 166 +++++++++++++++--------
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java | 29 +++
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java | 45 ++++--
8 files changed, 274 insertions(+), 144 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
index ade7349..c1e9d80 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -46,6 +46,8 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -85,7 +87,6 @@
import com.persistit.Persistit;
import com.persistit.Transaction;
import com.persistit.Tree;
-import com.persistit.TreeBuilder;
import com.persistit.Value;
import com.persistit.Volume;
import com.persistit.VolumeSpecification;
@@ -247,26 +248,31 @@
/** PersistIt implementation of the {@link Importer} interface. */
private final class ImporterImpl implements Importer
{
- private final TreeBuilder importer = new TreeBuilder(db);
- private final Key importKey = new Key(db);
- private final Value importValue = new Value(db);
private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>();
+ private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
+ private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
+ {
+ @Override
+ protected Map<TreeName, Exchange> initialValue()
+ {
+ final Map<TreeName, Exchange> value = new HashMap<>();
+ allExchanges.add(value);
+ return value;
+ }
+ };
@Override
public void close()
{
- try
+ for (Map<TreeName, Exchange> map : allExchanges)
{
- importer.merge();
+ for (Exchange exchange : map.values())
+ {
+ db.releaseExchange(exchange);
+ }
+ map.clear();
}
- catch (final Exception e)
- {
- throw new StorageRuntimeException(e);
- }
- finally
- {
- PersistItStorage.this.close();
- }
+ PersistItStorage.this.close();
}
@Override
@@ -288,16 +294,59 @@
{
try
{
- final Tree tree = trees.get(treeName);
- importer.store(tree,
- bytesToKey(importKey, key),
- bytesToValue(importValue, value));
+ final Exchange ex = getExchangeFromCache(treeName);
+ bytesToKey(ex.getKey(), key);
+ bytesToValue(ex.getValue(), value);
+ ex.store();
}
catch (final Exception e)
{
throw new StorageRuntimeException(e);
}
}
+
+ @Override
+ public boolean delete(final TreeName treeName, final ByteSequence key)
+ {
+ try
+ {
+ final Exchange ex = getExchangeFromCache(treeName);
+ bytesToKey(ex.getKey(), key);
+ return ex.remove();
+ }
+ catch (final PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ @Override
+ public ByteString read(final TreeName treeName, final ByteSequence key)
+ {
+ try
+ {
+ final Exchange ex = getExchangeFromCache(treeName);
+ bytesToKey(ex.getKey(), key);
+ ex.fetch();
+ return valueToBytes(ex.getValue());
+ }
+ catch (final PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
+ {
+ Map<TreeName, Exchange> threadExchanges = exchanges.get();
+ Exchange exchange = threadExchanges.get(treeName);
+ if (exchange == null)
+ {
+ exchange = getNewExchange(treeName, false);
+ threadExchanges.put(treeName, exchange);
+ }
+ return exchange;
+ }
}
/** PersistIt implementation of the {@link WriteableTransaction} interface. */
@@ -306,8 +355,7 @@
private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>();
@Override
- public void put(final TreeName treeName, final ByteSequence key,
- final ByteSequence value)
+ public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value)
{
try
{
@@ -475,8 +523,7 @@
return b1.equals(b2);
}
- private Exchange getExchangeFromCache(final TreeName treeName)
- throws PersistitException
+ private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
{
Exchange exchange = exchanges.get(treeName);
if (exchange == null)
@@ -495,34 +542,18 @@
}
exchanges.clear();
}
-
- private Exchange getNewExchange(final TreeName treeName, final boolean create)
- throws PersistitException
- {
- return db.getExchange(volume, mangleTreeName(treeName), create);
- }
}
- private static void clearAndCreateDbDir(final File dbDir)
+ private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException
{
- if (dbDir.exists())
- {
- for (final File child : dbDir.listFiles())
- {
- child.delete();
- }
- }
- else
- {
- dbDir.mkdirs();
- }
+ return db.getExchange(volume, mangleTreeName(treeName), create);
}
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ private final ServerContext serverContext;
private File backendDirectory;
private Persistit db;
private Volume volume;
- private Configuration dbCfg;
private PersistitBackendCfg config;
private DiskSpaceMonitor diskMonitor;
private MemoryQuota memQuota;
@@ -540,30 +571,36 @@
// FIXME: should be package private once importer is decoupled.
public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException
{
+ this.serverContext = serverContext;
backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId());
config = cfg;
- dbCfg = new Configuration();
+ cfg.addPersistitChangeListener(this);
+ }
+
+ private Configuration buildConfiguration()
+ {
+ final Configuration dbCfg = new Configuration();
dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath());
dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath());
dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null,
BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false)));
- final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg();
+ final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg);
bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE);
diskMonitor = serverContext.getDiskSpaceMonitor();
memQuota = serverContext.getMemoryQuota();
- if (cfg.getDBCacheSize() > 0)
+ if (config.getDBCacheSize() > 0)
{
- bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize());
- memQuota.acquireMemory(cfg.getDBCacheSize());
+ bufferPoolCfg.setMaximumMemory(config.getDBCacheSize());
+ memQuota.acquireMemory(config.getDBCacheSize());
}
else
{
- bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f);
- memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent()));
+ bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f);
+ memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent()));
}
- dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP);
- cfg.addPersistitChangeListener(this);
+ dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP);
+ return dbCfg;
}
/** {@inheritDoc} */
@@ -594,21 +631,31 @@
diskMonitor.deregisterMonitoredDirectory(getDirectory(), this);
}
- private BufferPoolConfiguration getBufferPoolCfg()
+ private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg)
{
return dbCfg.getBufferPoolMap().get(BUFFER_SIZE);
}
/** {@inheritDoc} */
@Override
- public void open() throws Exception
+ public void open() throws ConfigException, StorageRuntimeException
+ {
+ open0(buildConfiguration());
+ }
+
+ private void open0(final Configuration dbCfg) throws ConfigException
{
setupStorageFiles();
try
{
+ if (db != null)
+ {
+ throw new IllegalStateException(
+ "Database is already open, either the backend is enabled or an import is currently running.");
+ }
db = new Persistit(dbCfg);
- final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap());
+ final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap());
final long totalSize = bufferCount * BUFFER_SIZE / 1024;
logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(),
bufferCount, BUFFER_SIZE, totalSize);
@@ -676,14 +723,13 @@
/** {@inheritDoc} */
@Override
- public Importer startImport() throws Exception
+ public Importer startImport() throws ConfigException, StorageRuntimeException
{
- clearAndCreateDbDir(backendDirectory);
- open();
+ open0(buildConfiguration());
return new ImporterImpl();
}
- private String mangleTreeName(final TreeName treeName)
+ private static String mangleTreeName(final TreeName treeName)
{
StringBuilder mangled = new StringBuilder();
String name = treeName.toString();
@@ -879,19 +925,19 @@
* TODO: it would be nice to use the low-level key/value APIs. They seem quite
* inefficient at the moment for simple byte arrays.
*/
- private Key bytesToKey(final Key key, final ByteSequence bytes)
+ private static Key bytesToKey(final Key key, final ByteSequence bytes)
{
final byte[] tmp = bytes.toByteArray();
return key.clear().appendByteArray(tmp, 0, tmp.length);
}
- private Value bytesToValue(final Value value, final ByteSequence bytes)
+ private static Value bytesToValue(final Value value, final ByteSequence bytes)
{
value.clear().putByteArray(bytes.toByteArray());
return value;
}
- private ByteString valueToBytes(final Value value)
+ private static ByteString valueToBytes(final Value value)
{
if (value.isDefined())
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
index 5ff09d8..e332072 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -42,6 +42,7 @@
import org.opends.server.backends.pluggable.EntryIDSet.EntryIDSetCodec;
import org.opends.server.backends.pluggable.State.IndexFlag;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.backends.pluggable.spi.TreeName;
@@ -124,32 +125,32 @@
}
@Override
- public final void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded) throws StorageRuntimeException
+ public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException
{
- Reject.ifNull(txn, "txn must not be null");
+ Reject.ifNull(importer, "importer must not be null");
Reject.ifNull(idsToBeAdded, "idsToBeAdded must not be null");
ByteSequence key = idsToBeAdded.getKey();
- ByteString value = txn.read(getName(), key);
+ ByteString value = importer.read(getName(), key);
if (value != null)
{
final EntryIDSet entryIDSet = codec.decode(key, value);
final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
importIDSet.merge(idsToBeAdded);
- txn.put(getName(), key, importIDSet.valueToByteString(codec));
+ importer.put(getName(), key, importIDSet.valueToByteString(codec));
}
else
{
- txn.put(getName(), key, idsToBeAdded.valueToByteString(codec));
+ importer.put(getName(), key, idsToBeAdded.valueToByteString(codec));
}
}
@Override
- public final void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved) throws StorageRuntimeException
+ public final void importRemove(Importer importer, ImportIDSet idsToBeRemoved) throws StorageRuntimeException
{
- Reject.ifNull(txn, "txn must not be null");
+ Reject.ifNull(importer, "importer must not be null");
Reject.ifNull(idsToBeRemoved, "idsToBeRemoved must not be null");
ByteSequence key = idsToBeRemoved.getKey();
- ByteString value = txn.read(getName(), key);
+ ByteString value = importer.read(getName(), key);
if (value == null)
{
// Should never happen -- the keys should always be there.
@@ -161,11 +162,11 @@
importIDSet.remove(idsToBeRemoved);
if (importIDSet.isDefined() && importIDSet.size() == 0)
{
- txn.delete(getName(), key);
+ importer.delete(getName(), key);
}
else
{
- txn.put(getName(), key, importIDSet.valueToByteString(codec));
+ importer.put(getName(), key, importIDSet.valueToByteString(codec));
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
index 5259156..e922f95 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
@@ -31,6 +31,7 @@
import org.forgerock.util.promise.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.UpdateFunction;
@@ -90,8 +91,7 @@
private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta)
{
- final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1));
- final ByteSequence shardedKey = getKeyFromEntryIDAndBucket(entryID, bucket);
+ final ByteSequence shardedKey = getShardedKey(entryID);
txn.update(getName(), shardedKey, new UpdateFunction()
{
@Override
@@ -103,6 +103,30 @@
});
}
+ void importPut(Importer importer, EntryID entryID, long total)
+ {
+ Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
+ importPut0(importer, entryID, total);
+ }
+
+ void importPutTotalCount(Importer importer, long total)
+ {
+ importPut0(importer, TOTAL_COUNT_ENTRY_ID, total);
+ }
+
+ private void importPut0(Importer importer, EntryID entryID, final long delta)
+ {
+ Reject.ifNull(importer, "importer must not be null");
+ final ByteSequence shardedKey = getShardedKey(entryID);
+ importer.put(getName(), shardedKey, ByteString.valueOf(delta));
+ }
+
+ private ByteSequence getShardedKey(EntryID entryID)
+ {
+ final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1));
+ return getKeyFromEntryIDAndBucket(entryID, bucket);
+ }
+
/**
* Get the counter value for the specified key
* @param txn The database transaction
@@ -168,5 +192,4 @@
return counterValue;
}
-
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 90271fc..143d7d5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1061,7 +1061,7 @@
}
}
- private void importPhaseTwo() throws InterruptedException, ExecutionException
+ private void importPhaseTwo() throws Exception
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ -1079,7 +1079,7 @@
* Performs on-disk merge by reading several scratch files at once
* and write their ordered content into the target indexes.
*/
- private void processIndexFiles() throws InterruptedException, ExecutionException
+ private void processIndexFiles() throws Exception
{
if (bufferCount.get() == 0)
{
@@ -1147,20 +1147,30 @@
Semaphore permits = new Semaphore(buffers);
// Start DN processing first.
- List<Future<Void>> futures = new LinkedList<>();
- submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
- submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
- getAll(futures);
+ Storage storage = rootContainer.getStorage();
+ storage.close();
+ try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
+ {
+ List<Future<Void>> futures = new LinkedList<>();
+ submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+ submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+ getAll(futures);
+ }
+ finally
+ {
+ storage.open();
+ }
+
shutdownAll(dbService);
}
- private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
- Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
+ private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
+ org.opends.server.backends.pluggable.spi.Importer importer,
+ ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
{
for (IndexManager indexMgr : indexMgrs)
{
- futures.add(dbService.submit(
- new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
+ futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
}
}
@@ -1706,7 +1716,7 @@
*/
private final class IndexDBWriteTask implements Callable<Void>
{
- private final Storage storage;
+ private final org.opends.server.backends.pluggable.spi.Importer importer;
private final IndexManager indexMgr;
private final int cacheSize;
/** indexID => DNState map */
@@ -1728,10 +1738,10 @@
/**
* Creates a new index DB writer.
*
+ * @param importer
+ * The importer
* @param indexMgr
* The index manager.
- * @param storage
- * Where to store data
* @param permits
* The semaphore used for restricting the number of buffer allocations.
* @param maxPermits
@@ -1739,9 +1749,10 @@
* @param cacheSize
* The buffer cache size.
*/
- public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
+ public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
+ Semaphore permits, int maxPermits, int cacheSize)
{
- this.storage = storage;
+ this.importer = importer;
this.indexMgr = indexMgr;
this.permits = permits;
this.maxPermits = maxPermits;
@@ -1822,7 +1833,7 @@
}
/** Finishes this task. */
- private void endWriteTask(WriteableTransaction txn)
+ private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
{
isRunning = false;
@@ -1839,8 +1850,9 @@
{
for (DNState dnState : dnStateMap.values())
{
- dnState.flush(txn);
+ dnState.finalFlush(importer);
}
+
if (!isCanceled)
{
logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ -1896,18 +1908,11 @@
@Override
public Void call() throws Exception
{
- storage.write(new WriteOperation()
- {
- @Override
- public void run(WriteableTransaction txn) throws Exception
- {
- call0(txn);
- }
- });
+ call0(importer);
return null;
}
- private void call0(WriteableTransaction txn) throws Exception
+ private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
{
if (isCanceled)
{
@@ -1936,7 +1941,7 @@
{
if (previousRecord != null)
{
- addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+ addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
}
// this is a new record
@@ -1960,7 +1965,7 @@
if (previousRecord != null)
{
- addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+ addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
}
}
}
@@ -1971,7 +1976,7 @@
}
finally
{
- endWriteTask(txn);
+ endWriteTask(importer);
}
}
@@ -1986,30 +1991,31 @@
return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
}
- private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
- throws DirectoryException
+ private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
+ ImportIDSet deleteSet) throws DirectoryException
{
keyCount.incrementAndGet();
if (indexMgr.isDN2ID())
{
- addDN2ID(txn, indexID, insertSet);
+ addDN2ID(importer, indexID, insertSet);
}
else
{
if (!deleteSet.isDefined() || deleteSet.size() > 0)
{
final Index index = indexIDToIndexMap.get(indexID);
- index.importRemove(txn, deleteSet);
+ index.importRemove(importer, deleteSet);
}
if (!insertSet.isDefined() || insertSet.size() > 0)
{
final Index index = indexIDToIndexMap.get(indexID);
- index.importPut(txn, insertSet);
+ index.importPut(importer, insertSet);
}
}
}
- private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
+ private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
+ throws DirectoryException
{
DNState dnState = dnStateMap.get(indexID);
if (dnState == null)
@@ -2017,9 +2023,9 @@
dnState = new DNState(indexIDToECMap.get(indexID));
dnStateMap.put(indexID, dnState);
}
- if (dnState.checkParent(txn, idSet))
+ if (dnState.checkParent(importer, idSet))
{
- dnState.writeToDN2ID(txn, idSet.getKey());
+ dnState.writeToDN2ID(importer, idSet.getKey());
}
}
@@ -2032,7 +2038,7 @@
* This class is used to by a index DB merge thread performing DN processing
* to keep track of the state of individual DN2ID index processing.
*/
- final class DNState
+ private final class DNState
{
private static final int DN_STATE_CACHE_SIZE = 64 * KB;
@@ -2043,8 +2049,9 @@
private ByteSequence parentDN;
private final ByteStringBuilder lastDN = new ByteStringBuilder();
private EntryID parentID, lastID, entryID;
+ private long totalNbEntries;
- DNState(EntryContainer entryContainer)
+ private DNState(EntryContainer entryContainer)
{
this.entryContainer = entryContainer;
dn2id = entryContainer.getDN2ID().getName();
@@ -2062,7 +2069,8 @@
}
/** Why do we still need this if we are checking parents in the first phase? */
- private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
+ boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
+ throws StorageRuntimeException
{
entryID = idSet.iterator().next();
parentDN = getParent(idSet.getKey());
@@ -2072,7 +2080,7 @@
// If null is returned then this is a suffix DN.
if (parentDN != null)
{
- parentID = get(txn, dn2id, parentDN);
+ parentID = get(importer, dn2id, parentDN);
if (parentID == null)
{
// We have a missing parent. Maybe parent checking was turned off?
@@ -2145,43 +2153,53 @@
return importCfg != null && importCfg.appendToExistingData();
}
- EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
+ private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
+ throws StorageRuntimeException
{
- ByteString value = txn.read(dn2id, dn);
+ ByteString value = importer.read(dn2id, dn);
return value != null ? new EntryID(value) : null;
}
- public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
+ void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
+ throws DirectoryException
{
- txn.put(dn2id, key, entryID.toByteString());
+ importer.put(dn2id, key, entryID.toByteString());
indexMgr.addTotDNCount(1);
if (parentID != null)
{
- incrementChildrenCounter(txn);
+ incrementChildrenCounter(importer);
}
}
- private void incrementChildrenCounter(WriteableTransaction txn)
+ private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
{
final AtomicLong counter = getId2childrenCounter();
counter.incrementAndGet();
if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
{
- flush(txn);
+ flush(importer);
}
}
- private void flush(WriteableTransaction txn)
+ private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
{
for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
{
- entryContainer.getID2ChildrenCount()
- .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
+ final EntryID entryID = childrenCounter.getKey();
+ final long totalForEntryID = childrenCounter.getValue().get();
+ totalNbEntries += totalForEntryID;
+ entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
}
id2childrenCountTree.clear();
}
+ void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
+ {
+ flush(importer);
+
+ entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
+ }
}
}
@@ -2957,7 +2975,7 @@
indexKeyQueueMap.clear();
}
- private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
+ private void rebuildIndexesPhaseTwo() throws Exception
{
final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
try
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
index ddccb6e..2aa01f2 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
@@ -29,6 +29,7 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
@@ -44,10 +45,10 @@
int getIndexEntryLimit();
// Ignores trusted state.
- void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded);
+ void importPut(Importer importer, ImportIDSet idsToBeAdded);
// Ignores trusted state.
- void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved);
+ void importRemove(Importer importer, ImportIDSet idsToBeRemoved);
boolean isTrusted();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
index ee6bc45..a820646 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -26,6 +26,7 @@
package org.opends.server.backends.pluggable;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.backends.pluggable.spi.Cursor;
@@ -44,15 +45,11 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.RestoreConfig;
-/**
- * Decorates a {@link Storage} with additional trace logging.
- */
+/** Decorates a {@link Storage} with additional trace logging. */
@SuppressWarnings("javadoc")
final class TracedStorage implements Storage
{
- /**
- * Decorates an {@link Importer} with additional trace logging.
- */
+ /** Decorates an {@link Importer} with additional trace logging. */
private final class TracedImporter implements Importer
{
private final Importer importer;
@@ -63,14 +60,6 @@
}
@Override
- public void close()
- {
- importer.close();
- logger.trace("Storage@%s.Importer@%s.close(%s)",
- storageId(), id(), backendId);
- }
-
- @Override
public void createTree(final TreeName name)
{
importer.createTree(name);
@@ -86,6 +75,32 @@
storageId(), id(), backendId, name, hex(key), hex(value));
}
+ @Override
+ public ByteString read(TreeName name, ByteSequence key)
+ {
+ final ByteString value = importer.read(name, key);
+ logger.trace("Storage@%s.Importer@%s.read(%s, %s, %s) = %s",
+ storageId(), id(), backendId, name, hex(key), hex(value));
+ return value;
+ }
+
+ @Override
+ public boolean delete(TreeName name, ByteSequence key)
+ {
+ final boolean delete = importer.delete(name, key);
+ logger.trace("Storage@%s.Importer@%s.delete(%s, %s, %s) = %b",
+ storageId(), id(), backendId, name, hex(key), delete);
+ return delete;
+ }
+
+ @Override
+ public void close()
+ {
+ importer.close();
+ logger.trace("Storage@%s.Importer@%s.close(%s)",
+ storageId(), id(), backendId);
+ }
+
private int id()
{
return System.identityHashCode(this);
@@ -294,7 +309,7 @@
}
@Override
- public Importer startImport() throws Exception
+ public Importer startImport() throws ConfigException, StorageRuntimeException
{
final Importer importer = storage.startImport();
if (logger.isTraceEnabled())
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
index a31a73a..95e72fc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
@@ -28,6 +28,7 @@
import java.io.Closeable;
import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteString;
/**
* Allows to run an import. For performance reasons, imports are run without transactions.
@@ -54,6 +55,28 @@
*/
void put(TreeName treeName, ByteSequence key, ByteSequence value);
+ /**
+ * Deletes the record with the provided key, in the tree whose name is provided.
+ *
+ * @param treeName
+ * the tree name
+ * @param key
+ * the key of the record to delete
+ * @return {@code true} if the record could be deleted, {@code false} otherwise
+ */
+ boolean delete(TreeName treeName, ByteSequence key);
+
+ /**
+ * Reads the record's value associated to the provided key, in the tree whose name is provided.
+ *
+ * @param treeName
+ * the tree name
+ * @param key
+ * the record's key
+ * @return the record's value, or {@code null} if none exists
+ */
+ ByteString read(TreeName treeName, ByteSequence key);
+
/** {@inheritDoc} */
@Override
void close();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
index 3bf0247..b485282 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -27,6 +27,7 @@
import java.io.Closeable;
+import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.DirectoryException;
@@ -42,11 +43,13 @@
* Starts the import operation.
*
* @return a new Importer object which must be closed to release all resources
- * @throws Exception
+ * @throws ConfigException
+ * if there is a problem with the configuration
+ * @throws StorageRuntimeException
* if a problem occurs with the underlying storage engine
* @see #close() to release all resources once import is finished
*/
- Importer startImport() throws Exception;
+ Importer startImport() throws ConfigException, StorageRuntimeException;
/**
* Opens the storage engine to allow executing operations on it.
--
Gitblit v1.10.0