From 6e7197ffb0c475014c1a19a979327aa337a17844 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 28 May 2015 11:17:05 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 69 +++++-----------------------------
1 files changed, 11 insertions(+), 58 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 62ad490..626412d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -93,6 +93,7 @@
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
+import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.Storage.AccessMode;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -535,7 +536,7 @@
}
/** A cursor performing the "merge" phase of the on-disk merge. */
- private static final class MergingCursor<K, V> implements Cursor<K, V>
+ private static final class MergingCursor<K, V> implements SequentialCursor<K, V>
{
private final Cursor<K, V> delegate;
private final MergingConsumer<V> merger;
@@ -620,30 +621,6 @@
delegate.close();
isDefined = false;
}
-
- @Override
- public boolean positionToKey(ByteSequence key)
- {
- return delegate.positionToKey(key);
- }
-
- @Override
- public boolean positionToKeyOrNext(ByteSequence key)
- {
- return delegate.positionToKeyOrNext(key);
- }
-
- @Override
- public boolean positionToLastKey()
- {
- return delegate.positionToLastKey();
- }
-
- @Override
- public boolean positionToIndex(int index)
- {
- return delegate.positionToIndex(index);
- }
}
/** A cursor implementation aggregating several cursors and ordering them by their key value. */
@@ -662,10 +639,10 @@
* The cursors are sorted based on the key change of each cursor to consider the next change
* across all cursors.
*/
- private final NavigableSet<Cursor<K, V>> cursors = new TreeSet<>(new Comparator<Cursor<K, V>>()
+ private final NavigableSet<SequentialCursor<K, V>> cursors = new TreeSet<>(new Comparator<SequentialCursor<K, V>>()
{
@Override
- public int compare(Cursor<K, V> c1, Cursor<K, V> c2)
+ public int compare(SequentialCursor<K, V> c1, SequentialCursor<K, V> c2)
{
final int cmp = c1.getKey().compareTo(c2.getKey());
if (cmp == 0)
@@ -678,13 +655,13 @@
}
});
- private CompositeCursor(Collection<Cursor<K, V>> cursors)
+ private CompositeCursor(Collection<SequentialCursor<K, V>> cursors)
{
Reject.ifNull(cursors);
- for (Iterator<Cursor<K, V>> it = cursors.iterator(); it.hasNext();)
+ for (Iterator<SequentialCursor<K, V>> it = cursors.iterator(); it.hasNext();)
{
- Cursor<K, V> cursor = it.next();
+ SequentialCursor<K, V> cursor = it.next();
if (!cursor.isDefined() && !cursor.next())
{
it.remove();
@@ -711,7 +688,7 @@
}
else if (state == READY)
{
- final Cursor<K, V> cursorToAdvance = cursors.pollFirst();
+ final SequentialCursor<K, V> cursorToAdvance = cursors.pollFirst();
if (cursorToAdvance != null && cursorToAdvance.next())
{
this.cursors.add(cursorToAdvance);
@@ -792,7 +769,7 @@
}
/** A cursor implementation reading key/value pairs from memory mapped files, a.k.a {@link MappedByteBuffer}. */
- private static final class ByteBufferCursor implements Cursor<ByteString, ByteString>
+ private static final class ByteBufferCursor implements SequentialCursor<ByteString, ByteString>
{
private final ByteBuffer byteBuffer;
private final int startPos;
@@ -880,30 +857,6 @@
}
return "not defined";
}
-
- @Override
- public boolean positionToKey(ByteSequence key)
- {
- throw notImplemented();
- }
-
- @Override
- public boolean positionToKeyOrNext(ByteSequence key)
- {
- throw notImplemented();
- }
-
- @Override
- public boolean positionToLastKey()
- {
- throw notImplemented();
- }
-
- @Override
- public boolean positionToIndex(int index)
- {
- throw notImplemented();
- }
}
/** A storage using memory mapped files, a.k.a {@link MappedByteBuffer}. */
@@ -977,7 +930,7 @@
long fileSize = Files.size(bufferFile.toPath());
final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
- final List<Cursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
+ final List<SequentialCursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
Iterator<Integer> it = bufferPositions.iterator();
if (it.hasNext())
{
@@ -1789,7 +1742,7 @@
@Override
public Void run(ReadableTransaction txn) throws Exception
{
- try (Cursor<ByteString, ByteString> cursor =
+ try (SequentialCursor<ByteString, ByteString> cursor =
new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
{
while (cursor.next())
--
Gitblit v1.10.0