From 05875933ae6929bc8e53f366ce116f2fc431fd46 Mon Sep 17 00:00:00 2001
From: boli <boli@localhost>
Date: Fri, 27 Jul 2007 21:06:35 +0000
Subject: [PATCH] These set of changes implement VLV and filter capability to OpenDS:
---
opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java | 353 ++--------------------------------------------------------
1 files changed, 12 insertions(+), 341 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java b/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
index 50e00e5..035993b 100644
--- a/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
+++ b/opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
@@ -26,149 +26,23 @@
*/
package org.opends.server.backends.jeb;
-import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.types.Entry;
-import static org.opends.server.util.StaticUtils.getFileForPath;
-
+import org.opends.server.types.DirectoryException;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Transaction;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.io.ByteArrayOutputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.FileOutputStream;
import java.io.IOException;
/**
- * This class is used to create an attribute index for an import process.
- * It is used as follows.
- * <pre>
- * startProcessing();
- * processEntry(entry);
- * processEntry(entry);
- * ...
- * stopProcessing();
- * merge();
- * </pre>
+ * The interface that represents a index builder for the import process.
*/
-public class IndexBuilder
+public interface IndexBuilder
{
/**
- * The import context.
- */
- private ImportContext importContext;
-
- /**
- * The index database.
- */
- private Index index;
-
- /**
- * The indexer to generate the index keys.
- */
- private Indexer indexer;
-
- /**
- * The write buffer.
- */
- ArrayList<IndexMod> buffer;
-
- /**
- * The write buffer size.
- */
- private int bufferSize;
-
- /**
- * Current output file number.
- */
- private int fileNumber = 0;
-
- /**
- * The index entry limit.
- */
- private int entryLimit;
-
- /**
- * A unique prefix for temporary files to prevent conflicts.
- */
- private String fileNamePrefix;
-
- /**
- * Indicates whether we are replacing existing data or not.
- */
- private boolean replaceExisting = false;
-
-
- private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream();
- private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream();
-
- private DataOutputStream addBytesDataStream;
- private DataOutputStream delBytesDataStream;
-
- /**
- * A file name filter to identify temporary files we have written.
- */
- private FilenameFilter filter = new FilenameFilter()
- {
- public boolean accept(File d, String name)
- {
- return name.startsWith(fileNamePrefix);
- }
- };
-
- /**
- * Construct an index builder.
- *
- * @param importContext The import context.
- * @param index The index database we are writing.
- * @param entryLimit The index entry limit.
- * @param bufferSize The amount of memory available for buffering.
- */
- public IndexBuilder(ImportContext importContext,
- Index index, int entryLimit, long bufferSize)
- {
- this.importContext = importContext;
- this.index = index;
- this.indexer = index.indexer;
- this.entryLimit = entryLimit;
- this.bufferSize = (int)bufferSize/100;
- long tid = Thread.currentThread().getId();
- fileNamePrefix = index.getName() + "_" + tid + "_";
- replaceExisting =
- importContext.getLDIFImportConfig().appendToExistingData() &&
- importContext.getLDIFImportConfig().replaceExistingEntries();
- addBytesDataStream = new DataOutputStream(addBytesStream);
- delBytesDataStream = new DataOutputStream(delBytesStream);
- }
-
- /**
* This method must be called before this object can process any
* entries. It cleans up any temporary files left over from a
* previous import.
*/
- public void startProcessing()
- {
- // Clean up any work files left over from a previous run.
- File tempDir = getFileForPath(
- importContext.getConfig().getBackendImportTempDirectory());
- File[] files = tempDir.listFiles(filter);
- if (files != null)
- {
- for (File f : files)
- {
- f.delete();
- }
- }
-
- buffer = new ArrayList<IndexMod>(bufferSize);
- }
+ void startProcessing();
/**
* Indicates that the index thread should process the provided entry.
@@ -176,222 +50,19 @@
* a new entry.
* @param newEntry The new contents of the entry.
* @param entryID The entry ID.
- * @throws DatabaseException If an error occurs in the JE database.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
+ * @throws com.sleepycat.je.DatabaseException If an error occurs in the JE
+ * database.
+ * @throws java.io.IOException If an I/O error occurs while writing an
+ * intermediate file.
+ * @throws DirectoryException If an error occurs while processing the entry.
*/
- public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
- throws DatabaseException, IOException
- {
- Transaction txn = null;
-
- // Update the index for this entry.
- if (oldEntry != null)
- {
- // This is an entry being replaced.
- Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
- Set<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();
-
- indexer.replaceEntry(txn, oldEntry, newEntry, addKeys, delKeys);
-
- for (ASN1OctetString k : delKeys)
- {
- removeID(k.value(), entryID);
- }
-
- for (ASN1OctetString k : addKeys)
- {
- insertID(k.value(), entryID);
- }
- }
- else
- {
- // This is a new entry.
- Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
- indexer.indexEntry(txn, newEntry, addKeys);
- for (ASN1OctetString k : addKeys)
- {
- insertID(k.value(), entryID);
- }
- }
-
- }
-
-
+ void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
+ throws DatabaseException, IOException, DirectoryException;
/**
* Indicates that there will be no more updates.
* @throws IOException If an I/O error occurs while writing an intermediate
* file.
*/
- public void stopProcessing() throws IOException
- {
- flushBuffer();
- }
-
-
-
- /**
- * Get a statistic of the number of keys that reached the entry limit.
- *
- * @return The number of keys that reached the entry limit.
- */
- public int getEntryLimitExceededCount()
- {
- return index.getEntryLimitExceededCount();
- }
-
- /**
- * Record the insertion of an entry ID.
- * @param key The index key.
- * @param entryID The entry ID.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void insertID(byte[] key, EntryID entryID)
- throws IOException
- {
- if (buffer.size() >= bufferSize)
- {
- flushBuffer();
- }
-
- IndexMod kav = new IndexMod(key, entryID, false);
- buffer.add(kav);
- }
-
- /**
- * Record the deletion of an entry ID.
- * @param key The index key.
- * @param entryID The entry ID.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void removeID(byte[] key, EntryID entryID)
- throws IOException
- {
- if (buffer.size() >= bufferSize)
- {
- flushBuffer();
- }
-
- IndexMod kav = new IndexMod(key, entryID, true);
- buffer.add(kav);
- }
-
- /**
- * Called when the buffer is full. It first sorts the buffer using the same
- * key comparator used by the index database. Then it merges all the
- * IDs for the same key together and writes each key and its list of IDs
- * to an intermediate binary file.
- * A list of deleted IDs is only present if we are replacing existing entries.
- *
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void flushBuffer() throws IOException
- {
- if (buffer.size() == 0)
- {
- return;
- }
-
- // Keys must be sorted before we can merge duplicates.
- IndexModComparator comparator;
- if (replaceExisting)
- {
- // The entry IDs may be out of order.
- // We must sort by key and ID.
- comparator = new IndexModComparator(indexer.getComparator(), true);
- }
- else
- {
- // The entry IDs are all new and are therefore already ordered.
- // We just need to sort by key.
- comparator = new IndexModComparator(indexer.getComparator(), false);
- }
- Collections.sort(buffer, comparator);
-
- // Start a new file.
- fileNumber++;
- String fileName = fileNamePrefix + String.valueOf(fileNumber);
- File file = new File(getFileForPath(
- importContext.getConfig().getBackendImportTempDirectory()),
- fileName);
- BufferedOutputStream bufferedStream =
- new BufferedOutputStream(new FileOutputStream(file));
- DataOutputStream dataStream = new DataOutputStream(bufferedStream);
-
- // Reset the byte array output streams but preserve the underlying arrays.
- addBytesStream.reset();
- delBytesStream.reset();
-
- try
- {
- byte[] currentKey = null;
- for (IndexMod key : buffer)
- {
- byte[] keyString = key.key;
- if (!Arrays.equals(keyString,currentKey))
- {
- if (currentKey != null)
- {
- dataStream.writeInt(currentKey.length);
- dataStream.write(currentKey);
- dataStream.writeInt(addBytesStream.size());
- addBytesStream.writeTo(dataStream);
- if (replaceExisting)
- {
- dataStream.writeInt(delBytesStream.size());
- delBytesStream.writeTo(dataStream);
- }
- }
-
- currentKey = keyString;
- addBytesStream.reset();
- delBytesStream.reset();
- }
-
- if (key.isDelete)
- {
- delBytesDataStream.writeLong(key.value.longValue());
- }
- else
- {
- addBytesDataStream.writeLong(key.value.longValue());
- }
-
- }
-
- if (currentKey != null)
- {
- dataStream.writeInt(currentKey.length);
- dataStream.write(currentKey);
- dataStream.writeInt(addBytesStream.size());
- addBytesStream.writeTo(dataStream);
- if (replaceExisting)
- {
- dataStream.writeInt(delBytesStream.size());
- delBytesStream.writeTo(dataStream);
- }
- }
-
- buffer = new ArrayList<IndexMod>(bufferSize);
- }
- finally
- {
- dataStream.close();
- }
- }
-
- /**
- * Get a string that identifies this index builder.
- *
- * @return A string that identifies this index builder.
- */
- public String toString()
- {
- return indexer.toString();
- }
+ void stopProcessing() throws IOException;
}
-
--
Gitblit v1.10.0