From 6f092b277fa084796db106dfe497040c38db23a7 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 02 Apr 2015 00:16:00 +0000
Subject: [PATCH] OPENDJ-1711 - re-implement VLV support for pluggable backends

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java |  201 ++++++++++++++++++++++++++++++++-----------------
 1 files changed, 130 insertions(+), 71 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
index 62426db..0c865cc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
@@ -35,6 +35,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -515,26 +516,73 @@
     final int currentCount = count.get();
     final int beforeCount = vlvRequest.getBeforeCount();
     final int afterCount = vlvRequest.getAfterCount();
-
     final ByteString assertion = vlvRequest.getGreaterThanOrEqualAssertion();
     final ByteSequence encodedTargetAssertion =
         encodeTargetAssertion(sortOrder, assertion, searchOperation, currentCount);
-    final Cursor cursor = txn.openCursor(getName());
+    @SuppressWarnings("unchecked")
+    final Cursor<ByteString, ByteString> cursor = txn.openCursor(getName());
     try
     {
-      if (!cursor.positionToKeyOrNext(encodedTargetAssertion))
+      final LinkedList<Long> selectedIDs = new LinkedList<Long>();
+      int targetPosition = 0;
+
+      // Don't waste cycles looking for an assertion that does not match anything.
+      if (cursor.positionToKeyOrNext(encodedTargetAssertion) && cursor.positionToIndex(0))
       {
-        return newDefinedSet();
+        /*
+         * Unfortunately we need to iterate from the start of the index in order to correctly
+         * calculate the target position.
+         */
+        boolean targetFound = false;
+        int includedAfter = 0;
+        do
+        {
+          final ByteString key = cursor.getKey();
+          if (!targetFound)
+          {
+            selectedIDs.add(decodeEntryIDFromVLVKey(key));
+            if (encodedTargetAssertion.compareTo(key) > 0)
+            {
+              if (targetPosition >= beforeCount)
+              {
+                // Strip out unwanted results.
+                selectedIDs.removeFirst();
+              }
+              targetPosition++;
+            }
+            else
+            {
+              targetFound = true;
+            }
+          }
+          else if (includedAfter < afterCount)
+          {
+            selectedIDs.add(decodeEntryIDFromVLVKey(key));
+            includedAfter++;
+          }
+          else
+          {
+            break;
+          }
+        }
+        while (cursor.next());
       }
-      int count = afterCount;
-      for (int i = 0; cursor.previous() && i < beforeCount; i++, count++)
+      else
       {
-        // Empty block.
+        /*
+         * Treat an non-matching assertion as matching beyond the end of the index.
+         */
+        targetPosition = currentCount;
       }
-      final long[] selectedIDs = readRange(cursor, count, debugBuilder);
-      searchOperation.addResponseControl(new VLVResponseControl(count - afterCount, currentCount,
+      searchOperation.addResponseControl(new VLVResponseControl(targetPosition + 1, currentCount,
           LDAPResultCode.SUCCESS));
-      return newDefinedSet(selectedIDs);
+      final long[] result = new long[selectedIDs.size()];
+      int i = 0;
+      for (Long entryID : selectedIDs)
+      {
+        result[i++] = entryID;
+      }
+      return newDefinedSet(result);
     }
     finally
     {
@@ -625,11 +673,15 @@
     final Cursor cursor = txn.openCursor(getName());
     try
     {
-      if (!cursor.positionToIndex(startPos))
+      final long[] selectedIDs;
+      if (cursor.positionToIndex(startPos))
       {
-        return newDefinedSet();
+        selectedIDs = readRange(cursor, count, debugBuilder);
       }
-      final long[] selectedIDs = readRange(cursor, count, debugBuilder);
+      else
+      {
+        selectedIDs = new long[0];
+      }
       searchOperation.addResponseControl(new VLVResponseControl(targetOffset, currentCount, LDAPResultCode.SUCCESS));
       return newDefinedSet(selectedIDs);
     }
@@ -644,22 +696,26 @@
   {
     long[] selectedIDs = new long[count];
     int selectedPos = 0;
-    while (cursor.next() && selectedPos < count)
+    if (count > 0)
     {
-      final ByteString key = cursor.getKey();
-      if (logger.isTraceEnabled())
+      do
       {
-        logSearchKeyResult(key);
+        final ByteString key = cursor.getKey();
+        if (logger.isTraceEnabled())
+        {
+          logSearchKeyResult(key);
+        }
+        selectedIDs[selectedPos++] = decodeEntryIDFromVLVKey(key);
       }
-      selectedIDs[selectedPos++] = decodeEntryIDFromVLVKey(key);
-    }
-    if (selectedPos < count)
-    {
-      /*
-       * We don't have enough entries in the set to meet the requested page size, so we'll need to
-       * shorten the array.
-       */
-      selectedIDs = Arrays.copyOf(selectedIDs, selectedPos);
+      while (selectedPos < count && cursor.next());
+      if (selectedPos < count)
+      {
+        /*
+         * We don't have enough entries in the set to meet the requested page size, so we'll need to
+         * shorten the array.
+         */
+        selectedIDs = Arrays.copyOf(selectedIDs, selectedPos);
+      }
     }
     if (debugBuilder != null)
     {
@@ -714,48 +770,41 @@
   {
     for (final SortKey sortKey : sortOrder.getSortKeys())
     {
-      final ByteString value = findBestMatchingValue(sortKey, entry);
-      final MatchingRule matchingRule = sortKey.getAttributeType().getOrderingMatchingRule();
-      ByteString normalizedAttributeValue;
-      try
+      final AttributeType attributeType = sortKey.getAttributeType();
+      final MatchingRule matchingRule = attributeType.getOrderingMatchingRule();
+      final List<Attribute> attrList = entry.getAttribute(attributeType);
+      ByteString sortValue = null;
+      if (attrList != null)
       {
-        normalizedAttributeValue = matchingRule.normalizeAttributeValue(value);
-      }
-      catch (final DecodeException e)
-      {
-        /*
-         * This shouldn't happen because the attribute should have already been validated. If it
-         * does then treat the value as missing.
-         */
-        normalizedAttributeValue = ByteString.empty();
-      }
-      encodeVLVKeyValue(sortKey, normalizedAttributeValue, builder);
-    }
-  }
-
-  /**
-   * Returns the value contained in the entry which should be used for the provided sort key. For
-   * ascending order we select the highest value from the entry, and for descending order we select
-   * the lowest.
-   */
-  private static ByteString findBestMatchingValue(final SortKey sortKey, final Entry entry)
-  {
-    final List<Attribute> attrList = entry.getAttribute(sortKey.getAttributeType());
-    ByteString sortValue = null;
-    if (attrList != null)
-    {
-      for (Attribute a : attrList)
-      {
-        for (ByteString v : a)
+        for (Attribute a : attrList)
         {
-          if (sortValue == null || sortKey.compareValues(v, sortValue) < 0)
+          for (ByteString v : a)
           {
-            sortValue = v;
+            try
+            {
+              /*
+               * The RFC states that the lowest value of a multi-valued attribute should be used,
+               * regardless of the sort order.
+               */
+              final ByteString nv = matchingRule.normalizeAttributeValue(v);
+              if (sortValue == null || nv.compareTo(sortValue) < 0)
+              {
+                sortValue = nv;
+              }
+            }
+            catch (final DecodeException e)
+            {
+              /*
+               * This shouldn't happen because the attribute should have already been validated. If
+               * it does then treat the value as missing.
+               */
+              continue;
+            }
           }
         }
       }
+      encodeVLVKeyValue(sortKey, sortValue, builder);
     }
-    return sortValue != null ? sortValue : ByteString.empty();
   }
 
   private static void encodeVLVKeyValue(final SortKey sortKey, final ByteString normalizedAttributeValue,
@@ -763,19 +812,29 @@
   {
     final boolean ascending = sortKey.ascending();
     final byte separator = ascending ? (byte) 0x00 : (byte) 0xff;
-    final byte escape = ascending ? (byte) 0x01 : (byte) 0xfe;
-    final byte sortOrderMask = separator;
-
-    final int length = normalizedAttributeValue.length();
-    for (int i = 0; i < length; i++)
+    if (normalizedAttributeValue != null)
     {
-      final byte b = normalizedAttributeValue.byteAt(i);
-      if (b == separator || b == escape)
+      // Ensure that all keys sort before (ascending) or after (descending) missing keys.
+      builder.append(separator);
+
+      final byte escape = ascending ? (byte) 0x01 : (byte) 0xfe;
+      final byte sortOrderMask = separator;
+      final int length = normalizedAttributeValue.length();
+      for (int i = 0; i < length; i++)
       {
-        builder.append(escape);
+        final byte b = normalizedAttributeValue.byteAt(i);
+        if (b == separator || b == escape)
+        {
+          builder.append(escape);
+        }
+        // Invert the bits if this key is in descending order.
+        builder.append((byte) (b ^ sortOrderMask));
       }
-      // Invert the bits if this key is in descending order.
-      builder.append((byte) (b ^ sortOrderMask));
+    }
+    else
+    {
+      // Ensure that missing keys sort after (ascending) or before (descending) all other keys.
+      builder.append(ascending ? (byte) 0xff : (byte) 0x00);
     }
     builder.append(separator);
   }

--
Gitblit v1.10.0