From cccb086edd542cbf8f4cafa9ee3923f5e9099ed6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 May 2015 08:39:56 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

---
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java                |    8 
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java                   |   29 ++
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java                       |   21 +
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java |  577 +++++++++++++++++++++++++++++++++++++++++++----
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java               |   12 +
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java              |   15 +
 6 files changed, 610 insertions(+), 52 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
index 647336e..0167f3b 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -41,11 +41,13 @@
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
 import org.forgerock.i18n.LocalizableMessage;
@@ -965,6 +967,25 @@
     new BackupManager(config.getBackendId()).restoreBackup(this, restoreConfig);
   }
 
+  @Override
+  public Set<TreeName> listTrees()
+  {
+    try
+    {
+      String[] treeNames = volume.getTreeNames();
+      final Set<TreeName> results = new HashSet<>(treeNames.length);
+      for (String treeName : treeNames)
+      {
+        results.add(TreeName.valueOf(treeName));
+      }
+      return results;
+    }
+    catch (PersistitException e)
+    {
+      throw new StorageRuntimeException(e);
+    }
+  }
+
   /**
    * TODO: it would be nice to use the low-level key/value APIs. They seem quite
    * inefficient at the moment for simple byte arrays.
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
index 42e21d0..319dffb 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
@@ -44,8 +44,10 @@
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.ByteStringBuilder;
 import org.forgerock.opendj.ldap.DecodeException;
+import org.forgerock.util.Reject;
 import org.opends.server.api.CompressedSchema;
 import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
 import org.opends.server.backends.pluggable.spi.TreeName;
@@ -323,6 +325,7 @@
   public void put(WriteableTransaction txn, EntryID id, Entry entry)
        throws StorageRuntimeException, DirectoryException
   {
+    Reject.ifNull(txn);
     ByteString key = id.toByteString();
     EntryCodec codec = acquireEntryCodec();
     try
@@ -337,6 +340,32 @@
   }
 
   /**
+   * Write a record in the entry tree.
+   *
+   * @param importer a non null importer
+   * @param id The entry ID which forms the key.
+   * @param entry The LDAP entry.
+   * @throws StorageRuntimeException If an error occurs in the storage.
+   * @throws DirectoryException  If a problem occurs while attempting to encode the entry.
+   */
+  public void importPut(Importer importer, EntryID id, Entry entry)
+       throws StorageRuntimeException, DirectoryException
+  {
+    Reject.ifNull(importer);
+    ByteString key = id.toByteString();
+    EntryCodec codec = acquireEntryCodec();
+    try
+    {
+      ByteString value = codec.encodeInternal(entry, dataConfig);
+      importer.put(getName(), key, value);
+    }
+    finally
+    {
+      codec.release();
+    }
+  }
+
+  /**
    * Remove a record from the entry tree.
    *
    * @param txn a non null transaction
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index d297582..dea8c2c 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -42,20 +42,26 @@
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TimerTask;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -71,6 +77,7 @@
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Reject;
 import org.forgerock.util.Utils;
 import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
 import org.opends.server.admin.std.server.BackendIndexCfg;
@@ -83,19 +90,24 @@
 import org.opends.server.backends.pluggable.spi.ReadOperation;
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.Storage;
+import org.opends.server.backends.pluggable.spi.Storage.AccessMode;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
+import org.opends.server.backends.pluggable.spi.StorageStatus;
 import org.opends.server.backends.pluggable.spi.TreeName;
 import org.opends.server.backends.pluggable.spi.WriteOperation;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ServerContext;
 import org.opends.server.types.AttributeType;
+import org.opends.server.types.BackupConfig;
+import org.opends.server.types.BackupDirectory;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.LDIFImportResult;
+import org.opends.server.types.RestoreConfig;
 import org.opends.server.util.Platform;
 
 /**
@@ -104,6 +116,11 @@
  */
 final class OnDiskMergeStorageImporter
 {
+  private static UnsupportedOperationException notImplemented()
+  {
+    return new UnsupportedOperationException("Not implemented");
+  }
+
   /** Data to put into id2entry tree. */
   private static final class Id2EntryData
   {
@@ -262,7 +279,8 @@
     private final File file;
     private final FileChannel fileChannel;
     private final List<Integer> bufferPositions = new ArrayList<>();
-    private int bufferSize = 1024; // TODO JNR use MAX_BUFFER_SIZE?
+    /** TODO JNR offer configuration for this. */
+    private int bufferSize = 1024;
 
     // FIXME this is not thread safe yet!!!
     /**
@@ -271,7 +289,7 @@
      * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
      * {@link #bufferSize}.
      */
-    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
+    private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>();
     /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
     private int maximumExpectedSizeOnDisk;
 
@@ -375,7 +393,6 @@
       }
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString()
     {
@@ -387,6 +404,436 @@
     }
   }
 
+  /** A cursor implementation aggregating several cursors and ordering them by their key value. */
+  private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
+  {
+    private static final byte UNINITIALIZED = 0;
+    private static final byte READY = 1;
+    private static final byte CLOSED = 2;
+
+    /**
+     * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED}
+     */
+    private byte state = UNINITIALIZED;
+
+    /**
+     * The cursors are sorted based on the key change of each cursor to consider the next change
+     * across all cursors.
+     */
+    private final NavigableSet<Cursor<K, V>> cursors = new TreeSet<>(new Comparator<Cursor<K, V>>()
+        {
+          @Override
+          public int compare(Cursor<K, V> c1, Cursor<K, V> c2)
+          {
+            final int cmp = c1.getKey().compareTo(c2.getKey());
+            if (cmp == 0)
+            {
+              // Never return 0. Otherwise both cursors are considered equal
+              // and only one of them is kept by this set
+              return System.identityHashCode(c1) - System.identityHashCode(c2);
+            }
+            return cmp;
+          }
+        });
+
+    private CompositeCursor(Collection<Cursor<K, V>> cursors)
+    {
+      Reject.ifNull(cursors);
+
+      for (Iterator<Cursor<K, V>> it = cursors.iterator(); it.hasNext();)
+      {
+        Cursor<K, V> cursor = it.next();
+        if (!cursor.isDefined() && !cursor.next())
+        {
+          it.remove();
+        }
+      }
+
+      this.cursors.addAll(cursors);
+    }
+
+    @Override
+    public boolean next()
+    {
+      if (state == CLOSED)
+      {
+        return false;
+      }
+
+      // If previous state was ready, then we must advance the first cursor
+      // To keep consistent the cursors' order in the SortedSet, it is necessary
+      // to remove the first cursor, then add it again after moving it forward.
+      if (state == UNINITIALIZED)
+      {
+        state = READY;
+      }
+      else if (state == READY)
+      {
+        final Cursor<K, V> cursorToAdvance = cursors.pollFirst();
+        if (cursorToAdvance != null && cursorToAdvance.next())
+        {
+          this.cursors.add(cursorToAdvance);
+        }
+      }
+      return isDefined();
+    }
+
+    @Override
+    public boolean isDefined()
+    {
+      return state == READY && !cursors.isEmpty();
+    }
+
+    @Override
+    public K getKey() throws NoSuchElementException
+    {
+      throwIfNotDefined();
+      return cursors.first().getKey();
+    }
+
+    @Override
+    public V getValue() throws NoSuchElementException
+    {
+      throwIfNotDefined();
+      return cursors.first().getValue();
+    }
+
+    private void throwIfNotDefined()
+    {
+      if (!isDefined())
+      {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public void close()
+    {
+      state = CLOSED;
+      Utils.closeSilently(cursors);
+      cursors.clear();
+    }
+
+    @Override
+    public String toString()
+    {
+      if (isDefined())
+      {
+        return cursors.first().toString();
+      }
+      return "not defined";
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionToKey(ByteSequence key)
+    {
+      throw notImplemented();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionToKeyOrNext(ByteSequence key)
+    {
+      throw notImplemented();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionToLastKey()
+    {
+      throw notImplemented();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionToIndex(int index)
+    {
+      throw notImplemented();
+    }
+  }
+
+  /** A cursor implementation reading key/value pairs from memory mapped files, a.k.a {@link MappedByteBuffer}. */
+  private static final class ByteBufferCursor implements Cursor<ByteString, ByteString>
+  {
+    private final ByteBuffer byteBuffer;
+    private final int startPos;
+    private final int endPos;
+    private final ByteStringBuilder keyBuffer = new ByteStringBuilder();//FIXME JNR  bad: do zero copy?
+    private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR  bad: do zero copy?
+    private int currentPos;
+    private boolean isDefined;
+
+    private ByteBufferCursor(ByteBuffer byteBuffer, int startPos, int endPos)
+    {
+      this.byteBuffer = byteBuffer;
+      this.startPos = startPos;
+      this.endPos = endPos;
+      this.currentPos = startPos;
+    }
+
+    @Override
+    public boolean next()
+    {
+      isDefined = false;
+      if (currentPos >= endPos)
+      {
+        return isDefined = false;
+      }
+      read(keyBuffer);
+      read(valueBuffer);
+      return isDefined = true;
+    }
+
+    private void read(ByteStringBuilder buffer)
+    {
+      int length = byteBuffer.getInt(currentPos);
+      currentPos += INT_SIZE;
+      byteBuffer.position(currentPos);
+
+      buffer.clear();
+      buffer.setLength(length);
+      byteBuffer.get(buffer.getBackingArray(), 0, length);
+      currentPos += length;
+    }
+
+    @Override
+    public boolean isDefined()
+    {
+      return isDefined;
+    }
+
+    @Override
+    public ByteString getKey() throws NoSuchElementException
+    {
+      throwIfNotDefined();
+      return keyBuffer.toByteString();
+    }
+
+    @Override
+    public ByteString getValue() throws NoSuchElementException
+    {
+      throwIfNotDefined();
+      return valueBuffer.toByteString();
+    }
+
+    private void throwIfNotDefined()
+    {
+      if (!isDefined())
+      {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public void close()
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public String toString()
+    {
+      if (isDefined())
+      {
+        final ByteString key = getKey();
+        final ByteString value = getValue();
+        return "<key=" + key + "(" + key.toHexString() + "), value=" + value + "(" + value.toHexString() + ")>";
+      }
+      return "not defined";
+    }
+
+    @Override
+    public boolean positionToKey(ByteSequence key)
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public boolean positionToKeyOrNext(ByteSequence key)
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public boolean positionToLastKey()
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public boolean positionToIndex(int index)
+    {
+      throw notImplemented();
+    }
+  }
+
+  /** A storage using memory mapped files, a.k.a {@link MappedByteBuffer}. */
+  private static final class MemoryMappedStorage implements Storage
+  {
+    private final File bufferDir;
+
+    private MemoryMappedStorage(File bufferDir)
+    {
+      this.bufferDir = bufferDir;
+    }
+
+    @Override
+    public Importer startImport() throws ConfigException, StorageRuntimeException
+    {
+      return new MemoryMappedBufferImporter(bufferDir);
+    }
+
+    @Override
+    public void open(AccessMode accessMode) throws Exception
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public <T> T read(ReadOperation<T> readOperation) throws Exception
+    {
+      return readOperation.run(new ReadableTransaction()
+      {
+        @Override
+        public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
+        {
+          try
+          {
+            List<Integer> bufferPositions = readBufferPositions(treeName);
+
+            // TODO JNR build ByteSequence implementation reading from memory mapped files?
+            return getCursors(treeName, bufferPositions);
+          }
+          catch (IOException e)
+          {
+            throw new StorageRuntimeException(e);
+          }
+        }
+
+        private List<Integer> readBufferPositions(TreeName treeName) throws IOException
+        {
+          // TODO JNR move to Buffer class?
+          File indexFile = new File(bufferDir, treeName + ".index");
+          List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
+          if (indexLines.size() != 1)
+          {
+            throw new IllegalStateException("Not implemented");// TODO JNR
+          }
+
+          final String[] bufferPositions = indexLines.get(0).split(" ");
+          final List<Integer> results = new ArrayList<>(bufferPositions.length);
+          for (String bufferPos : bufferPositions)
+          {
+            results.add(Integer.valueOf(bufferPos));
+          }
+          return results;
+        }
+
+        private Cursor<ByteString, ByteString> getCursors(TreeName treeName, List<Integer> bufferPositions)
+            throws IOException
+        {
+          // TODO JNR move to Buffer class?
+          File bufferFile = new File(bufferDir, treeName.toString());
+          FileChannel fileChannel = new RandomAccessFile(bufferFile, "r").getChannel();
+          long fileSize = Files.size(bufferFile.toPath());
+          final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
+
+          final List<Cursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
+          Iterator<Integer> it = bufferPositions.iterator();
+          if (it.hasNext())
+          {
+            int lastPos = it.next();
+            while (it.hasNext())
+            {
+              final int bufferPos = it.next();
+              cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos));
+              lastPos = bufferPos;
+            }
+          }
+          return new CompositeCursor<ByteString, ByteString>(cursors);
+        }
+
+        @Override
+        public ByteString read(TreeName treeName, ByteSequence key)
+        {
+          throw notImplemented();
+        }
+
+        @Override
+        public long getRecordCount(TreeName treeName)
+        {
+          throw notImplemented();
+        }
+      });
+    }
+
+    @Override
+    public void write(WriteOperation writeOperation) throws Exception
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public void removeStorageFiles() throws StorageRuntimeException
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public StorageStatus getStorageStatus()
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public boolean supportsBackupAndRestore()
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public void close()
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public void createBackup(BackupConfig backupConfig) throws DirectoryException
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public Set<TreeName> listTrees()
+    {
+      final Set<TreeName> results = new HashSet<>();
+      for (File baseDN : this.bufferDir.listFiles())
+      {
+        for (File index : baseDN.listFiles())
+        {
+          if (!index.getName().endsWith(".index"))
+          {
+            results.add(new TreeName(baseDN.getName(), index.getName()));
+          }
+        }
+      }
+      return results;
+    }
+  }
+
   /** An importer using memory mapped files, a.k.a {@link MappedByteBuffer}. */
   private static final class MemoryMappedBufferImporter implements Importer
   {
@@ -416,7 +863,8 @@
       Buffer buffer = treeNameToBufferMap.get(treeName);
       if (buffer == null)
       {
-        // TODO JNR that would be great if it was creating sub directories :)
+        // Creates sub directories for each suffix
+        // FIXME JNR cannot directly use DN names as directory + file names
         buffer = new Buffer(new File(bufferDir, treeName.toString()));
         treeNameToBufferMap.put(treeName, buffer);
       }
@@ -424,24 +872,6 @@
     }
 
     @Override
-    public ByteString read(TreeName treeName, ByteSequence key)
-    {
-      throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public boolean delete(TreeName treeName, ByteSequence key)
-    {
-      throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public void createTree(TreeName name)
-    {
-      throw new RuntimeException("Not implemented");
-    }
-
-    @Override
     public void close()
     {
       for (Buffer buffer : treeNameToBufferMap.values())
@@ -449,6 +879,24 @@
         buffer.flush();
       }
     }
+
+    @Override
+    public ByteString read(TreeName treeName, ByteSequence key)
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public boolean delete(TreeName treeName, ByteSequence key)
+    {
+      throw notImplemented();
+    }
+
+    @Override
+    public void createTree(TreeName name)
+    {
+      throw notImplemented();
+    }
   }
 
   /**
@@ -918,10 +1366,10 @@
         }
       });
 
-      final Importer tmpImporter = new MemoryMappedBufferImporter(tempDir);
+      final MemoryMappedStorage tmpStorage = new MemoryMappedStorage(tempDir);
 
       final long startTime = System.currentTimeMillis();
-      importPhaseOne(backendStorage, tmpImporter);
+      importPhaseOne(backendStorage, tmpStorage);
       final long phaseOneFinishTime = System.currentTimeMillis();
 
       if (isCanceled())
@@ -929,14 +1377,17 @@
         throw new InterruptedException("Import processing canceled.");
       }
 
+      backendStorage.close();
+
       final long phaseTwoTime = System.currentTimeMillis();
-      importPhaseTwo();
+      importPhaseTwo(backendStorage, tmpStorage);
       if (isCanceled())
       {
         throw new InterruptedException("Import processing canceled.");
       }
       final long phaseTwoFinishTime = System.currentTimeMillis();
 
+      backendStorage.open(AccessMode.READ_WRITE);
       backendStorage.write(new WriteOperation()
       {
         @Override
@@ -1017,31 +1468,34 @@
    * </ol>
    * TODO JNR fix all javadocs
    */
-  private void importPhaseOne(Storage storage, Importer importer) throws Exception
+  private void importPhaseOne(Storage backendStorage, Storage tmpStorage) throws Exception
   {
     final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
     threadCount = 2; // FIXME JNR id2entry + another task
     final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 
-    final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(storage);
-    final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask);
-    execService.submit(new MigrateExistingEntriesTask(storage, importer, id2EntryPutTask)).get();
-
-    final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
-    if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
+    try (Importer tmpImporter = tmpStorage.startImport())
     {
-      for (int i = 0; i < threadCount - 1; i++)
-      {
-        tasks.add(new ImportTask(importer, id2EntryPutTask));
-      }
-    }
-    execService.invokeAll(tasks);
-    tasks.clear();
+      final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(backendStorage);
+      final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask);
+      execService.submit(new MigrateExistingEntriesTask(backendStorage, tmpImporter, id2EntryPutTask)).get();
 
-    execService.submit(new MigrateExcludedTask(storage, importer, id2EntryPutTask)).get();
-    id2EntryPutTask.finishedWrites();
-    dn2IdPutFuture.get();
+      final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
+      if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
+      {
+        for (int i = 0; i < threadCount - 1; i++)
+        {
+          tasks.add(new ImportTask(tmpImporter, id2EntryPutTask));
+        }
+      }
+      execService.invokeAll(tasks);
+      tasks.clear();
+
+      execService.submit(new MigrateExcludedTask(backendStorage, tmpImporter, id2EntryPutTask)).get();
+      id2EntryPutTask.finishedWrites();
+      dn2IdPutFuture.get();
+    }
 
     shutdownAll(timerService, execService);
   }
@@ -1063,20 +1517,45 @@
     }
   }
 
-  private void importPhaseTwo() throws Exception
+  private void importPhaseTwo(final Storage outStorage, Storage inStorage) throws Exception
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
-    try
+
+    final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()?
+    ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
+    try (Importer importer = outStorage.startImport())
     {
-      // TODO JNR
+      for (final TreeName treeName : treeNames)
+      {
+        copyTo(treeName, inStorage, importer);// FIXME JNR use dbService
+      }
     }
     finally
     {
-      shutdownAll(timerService);
+      shutdownAll(timerService, dbService);
     }
   }
 
+  private void copyTo(final TreeName treeName, Storage input, final Importer output) throws Exception
+  {
+    input.read(new ReadOperation<Void>()
+    {
+      @Override
+      public Void run(ReadableTransaction txn) throws Exception
+      {
+        try (Cursor<ByteString, ByteString> cursor = txn.openCursor(treeName))
+        {
+          while (cursor.next())
+          {// FIXME JNR add merge phase
+            output.put(treeName, cursor.getKey(), cursor.getValue());
+          }
+        }
+        return null;
+      }
+    });
+  }
+
   /** Task used to migrate excluded branch. */
   private final class MigrateExcludedTask extends ImportTask
   {
@@ -1088,7 +1567,6 @@
       this.storage = storage;
     }
 
-    /** {@inheritDoc} */
     @Override
     public Void call() throws Exception
     {
@@ -1266,7 +1744,6 @@
       this.id2EntryPutTask = id2EntryPutTask;
     }
 
-    /** {@inheritDoc} */
     @Override
     public Void call() throws Exception
     {
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
index 5c097ea..ec2a586 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -25,6 +25,8 @@
  */
 package org.opends.server.backends.pluggable;
 
+import java.util.Set;
+
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteSequence;
@@ -375,6 +377,17 @@
     }
   }
 
+  @Override
+  public Set<TreeName> listTrees()
+  {
+    final Set<TreeName> results = storage.listTrees();
+    if (logger.isTraceEnabled())
+    {
+      logger.trace("Storage@%s.listTrees() = ", storageId(), results);
+    }
+    return results;
+  }
+
   private String hex(final ByteSequence bytes)
   {
     return bytes != null ? bytes.toByteString().toHexString() : null;
@@ -384,6 +397,4 @@
   {
     return System.identityHashCode(this);
   }
-
-
 }
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
index ca97c08..d5d73ba 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -26,6 +26,7 @@
 package org.opends.server.backends.pluggable.spi;
 
 import java.io.Closeable;
+import java.util.Set;
 
 import org.forgerock.opendj.config.server.ConfigException;
 import org.opends.server.types.BackupConfig;
@@ -155,4 +156,11 @@
    *           If a Directory Server error occurs.
    */
   void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException;
+
+  /**
+   * TODO JNR.
+   *
+   * @return TODO JNR
+   */
+  Set<TreeName> listTrees();
 }
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java
index d52866c..9e67d51 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java
@@ -53,6 +53,18 @@
   }
 
   /**
+   * Builds a new {@link TreeName} object based on the provided string representation.
+   *
+   * @param treeName the string representation of the tree name
+   * @return a new {@link TreeName} object constructed from the provided string 
+   */
+  public static TreeName valueOf(String treeName)
+  {
+    final String[] split = treeName.split("/");
+    return new TreeName(split[0], split[1]);
+  }
+
+  /**
    * Returns the base DN.
    *
    * @return a {@code String} representing the base DN

--
Gitblit v1.10.0