From 05875933ae6929bc8e53f366ce116f2fc431fd46 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Fri, 27 Jul 2007 21:06:35 +0000
Subject: [PATCH] These set of changes implement VLV and filter capability to OpenDS:

---
 opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java |  353 ++--------------------------------------------------------
 1 files changed, 12 insertions(+), 341 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java b/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
index 50e00e5..035993b 100644
--- a/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
+++ b/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
@@ -26,149 +26,23 @@
  */
 package org.opends.server.backends.jeb;
 
-import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.types.Entry;
-import static org.opends.server.util.StaticUtils.getFileForPath;
-
+import org.opends.server.types.DirectoryException;
 import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Transaction;
 
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.io.ByteArrayOutputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
 /**
- * This class is used to create an attribute index for an import process.
- * It is used as follows.
- * <pre>
- * startProcessing();
- * processEntry(entry);
- * processEntry(entry);
- * ...
- * stopProcessing();
- * merge();
- * </pre>
+ * The interface that represents a index builder for the import process.
  */
-public class IndexBuilder
+public interface IndexBuilder
 {
   /**
-   * The import context.
-   */
-  private ImportContext importContext;
-
-  /**
-   * The index database.
-   */
-  private Index index;
-
-  /**
-   * The indexer to generate the index keys.
-   */
-  private Indexer indexer;
-
-  /**
-   * The write buffer.
-   */
-  ArrayList<IndexMod> buffer;
-
-  /**
-   * The write buffer size.
-   */
-  private int bufferSize;
-
-  /**
-   * Current output file number.
-   */
-  private int fileNumber = 0;
-
-  /**
-   * The index entry limit.
-   */
-  private int entryLimit;
-
-  /**
-   * A unique prefix for temporary files to prevent conflicts.
-   */
-  private String fileNamePrefix;
-
-  /**
-   * Indicates whether we are replacing existing data or not.
-   */
-  private boolean replaceExisting = false;
-
-
-  private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream();
-  private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream();
-
-  private DataOutputStream addBytesDataStream;
-  private DataOutputStream delBytesDataStream;
-
-  /**
-   * A file name filter to identify temporary files we have written.
-   */
-  private FilenameFilter filter = new FilenameFilter()
-  {
-    public boolean accept(File d, String name)
-    {
-      return name.startsWith(fileNamePrefix);
-    }
-  };
-
-  /**
-   * Construct an index builder.
-   *
-   * @param importContext The import context.
-   * @param index The index database we are writing.
-   * @param entryLimit The index entry limit.
-   * @param bufferSize The amount of memory available for buffering.
-   */
-  public IndexBuilder(ImportContext importContext,
-                      Index index, int entryLimit, long bufferSize)
-  {
-    this.importContext = importContext;
-    this.index = index;
-    this.indexer = index.indexer;
-    this.entryLimit = entryLimit;
-    this.bufferSize = (int)bufferSize/100;
-    long tid = Thread.currentThread().getId();
-    fileNamePrefix = index.getName() + "_" + tid + "_";
-    replaceExisting =
-         importContext.getLDIFImportConfig().appendToExistingData() &&
-         importContext.getLDIFImportConfig().replaceExistingEntries();
-    addBytesDataStream = new DataOutputStream(addBytesStream);
-    delBytesDataStream = new DataOutputStream(delBytesStream);
-  }
-
-  /**
    * This method must be called before this object can process any
    * entries. It cleans up any temporary files left over from a
    * previous import.
    */
-  public void startProcessing()
-  {
-    // Clean up any work files left over from a previous run.
-    File tempDir = getFileForPath(
-        importContext.getConfig().getBackendImportTempDirectory());
-    File[] files = tempDir.listFiles(filter);
-    if (files != null)
-    {
-      for (File f : files)
-      {
-        f.delete();
-      }
-    }
-
-    buffer = new ArrayList<IndexMod>(bufferSize);
-  }
+  void startProcessing();
 
   /**
    * Indicates that the index thread should process the provided entry.
@@ -176,222 +50,19 @@
    * a new entry.
    * @param newEntry The new contents of the entry.
    * @param entryID The entry ID.
-   * @throws DatabaseException If an error occurs in the JE database.
-   * @throws IOException If an I/O error occurs while writing an intermediate
-   * file.
+   * @throws com.sleepycat.je.DatabaseException If an error occurs in the JE
+   * database.
+   * @throws java.io.IOException If an I/O error occurs while writing an
+   * intermediate file.
+   * @throws DirectoryException If an error occurs while processing the entry.
    */
-  public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
-       throws DatabaseException, IOException
-  {
-    Transaction txn = null;
-
-    // Update the index for this entry.
-    if (oldEntry != null)
-    {
-      // This is an entry being replaced.
-      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
-      Set<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();
-
-      indexer.replaceEntry(txn, oldEntry, newEntry, addKeys, delKeys);
-
-      for (ASN1OctetString k : delKeys)
-      {
-        removeID(k.value(), entryID);
-      }
-
-      for (ASN1OctetString k : addKeys)
-      {
-        insertID(k.value(), entryID);
-      }
-    }
-    else
-    {
-      // This is a new entry.
-      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
-      indexer.indexEntry(txn, newEntry, addKeys);
-      for (ASN1OctetString k : addKeys)
-      {
-        insertID(k.value(), entryID);
-      }
-    }
-
-  }
-
-
+  void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
+      throws DatabaseException, IOException, DirectoryException;
 
   /**
    * Indicates that there will be no more updates.
    * @throws IOException If an I/O error occurs while writing an intermediate
    * file.
    */
-  public void stopProcessing() throws IOException
-  {
-    flushBuffer();
-  }
-
-
-
-  /**
-   * Get a statistic of the number of keys that reached the entry limit.
-   *
-   * @return The number of keys that reached the entry limit.
-   */
-  public int getEntryLimitExceededCount()
-  {
-    return index.getEntryLimitExceededCount();
-  }
-
-  /**
-   * Record the insertion of an entry ID.
-   * @param key The index key.
-   * @param entryID The entry ID.
-   * @throws IOException If an I/O error occurs while writing an intermediate
-   * file.
-   */
-  private void insertID(byte[] key, EntryID entryID)
-       throws IOException
-  {
-    if (buffer.size() >= bufferSize)
-    {
-      flushBuffer();
-    }
-
-    IndexMod kav = new IndexMod(key, entryID, false);
-    buffer.add(kav);
-  }
-
-  /**
-   * Record the deletion of an entry ID.
-   * @param key The index key.
-   * @param entryID The entry ID.
-   * @throws IOException If an I/O error occurs while writing an intermediate
-   * file.
-   */
-  private void removeID(byte[] key, EntryID entryID)
-       throws IOException
-  {
-    if (buffer.size() >= bufferSize)
-    {
-      flushBuffer();
-    }
-
-    IndexMod kav = new IndexMod(key, entryID, true);
-    buffer.add(kav);
-  }
-
-  /**
-   * Called when the buffer is full. It first sorts the buffer using the same
-   * key comparator used by the index database. Then it merges all the
-   * IDs for the same key together and writes each key and its list of IDs
-   * to an intermediate binary file.
-   * A list of deleted IDs is only present if we are replacing existing entries.
-   *
-   * @throws IOException If an I/O error occurs while writing an intermediate
-   * file.
-   */
-  private void flushBuffer() throws IOException
-  {
-    if (buffer.size() == 0)
-    {
-      return;
-    }
-
-    // Keys must be sorted before we can merge duplicates.
-    IndexModComparator comparator;
-    if (replaceExisting)
-    {
-      // The entry IDs may be out of order.
-      // We must sort by key and ID.
-      comparator = new IndexModComparator(indexer.getComparator(), true);
-    }
-    else
-    {
-      // The entry IDs are all new and are therefore already ordered.
-      // We just need to sort by key.
-      comparator = new IndexModComparator(indexer.getComparator(), false);
-    }
-    Collections.sort(buffer, comparator);
-
-    // Start a new file.
-    fileNumber++;
-    String fileName = fileNamePrefix + String.valueOf(fileNumber);
-    File file = new File(getFileForPath(
-        importContext.getConfig().getBackendImportTempDirectory()),
-                         fileName);
-    BufferedOutputStream bufferedStream =
-         new BufferedOutputStream(new FileOutputStream(file));
-    DataOutputStream dataStream = new DataOutputStream(bufferedStream);
-
-    // Reset the byte array output streams but preserve the underlying arrays.
-    addBytesStream.reset();
-    delBytesStream.reset();
-
-    try
-    {
-      byte[] currentKey = null;
-      for (IndexMod key : buffer)
-      {
-        byte[] keyString = key.key;
-        if (!Arrays.equals(keyString,currentKey))
-        {
-          if (currentKey != null)
-          {
-            dataStream.writeInt(currentKey.length);
-            dataStream.write(currentKey);
-            dataStream.writeInt(addBytesStream.size());
-            addBytesStream.writeTo(dataStream);
-            if (replaceExisting)
-            {
-              dataStream.writeInt(delBytesStream.size());
-              delBytesStream.writeTo(dataStream);
-            }
-          }
-
-          currentKey = keyString;
-          addBytesStream.reset();
-          delBytesStream.reset();
-        }
-
-        if (key.isDelete)
-        {
-          delBytesDataStream.writeLong(key.value.longValue());
-        }
-        else
-        {
-          addBytesDataStream.writeLong(key.value.longValue());
-        }
-
-      }
-
-      if (currentKey != null)
-      {
-        dataStream.writeInt(currentKey.length);
-        dataStream.write(currentKey);
-        dataStream.writeInt(addBytesStream.size());
-        addBytesStream.writeTo(dataStream);
-        if (replaceExisting)
-        {
-          dataStream.writeInt(delBytesStream.size());
-          delBytesStream.writeTo(dataStream);
-        }
-      }
-
-      buffer = new ArrayList<IndexMod>(bufferSize);
-    }
-    finally
-    {
-      dataStream.close();
-    }
-  }
-
-  /**
-   * Get a string that identifies this index builder.
-   *
-   * @return A string that identifies this index builder.
-   */
-  public String toString()
-  {
-    return indexer.toString();
-  }
+  void stopProcessing() throws IOException;
 }
-

--
Gitblit v1.10.0