From 9f0904fda87bfcf921deeccdbaeafe834fbad696 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Fri, 24 Apr 2015 14:30:47 +0000
Subject: [PATCH] OPENDJ-1725: Persistit: very long recovery and many discarded txns after addrate test
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 178 +++++++++++++++--------------------------------------------
1 files changed, 45 insertions(+), 133 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index e3f0e02..ef2f193 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -90,7 +90,6 @@
import org.forgerock.opendj.ldap.ByteSequenceReader;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
-import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.spi.IndexingOptions;
import org.forgerock.util.Utils;
import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
@@ -250,7 +249,6 @@
/** Used to shutdown import if an error occurs in phase one. */
private volatile boolean isCanceled;
- private volatile boolean isPhaseOneDone;
/** Number of phase one buffers. */
private int phaseOneBufferCount;
@@ -287,7 +285,7 @@
this.serverContext = serverContext;
this.tmpEnv = null;
this.threadCount = 1;
- this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
+ this.rebuildManager = new RebuildIndexManager(rootContainer.getStorage(), rebuildConfig, cfg);
this.indexCount = rebuildManager.getIndexCount();
this.clearedBackend = false;
this.scratchFileWriterList =
@@ -386,7 +384,7 @@
*/
}
- private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
+ private static File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
{
File parentDir;
if (tmpDirectory != null)
@@ -400,8 +398,7 @@
return new File(parentDir, backendCfg.getBackendId());
}
- private int getTotalIndexCount(PluggableBackendCfg backendCfg)
- throws ConfigException
+ private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException
{
int indexes = 2; // dn2id, dn2uri
for (String indexName : backendCfg.listBackendIndexes())
@@ -770,14 +767,14 @@
}
}
- private void clearSuffix(EntryContainer entryContainer)
+ private static void clearSuffix(EntryContainer entryContainer)
{
entryContainer.lock();
entryContainer.clear();
entryContainer.unlock();
}
- private boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
+ private static boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
{
for (DN dn : dns)
{
@@ -789,7 +786,7 @@
return true;
}
- private boolean isAnyAncestorOf(List<DN> dns, DN childDN)
+ private static boolean isAnyAncestorOf(List<DN> dns, DN childDN)
{
for (DN dn : dns)
{
@@ -920,7 +917,6 @@
final long startTime = System.currentTimeMillis();
importPhaseOne();
- isPhaseOneDone = true;
final long phaseOneFinishTime = System.currentTimeMillis();
if (!skipDNValidation)
@@ -1090,12 +1086,12 @@
indexKeyQueueMap.clear();
}
- private void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+ private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
{
timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
}
- private void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+ private static void shutdownAll(ExecutorService... executorServices) throws InterruptedException
{
for (ExecutorService executorService : executorServices)
{
@@ -1107,7 +1103,7 @@
}
}
- private void clearAll(Collection<?>... cols)
+ private static void clearAll(Collection<?>... cols)
{
for (Collection<?> col : cols)
{
@@ -1118,7 +1114,7 @@
private void importPhaseTwo() throws InterruptedException, ExecutionException
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
+ scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
try
{
processIndexFiles();
@@ -1218,7 +1214,7 @@
}
}
- private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
+ private static <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
{
for (Future<?> result : futures)
{
@@ -1430,8 +1426,8 @@
}
}
- void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
- throws DirectoryException, StorageRuntimeException, InterruptedException
+ void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix) throws DirectoryException,
+ StorageRuntimeException, InterruptedException
{
DN entryDN = entry.getName();
DN2ID dn2id = suffix.getDN2ID();
@@ -2031,11 +2027,11 @@
{
if (indexMgr.isDN2ID())
{
- return new ImportIDSet(record.getKey(), newDefinedSet(), 1, false);
+ return new ImportIDSet(record.getKey(), newDefinedSet(), 1);
}
final Index index = indexIDToIndexMap.get(record.getIndexID());
- return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit(), index.getMaintainCount());
+ return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
}
private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
@@ -2071,7 +2067,7 @@
}
if (dnState.checkParent(txn, idSet))
{
- dnState.writeToDN2ID(txn, idSet);
+ dnState.writeToDN2ID(txn, idSet.getKey());
}
}
@@ -2091,10 +2087,7 @@
private final EntryContainer entryContainer;
private final TreeName dn2id;
private final TreeMap<ByteString, EntryID> parentIDMap = new TreeMap<ByteString, EntryID>();
- private final Map<ByteString, ImportIDSet> id2childTree = new TreeMap<ByteString, ImportIDSet>();
- private final Map<ByteString, ImportIDSet> id2subtreeTree = new TreeMap<ByteString, ImportIDSet>();
- private final int childLimit, subTreeLimit;
- private final boolean childDoCount, subTreeDoCount;
+ private final Map<EntryID, AtomicLong> id2childrenCountTree = new TreeMap<EntryID, AtomicLong>();
private ByteSequence parentDN;
private final ByteStringBuilder lastDN = new ByteStringBuilder();
private EntryID parentID, lastID, entryID;
@@ -2103,12 +2096,6 @@
{
this.entryContainer = entryContainer;
dn2id = entryContainer.getDN2ID().getName();
- final Index id2c = entryContainer.getID2Children();
- childLimit = id2c.getIndexEntryLimit();
- childDoCount = id2c.getMaintainCount();
- final Index id2s = entryContainer.getID2Subtree();
- subTreeLimit = id2s.getIndexEntryLimit();
- subTreeDoCount = id2s.getMaintainCount();
}
private ByteSequence getParent(ByteSequence dn)
@@ -2186,62 +2173,15 @@
return true;
}
- private void id2child(WriteableTransaction txn, EntryID childID) throws DirectoryException
+ private AtomicLong getId2childrenCounter()
{
- if (parentID == null)
+ AtomicLong counter = id2childrenCountTree.get(parentID);
+ if (counter == null)
{
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_PARENT_ENTRY_IS_MISSING.get());
+ counter = new AtomicLong();
+ id2childrenCountTree.put(parentID, counter);
}
-
- getId2childtreeImportIDSet().addEntryID(childID);
- if (id2childTree.size() > DN_STATE_CACHE_SIZE)
- {
- flushToDB(txn, id2childTree.values(), entryContainer.getID2Children(), true);
- }
- }
-
- private ImportIDSet getId2childtreeImportIDSet()
- {
- final ByteString parentIDBytes = parentID.toByteString();
- ImportIDSet idSet = id2childTree.get(parentIDBytes);
- if (idSet == null)
- {
- idSet = new ImportIDSet(parentIDBytes, newDefinedSet(), childLimit, childDoCount);
- id2childTree.put(parentIDBytes, idSet);
- }
- return idSet;
- }
-
- private void id2SubTree(WriteableTransaction txn, EntryID childID) throws DirectoryException
- {
- if (parentID == null)
- {
- throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_PARENT_ENTRY_IS_MISSING.get());
- }
-
- getId2subtreeImportIDSet(parentID).addEntryID(childID);
- // TODO:
- // Instead of doing this,
- // we can just walk to parent cache if available
- for (ByteSequence dn = getParent(parentDN); dn != null; dn = getParent(dn))
- {
- EntryID nodeID = getParentID(txn, dn);
- if (nodeID != null)
- {
- getId2subtreeImportIDSet(nodeID).addEntryID(childID);
- }
- // else we have a missing parent. Maybe parent checking was turned off?
- // Just ignore.
- }
- if (id2subtreeTree.size() > DN_STATE_CACHE_SIZE)
- {
- flushToDB(txn, id2subtreeTree.values(), entryContainer.getID2Subtree(), true);
- }
- }
-
- private EntryID getParentID(ReadableTransaction txn, ByteSequence dn) throws StorageRuntimeException
- {
- return bypassCacheForAppendMode() ? get(txn, dn2id, dn) : parentIDMap.get(dn);
+ return counter;
}
/**
@@ -2259,45 +2199,34 @@
return value != null ? new EntryID(value) : null;
}
- private ImportIDSet getId2subtreeImportIDSet(EntryID entryID)
+ public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
{
- ByteString entryIDBytes = entryID.toByteString();
- ImportIDSet idSet = id2subtreeTree.get(entryIDBytes);
- if (idSet == null)
- {
- idSet = new ImportIDSet(entryIDBytes, newDefinedSet(), subTreeLimit, subTreeDoCount);
- id2subtreeTree.put(entryIDBytes, idSet);
- }
- return idSet;
- }
-
- public void writeToDN2ID(WriteableTransaction txn, ImportIDSet idSet) throws DirectoryException
- {
- txn.put(dn2id, idSet.getKey(), entryID.toByteString());
+ txn.put(dn2id, key, entryID.toByteString());
indexMgr.addTotDNCount(1);
- if (parentDN != null)
+ if (parentID != null)
{
- id2child(txn, entryID);
- id2SubTree(txn, entryID);
+ incrementChildrenCounter(txn);
}
}
- public void flush(WriteableTransaction txn)
+ private void incrementChildrenCounter(WriteableTransaction txn)
{
- flushToDB(txn, id2childTree.values(), entryContainer.getID2Children(), false);
- flushToDB(txn, id2subtreeTree.values(), entryContainer.getID2Subtree(), false);
+ final AtomicLong counter = getId2childrenCounter();
+ counter.incrementAndGet();
+ if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
+ {
+ flush(txn);
+ }
}
- private void flushToDB(WriteableTransaction txn, Collection<ImportIDSet> idSets, Index index, boolean clearIDSets)
+ private void flush(WriteableTransaction txn)
{
- for (ImportIDSet idSet : idSets)
+ for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
{
- index.importPut(txn, idSet);
+ entryContainer.getID2ChildrenCount()
+ .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
}
- if (clearIDSets)
- {
- idSets.clear();
- }
+ id2childrenCountTree.clear();
}
}
}
@@ -2800,9 +2729,9 @@
* @param cfg
* The local DB configuration to use.
*/
- public RebuildIndexManager(RebuildConfig rebuildConfig, PluggableBackendCfg cfg)
+ public RebuildIndexManager(Storage storage, RebuildConfig rebuildConfig, PluggableBackendCfg cfg)
{
- super(null);
+ super(storage);
this.rebuildConfig = rebuildConfig;
this.cfg = cfg;
}
@@ -2945,9 +2874,7 @@
rebuildIndexMap(txn, false);
// falls through
case DEGRADED:
- if (mode == RebuildMode.ALL
- || !entryContainer.getID2Children().isTrusted()
- || !entryContainer.getID2Subtree().isTrusted())
+ if (mode == RebuildMode.ALL)
{
dn2id = entryContainer.getDN2ID();
}
@@ -3032,15 +2959,8 @@
{
// dn2uri does not have a trusted status.
entryContainer.clearDatabase(txn, entryContainer.getDN2URI());
- }
-
- if (!onlyDegraded
- || !entryContainer.getID2Children().isTrusted()
- || !entryContainer.getID2Subtree().isTrusted())
- {
entryContainer.clearDatabase(txn, entryContainer.getDN2ID());
- entryContainer.clearDatabase(txn, entryContainer.getID2Children());
- entryContainer.clearDatabase(txn, entryContainer.getID2Subtree());
+ entryContainer.clearDatabase(txn, entryContainer.getID2ChildrenCount());
}
for (Map.Entry<IndexKey, MatchingRuleIndex> mapEntry : indexMap.entrySet())
@@ -3065,12 +2985,6 @@
{
try
{
- if (dn2id != null)
- {
- EntryContainer ec = suffix.getEntryContainer();
- ec.getID2Children().setTrusted(txn, trusted);
- ec.getID2Subtree().setTrusted(txn, trusted);
- }
setTrusted(txn, indexMap.values(), trusted);
for (VLVIndex vlvIndex : vlvIndexes)
{
@@ -3123,7 +3037,7 @@
private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
{
- final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
+ final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
try
{
processIndexFiles();
@@ -3482,7 +3396,6 @@
{
/** The time in milliseconds of the previous progress report. */
private long previousTime;
- private long latestCount;
/**
* Create a new import progress task.
@@ -3490,10 +3403,9 @@
* @param latestCount
* The latest count of entries processed in phase one.
*/
- public SecondPhaseProgressTask(long latestCount)
+ public SecondPhaseProgressTask()
{
previousTime = System.currentTimeMillis();
- this.latestCount = latestCount;
}
/** The action to be performed by this timer task. */
--
Gitblit v1.10.0