From 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 30 Sep 2015 14:28:07 +0000
Subject: [PATCH] OPENDJ-2016: New on disk merge import strategy based on storage engine.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java |  116 +++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 75 insertions(+), 41 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
index 61ab41f..ddf46d8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -27,7 +27,6 @@
 
 import static com.persistit.Transaction.CommitPolicy.*;
 import static java.util.Arrays.*;
-
 import static org.opends.messages.BackendMessages.*;
 import static org.opends.messages.ConfigMessages.*;
 import static org.opends.messages.UtilityMessages.*;
@@ -69,6 +68,7 @@
 import org.opends.server.backends.pluggable.spi.Importer;
 import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException;
 import org.opends.server.backends.pluggable.spi.ReadOperation;
+import org.opends.server.backends.pluggable.spi.SequentialCursor;
 import org.opends.server.backends.pluggable.spi.Storage;
 import org.opends.server.backends.pluggable.spi.StorageInUseException;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -94,7 +94,6 @@
 import com.persistit.Key;
 import com.persistit.Persistit;
 import com.persistit.Transaction;
-import com.persistit.Tree;
 import com.persistit.Value;
 import com.persistit.Volume;
 import com.persistit.VolumeSpecification;
@@ -102,12 +101,15 @@
 import com.persistit.exception.PersistitException;
 import com.persistit.exception.RollbackException;
 import com.persistit.exception.TreeNotFoundException;
+import com.persistit.mxbeans.CheckpointManagerMXBean;
 
 /** PersistIt database implementation of the {@link Storage} engine. */
 @SuppressWarnings("javadoc")
 public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>,
   DiskSpaceMonitorHandler
 {
+  private static final int IMPORT_DB_CACHE_SIZE = 4 * MB;
+
   private static final double MAX_SLEEP_ON_RETRY_MS = 50.0;
 
   private static final String VOLUME_NAME = "dj";
@@ -263,7 +265,6 @@
   /** PersistIt implementation of the {@link Importer} interface. */
   private final class ImporterImpl implements Importer
   {
-    private final Map<TreeName, Tree> trees = new HashMap<>();
     private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
     private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
     {
@@ -291,17 +292,60 @@
     }
 
     @Override
-    public void createTree(final TreeName treeName)
+    public void clearTree(final TreeName treeName)
     {
+      final Transaction txn = db.getTransaction();
+      deleteTree(txn, treeName);
+      createTree(txn, treeName);
+    }
+
+    private void createTree(final Transaction txn, final TreeName treeName)
+    {
+      Exchange ex = null;
       try
       {
-        final Tree tree = volume.getTree(mangleTreeName(treeName), true);
-        trees.put(treeName, tree);
+        txn.begin();
+        ex = getNewExchange(treeName, true);
+        txn.commit();
       }
-      catch (final PersistitException e)
+      catch (PersistitException e)
       {
         throw new StorageRuntimeException(e);
       }
+      finally
+      {
+        txn.end();
+        releaseExchangeSilenty(ex);
+      }
+    }
+
+    private void deleteTree(Transaction txn, final TreeName treeName)
+    {
+      Exchange ex = null;
+      try
+      {
+        txn.begin();
+        ex = getNewExchange(treeName, true);
+        ex.removeTree();
+        txn.commit();
+      }
+      catch (PersistitException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+      finally
+      {
+        txn.end();
+        releaseExchangeSilenty(ex);
+      }
+    }
+
+    private void releaseExchangeSilenty(Exchange ex)
+    {
+      if ( ex != null)
+      {
+        db.releaseExchange(ex);
+      }
     }
 
     @Override
@@ -321,21 +365,6 @@
     }
 
     @Override
-    public boolean delete(final TreeName treeName, final ByteSequence key)
-    {
-      try
-      {
-        final Exchange ex = getExchangeFromCache(treeName);
-        bytesToKey(ex.getKey(), key);
-        return ex.remove();
-      }
-      catch (final PersistitException e)
-      {
-        throw new StorageRuntimeException(e);
-      }
-    }
-
-    @Override
     public ByteString read(final TreeName treeName, final ByteSequence key)
     {
       try
@@ -362,6 +391,19 @@
       }
       return exchange;
     }
+
+    @Override
+    public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName)
+    {
+      try
+      {
+        return new CursorImpl(getNewExchange(treeName, false));
+      }
+      catch (PersistitException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
   }
 
   /** Common interface for internal WriteableTransaction implementations. */
@@ -430,8 +472,7 @@
     public long getRecordCount(TreeName treeName)
     {
       // FIXME: is there a better/quicker way to do this?
-      final Cursor<?, ?> cursor = openCursor(treeName);
-      try
+      try(final Cursor<?, ?> cursor = openCursor(treeName))
       {
         long count = 0;
         while (cursor.next())
@@ -440,10 +481,6 @@
         }
         return count;
       }
-      finally
-      {
-        cursor.close();
-      }
     }
 
     @Override
@@ -501,12 +538,6 @@
     }
 
     @Override
-    public void renameTree(final TreeName oldTreeName, final TreeName newTreeName)
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
     public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f)
     {
       try
@@ -642,12 +673,6 @@
     }
 
     @Override
-    public void renameTree(TreeName oldName, TreeName newName)
-    {
-      throw new ReadOnlyStorageException();
-    }
-
-    @Override
     public void deleteTree(TreeName name)
     {
       throw new ReadOnlyStorageException();
@@ -712,6 +737,15 @@
     cfg.addPDBChangeListener(this);
   }
 
+  private Configuration buildImportConfiguration()
+  {
+    final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE);
+    getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE);
+    dbCfg.setCheckpointInterval(CheckpointManagerMXBean.MAXIMUM_CHECKPOINT_INTERVAL_S);
+    dbCfg.setCommitPolicy(SOFT);
+    return dbCfg;
+  }
+
   private Configuration buildConfiguration(AccessMode accessMode)
   {
     this.accessMode = accessMode;
@@ -867,7 +901,7 @@
   @Override
   public Importer startImport() throws ConfigException, StorageRuntimeException
   {
-    open0(buildConfiguration(AccessMode.READ_WRITE));
+    open0(buildImportConfiguration());
     return new ImporterImpl();
   }
 

--
Gitblit v1.10.0