mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.17.2015 6e7197ffb0c475014c1a19a979327aa337a17844
OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

Addressing comments from CR-7059.

OnDiskMergeStorageImporter.java:
Used SequentialCursor as much as possible.
1 files modified
69 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java 69 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -93,6 +93,7 @@
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.Storage.AccessMode;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -535,7 +536,7 @@
  }
  /** A cursor performing the "merge" phase of the on-disk merge. */
  private static final class MergingCursor<K, V> implements Cursor<K, V>
  private static final class MergingCursor<K, V> implements SequentialCursor<K, V>
  {
    private final Cursor<K, V> delegate;
    private final MergingConsumer<V> merger;
@@ -620,30 +621,6 @@
      delegate.close();
      isDefined = false;
    }
    @Override
    public boolean positionToKey(ByteSequence key)
    {
      return delegate.positionToKey(key);
    }
    @Override
    public boolean positionToKeyOrNext(ByteSequence key)
    {
      return delegate.positionToKeyOrNext(key);
    }
    @Override
    public boolean positionToLastKey()
    {
      return delegate.positionToLastKey();
    }
    @Override
    public boolean positionToIndex(int index)
    {
      return delegate.positionToIndex(index);
    }
  }
  /** A cursor implementation aggregating several cursors and ordering them by their key value. */
@@ -662,10 +639,10 @@
     * The cursors are sorted based on the key change of each cursor to consider the next change
     * across all cursors.
     */
    private final NavigableSet<Cursor<K, V>> cursors = new TreeSet<>(new Comparator<Cursor<K, V>>()
    private final NavigableSet<SequentialCursor<K, V>> cursors = new TreeSet<>(new Comparator<SequentialCursor<K, V>>()
        {
          @Override
          public int compare(Cursor<K, V> c1, Cursor<K, V> c2)
          public int compare(SequentialCursor<K, V> c1, SequentialCursor<K, V> c2)
          {
            final int cmp = c1.getKey().compareTo(c2.getKey());
            if (cmp == 0)
@@ -678,13 +655,13 @@
          }
        });
    private CompositeCursor(Collection<Cursor<K, V>> cursors)
    private CompositeCursor(Collection<SequentialCursor<K, V>> cursors)
    {
      Reject.ifNull(cursors);
      for (Iterator<Cursor<K, V>> it = cursors.iterator(); it.hasNext();)
      for (Iterator<SequentialCursor<K, V>> it = cursors.iterator(); it.hasNext();)
      {
        Cursor<K, V> cursor = it.next();
        SequentialCursor<K, V> cursor = it.next();
        if (!cursor.isDefined() && !cursor.next())
        {
          it.remove();
@@ -711,7 +688,7 @@
      }
      else if (state == READY)
      {
        final Cursor<K, V> cursorToAdvance = cursors.pollFirst();
        final SequentialCursor<K, V> cursorToAdvance = cursors.pollFirst();
        if (cursorToAdvance != null && cursorToAdvance.next())
        {
          this.cursors.add(cursorToAdvance);
@@ -792,7 +769,7 @@
  }
  /** A cursor implementation reading key/value pairs from memory mapped files, a.k.a {@link MappedByteBuffer}. */
  private static final class ByteBufferCursor implements Cursor<ByteString, ByteString>
  private static final class ByteBufferCursor implements SequentialCursor<ByteString, ByteString>
  {
    private final ByteBuffer byteBuffer;
    private final int startPos;
@@ -880,30 +857,6 @@
      }
      return "not defined";
    }
    @Override
    public boolean positionToKey(ByteSequence key)
    {
      throw notImplemented();
    }
    @Override
    public boolean positionToKeyOrNext(ByteSequence key)
    {
      throw notImplemented();
    }
    @Override
    public boolean positionToLastKey()
    {
      throw notImplemented();
    }
    @Override
    public boolean positionToIndex(int index)
    {
      throw notImplemented();
    }
  }
  /** A storage using memory mapped files, a.k.a {@link MappedByteBuffer}. */
@@ -977,7 +930,7 @@
          long fileSize = Files.size(bufferFile.toPath());
          final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
          final List<Cursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
          final List<SequentialCursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
          Iterator<Integer> it = bufferPositions.iterator();
          if (it.hasNext())
          {
@@ -1789,7 +1742,7 @@
      @Override
      public Void run(ReadableTransaction txn) throws Exception
      {
        try (Cursor<ByteString, ByteString> cursor =
        try (SequentialCursor<ByteString, ByteString> cursor =
            new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
        {
          while (cursor.next())