From 9f0904fda87bfcf921deeccdbaeafe834fbad696 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Fri, 24 Apr 2015 14:30:47 +0000
Subject: [PATCH] OPENDJ-1725: Persistit: very long recovery and many discarded txns after addrate test

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  178 +++++++++++++++--------------------------------------------
 1 files changed, 45 insertions(+), 133 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index e3f0e02..ef2f193 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -90,7 +90,6 @@
 import org.forgerock.opendj.ldap.ByteSequenceReader;
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.ByteStringBuilder;
-import org.forgerock.opendj.ldap.ResultCode;
 import org.forgerock.opendj.ldap.spi.IndexingOptions;
 import org.forgerock.util.Utils;
 import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
@@ -250,7 +249,6 @@
 
   /** Used to shutdown import if an error occurs in phase one. */
   private volatile boolean isCanceled;
-  private volatile boolean isPhaseOneDone;
 
   /** Number of phase one buffers. */
   private int phaseOneBufferCount;
@@ -287,7 +285,7 @@
     this.serverContext = serverContext;
     this.tmpEnv = null;
     this.threadCount = 1;
-    this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
+    this.rebuildManager = new RebuildIndexManager(rootContainer.getStorage(), rebuildConfig, cfg);
     this.indexCount = rebuildManager.getIndexCount();
     this.clearedBackend = false;
     this.scratchFileWriterList =
@@ -386,7 +384,7 @@
      */
   }
 
-  private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
+  private static File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
   {
     File parentDir;
     if (tmpDirectory != null)
@@ -400,8 +398,7 @@
     return new File(parentDir, backendCfg.getBackendId());
   }
 
-  private int getTotalIndexCount(PluggableBackendCfg backendCfg)
-      throws ConfigException
+  private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException
   {
     int indexes = 2; // dn2id, dn2uri
     for (String indexName : backendCfg.listBackendIndexes())
@@ -770,14 +767,14 @@
     }
   }
 
-  private void clearSuffix(EntryContainer entryContainer)
+  private static void clearSuffix(EntryContainer entryContainer)
   {
     entryContainer.lock();
     entryContainer.clear();
     entryContainer.unlock();
   }
 
-  private boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
+  private static boolean isAnyNotEqualAndAncestorOf(List<DN> dns, DN childDN)
   {
     for (DN dn : dns)
     {
@@ -789,7 +786,7 @@
     return true;
   }
 
-  private boolean isAnyAncestorOf(List<DN> dns, DN childDN)
+  private static boolean isAnyAncestorOf(List<DN> dns, DN childDN)
   {
     for (DN dn : dns)
     {
@@ -920,7 +917,6 @@
 
       final long startTime = System.currentTimeMillis();
       importPhaseOne();
-      isPhaseOneDone = true;
       final long phaseOneFinishTime = System.currentTimeMillis();
 
       if (!skipDNValidation)
@@ -1090,12 +1086,12 @@
     indexKeyQueueMap.clear();
   }
 
-  private void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
+  private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
   {
     timerService.scheduleAtFixedRate(task, TIMER_INTERVAL, TIMER_INTERVAL, TimeUnit.MILLISECONDS);
   }
 
-  private void shutdownAll(ExecutorService... executorServices) throws InterruptedException
+  private static void shutdownAll(ExecutorService... executorServices) throws InterruptedException
   {
     for (ExecutorService executorService : executorServices)
     {
@@ -1107,7 +1103,7 @@
     }
   }
 
-  private void clearAll(Collection<?>... cols)
+  private static void clearAll(Collection<?>... cols)
   {
     for (Collection<?> col : cols)
     {
@@ -1118,7 +1114,7 @@
   private void importPhaseTwo() throws InterruptedException, ExecutionException
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
-    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
+    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
     try
     {
       processIndexFiles();
@@ -1218,7 +1214,7 @@
     }
   }
 
-  private <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
+  private static <T> void getAll(List<Future<T>> futures) throws InterruptedException, ExecutionException
   {
     for (Future<?> result : futures)
     {
@@ -1430,8 +1426,8 @@
       }
     }
 
-    void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
-        throws DirectoryException, StorageRuntimeException, InterruptedException
+    void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix) throws DirectoryException,
+        StorageRuntimeException, InterruptedException
     {
       DN entryDN = entry.getName();
       DN2ID dn2id = suffix.getDN2ID();
@@ -2031,11 +2027,11 @@
     {
       if (indexMgr.isDN2ID())
       {
-        return new ImportIDSet(record.getKey(), newDefinedSet(), 1, false);
+        return new ImportIDSet(record.getKey(), newDefinedSet(), 1);
       }
 
       final Index index = indexIDToIndexMap.get(record.getIndexID());
-      return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit(), index.getMaintainCount());
+      return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
     }
 
     private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
@@ -2071,7 +2067,7 @@
       }
       if (dnState.checkParent(txn, idSet))
       {
-        dnState.writeToDN2ID(txn, idSet);
+        dnState.writeToDN2ID(txn, idSet.getKey());
       }
     }
 
@@ -2091,10 +2087,7 @@
       private final EntryContainer entryContainer;
       private final TreeName dn2id;
       private final TreeMap<ByteString, EntryID> parentIDMap = new TreeMap<ByteString, EntryID>();
-      private final Map<ByteString, ImportIDSet> id2childTree = new TreeMap<ByteString, ImportIDSet>();
-      private final Map<ByteString, ImportIDSet> id2subtreeTree = new TreeMap<ByteString, ImportIDSet>();
-      private final int childLimit, subTreeLimit;
-      private final boolean childDoCount, subTreeDoCount;
+      private final Map<EntryID, AtomicLong> id2childrenCountTree = new TreeMap<EntryID, AtomicLong>();
       private ByteSequence parentDN;
       private final ByteStringBuilder lastDN = new ByteStringBuilder();
       private EntryID parentID, lastID, entryID;
@@ -2103,12 +2096,6 @@
       {
         this.entryContainer = entryContainer;
         dn2id = entryContainer.getDN2ID().getName();
-        final Index id2c = entryContainer.getID2Children();
-        childLimit = id2c.getIndexEntryLimit();
-        childDoCount = id2c.getMaintainCount();
-        final Index id2s = entryContainer.getID2Subtree();
-        subTreeLimit = id2s.getIndexEntryLimit();
-        subTreeDoCount = id2s.getMaintainCount();
       }
 
       private ByteSequence getParent(ByteSequence dn)
@@ -2186,62 +2173,15 @@
         return true;
       }
 
-      private void id2child(WriteableTransaction txn, EntryID childID) throws DirectoryException
+      private AtomicLong getId2childrenCounter()
       {
-        if (parentID == null)
+        AtomicLong counter = id2childrenCountTree.get(parentID);
+        if (counter == null)
         {
-          throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_PARENT_ENTRY_IS_MISSING.get());
+          counter = new AtomicLong();
+          id2childrenCountTree.put(parentID, counter);
         }
-
-        getId2childtreeImportIDSet().addEntryID(childID);
-        if (id2childTree.size() > DN_STATE_CACHE_SIZE)
-        {
-          flushToDB(txn, id2childTree.values(), entryContainer.getID2Children(), true);
-        }
-      }
-
-      private ImportIDSet getId2childtreeImportIDSet()
-      {
-        final ByteString parentIDBytes = parentID.toByteString();
-        ImportIDSet idSet = id2childTree.get(parentIDBytes);
-        if (idSet == null)
-        {
-          idSet = new ImportIDSet(parentIDBytes, newDefinedSet(), childLimit, childDoCount);
-          id2childTree.put(parentIDBytes, idSet);
-        }
-        return idSet;
-      }
-
-      private void id2SubTree(WriteableTransaction txn, EntryID childID) throws DirectoryException
-      {
-        if (parentID == null)
-        {
-          throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, ERR_PARENT_ENTRY_IS_MISSING.get());
-        }
-
-        getId2subtreeImportIDSet(parentID).addEntryID(childID);
-        // TODO:
-        // Instead of doing this,
-        // we can just walk to parent cache if available
-        for (ByteSequence dn = getParent(parentDN); dn != null; dn = getParent(dn))
-        {
-          EntryID nodeID = getParentID(txn, dn);
-          if (nodeID != null)
-          {
-            getId2subtreeImportIDSet(nodeID).addEntryID(childID);
-          }
-          // else we have a missing parent. Maybe parent checking was turned off?
-          // Just ignore.
-        }
-        if (id2subtreeTree.size() > DN_STATE_CACHE_SIZE)
-        {
-          flushToDB(txn, id2subtreeTree.values(), entryContainer.getID2Subtree(), true);
-        }
-      }
-
-      private EntryID getParentID(ReadableTransaction txn, ByteSequence dn) throws StorageRuntimeException
-      {
-        return bypassCacheForAppendMode() ? get(txn, dn2id, dn) : parentIDMap.get(dn);
+        return counter;
       }
 
       /**
@@ -2259,45 +2199,34 @@
         return value != null ? new EntryID(value) : null;
       }
 
-      private ImportIDSet getId2subtreeImportIDSet(EntryID entryID)
+      public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
       {
-        ByteString entryIDBytes = entryID.toByteString();
-        ImportIDSet idSet = id2subtreeTree.get(entryIDBytes);
-        if (idSet == null)
-        {
-          idSet = new ImportIDSet(entryIDBytes, newDefinedSet(), subTreeLimit, subTreeDoCount);
-          id2subtreeTree.put(entryIDBytes, idSet);
-        }
-        return idSet;
-      }
-
-      public void writeToDN2ID(WriteableTransaction txn, ImportIDSet idSet) throws DirectoryException
-      {
-        txn.put(dn2id, idSet.getKey(), entryID.toByteString());
+        txn.put(dn2id, key, entryID.toByteString());
         indexMgr.addTotDNCount(1);
-        if (parentDN != null)
+        if (parentID != null)
         {
-          id2child(txn, entryID);
-          id2SubTree(txn, entryID);
+          incrementChildrenCounter(txn);
         }
       }
 
-      public void flush(WriteableTransaction txn)
+      private void incrementChildrenCounter(WriteableTransaction txn)
       {
-        flushToDB(txn, id2childTree.values(), entryContainer.getID2Children(), false);
-        flushToDB(txn, id2subtreeTree.values(), entryContainer.getID2Subtree(), false);
+        final AtomicLong counter = getId2childrenCounter();
+        counter.incrementAndGet();
+        if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
+        {
+          flush(txn);
+        }
       }
 
-      private void flushToDB(WriteableTransaction txn, Collection<ImportIDSet> idSets, Index index, boolean clearIDSets)
+      private void flush(WriteableTransaction txn)
       {
-        for (ImportIDSet idSet : idSets)
+        for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
         {
-          index.importPut(txn, idSet);
+          entryContainer.getID2ChildrenCount()
+              .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
         }
-        if (clearIDSets)
-        {
-          idSets.clear();
-        }
+        id2childrenCountTree.clear();
       }
     }
   }
@@ -2800,9 +2729,9 @@
      * @param cfg
      *          The local DB configuration to use.
      */
-    public RebuildIndexManager(RebuildConfig rebuildConfig, PluggableBackendCfg cfg)
+    public RebuildIndexManager(Storage storage, RebuildConfig rebuildConfig, PluggableBackendCfg cfg)
     {
-      super(null);
+      super(storage);
       this.rebuildConfig = rebuildConfig;
       this.cfg = cfg;
     }
@@ -2945,9 +2874,7 @@
         rebuildIndexMap(txn, false);
         // falls through
       case DEGRADED:
-        if (mode == RebuildMode.ALL
-            || !entryContainer.getID2Children().isTrusted()
-            || !entryContainer.getID2Subtree().isTrusted())
+        if (mode == RebuildMode.ALL)
         {
           dn2id = entryContainer.getDN2ID();
         }
@@ -3032,15 +2959,8 @@
       {
         // dn2uri does not have a trusted status.
         entryContainer.clearDatabase(txn, entryContainer.getDN2URI());
-      }
-
-      if (!onlyDegraded
-          || !entryContainer.getID2Children().isTrusted()
-          || !entryContainer.getID2Subtree().isTrusted())
-      {
         entryContainer.clearDatabase(txn, entryContainer.getDN2ID());
-        entryContainer.clearDatabase(txn, entryContainer.getID2Children());
-        entryContainer.clearDatabase(txn, entryContainer.getID2Subtree());
+        entryContainer.clearDatabase(txn, entryContainer.getID2ChildrenCount());
       }
 
       for (Map.Entry<IndexKey, MatchingRuleIndex> mapEntry : indexMap.entrySet())
@@ -3065,12 +2985,6 @@
     {
       try
       {
-        if (dn2id != null)
-        {
-          EntryContainer ec = suffix.getEntryContainer();
-          ec.getID2Children().setTrusted(txn, trusted);
-          ec.getID2Subtree().setTrusted(txn, trusted);
-        }
         setTrusted(txn, indexMap.values(), trusted);
         for (VLVIndex vlvIndex : vlvIndexes)
         {
@@ -3123,7 +3037,7 @@
 
     private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
     {
-      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
+      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
       try
       {
         processIndexFiles();
@@ -3482,7 +3396,6 @@
   {
     /** The time in milliseconds of the previous progress report. */
     private long previousTime;
-    private long latestCount;
 
     /**
      * Create a new import progress task.
@@ -3490,10 +3403,9 @@
      * @param latestCount
      *          The latest count of entries processed in phase one.
      */
-    public SecondPhaseProgressTask(long latestCount)
+    public SecondPhaseProgressTask()
     {
       previousTime = System.currentTimeMillis();
-      this.latestCount = latestCount;
     }
 
     /** The action to be performed by this timer task. */

--
Gitblit v1.10.0