mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
27.06.2007 05875933ae6929bc8e53f366ce116f2fc431fd46
opends/src/server/org/opends/server/backends/jeb/IndexBuilder.java
@@ -26,149 +26,23 @@
 */
package org.opends.server.backends.jeb;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.types.Entry;
import static org.opends.server.util.StaticUtils.getFileForPath;
import org.opends.server.types.DirectoryException;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.io.ByteArrayOutputStream;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.FileOutputStream;
import java.io.IOException;
/**
 * This class is used to create an attribute index for an import process.
 * It is used as follows.
 * <pre>
 * startProcessing();
 * processEntry(entry);
 * processEntry(entry);
 * ...
 * stopProcessing();
 * merge();
 * </pre>
 * The interface that represents a index builder for the import process.
 */
public class IndexBuilder
public interface IndexBuilder
{
  /**
   * The import context.
   */
  private ImportContext importContext;
  /**
   * The index database.
   */
  private Index index;
  /**
   * The indexer to generate the index keys.
   */
  private Indexer indexer;
  /**
   * The write buffer.
   */
  ArrayList<IndexMod> buffer;
  /**
   * The write buffer size.
   */
  private int bufferSize;
  /**
   * Current output file number.
   */
  private int fileNumber = 0;
  /**
   * The index entry limit.
   */
  private int entryLimit;
  /**
   * A unique prefix for temporary files to prevent conflicts.
   */
  private String fileNamePrefix;
  /**
   * Indicates whether we are replacing existing data or not.
   */
  private boolean replaceExisting = false;
  private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream();
  private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream();
  private DataOutputStream addBytesDataStream;
  private DataOutputStream delBytesDataStream;
  /**
   * A file name filter to identify temporary files we have written.
   */
  private FilenameFilter filter = new FilenameFilter()
  {
    public boolean accept(File d, String name)
    {
      return name.startsWith(fileNamePrefix);
    }
  };
  /**
   * Construct an index builder.
   *
   * @param importContext The import context.
   * @param index The index database we are writing.
   * @param entryLimit The index entry limit.
   * @param bufferSize The amount of memory available for buffering.
   */
  public IndexBuilder(ImportContext importContext,
                      Index index, int entryLimit, long bufferSize)
  {
    this.importContext = importContext;
    this.index = index;
    this.indexer = index.indexer;
    this.entryLimit = entryLimit;
    this.bufferSize = (int)bufferSize/100;
    long tid = Thread.currentThread().getId();
    fileNamePrefix = index.getName() + "_" + tid + "_";
    replaceExisting =
         importContext.getLDIFImportConfig().appendToExistingData() &&
         importContext.getLDIFImportConfig().replaceExistingEntries();
    addBytesDataStream = new DataOutputStream(addBytesStream);
    delBytesDataStream = new DataOutputStream(delBytesStream);
  }
  /**
   * This method must be called before this object can process any
   * entries. It cleans up any temporary files left over from a
   * previous import.
   */
  public void startProcessing()
  {
    // Clean up any work files left over from a previous run.
    File tempDir = getFileForPath(
        importContext.getConfig().getBackendImportTempDirectory());
    File[] files = tempDir.listFiles(filter);
    if (files != null)
    {
      for (File f : files)
      {
        f.delete();
      }
    }
    buffer = new ArrayList<IndexMod>(bufferSize);
  }
  void startProcessing();
  /**
   * Indicates that the index thread should process the provided entry.
@@ -176,222 +50,19 @@
   * a new entry.
   * @param newEntry The new contents of the entry.
   * @param entryID The entry ID.
   * @throws DatabaseException If an error occurs in the JE database.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   * @throws com.sleepycat.je.DatabaseException If an error occurs in the JE
   * database.
   * @throws java.io.IOException If an I/O error occurs while writing an
   * intermediate file.
   * @throws DirectoryException If an error occurs while processing the entry.
   */
  public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
       throws DatabaseException, IOException
  {
    Transaction txn = null;
    // Update the index for this entry.
    if (oldEntry != null)
    {
      // This is an entry being replaced.
      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
      Set<ASN1OctetString> delKeys = new HashSet<ASN1OctetString>();
      indexer.replaceEntry(txn, oldEntry, newEntry, addKeys, delKeys);
      for (ASN1OctetString k : delKeys)
      {
        removeID(k.value(), entryID);
      }
      for (ASN1OctetString k : addKeys)
      {
        insertID(k.value(), entryID);
      }
    }
    else
    {
      // This is a new entry.
      Set<ASN1OctetString> addKeys = new HashSet<ASN1OctetString>();
      indexer.indexEntry(txn, newEntry, addKeys);
      for (ASN1OctetString k : addKeys)
      {
        insertID(k.value(), entryID);
      }
    }
  }
  void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
      throws DatabaseException, IOException, DirectoryException;
  /**
   * Indicates that there will be no more updates.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  public void stopProcessing() throws IOException
  {
    flushBuffer();
  }
  /**
   * Get a statistic of the number of keys that reached the entry limit.
   *
   * @return The number of keys that reached the entry limit.
   */
  public int getEntryLimitExceededCount()
  {
    return index.getEntryLimitExceededCount();
  }
  /**
   * Record the insertion of an entry ID.
   * @param key The index key.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void insertID(byte[] key, EntryID entryID)
       throws IOException
  {
    if (buffer.size() >= bufferSize)
    {
      flushBuffer();
    }
    IndexMod kav = new IndexMod(key, entryID, false);
    buffer.add(kav);
  }
  /**
   * Record the deletion of an entry ID.
   * @param key The index key.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void removeID(byte[] key, EntryID entryID)
       throws IOException
  {
    if (buffer.size() >= bufferSize)
    {
      flushBuffer();
    }
    IndexMod kav = new IndexMod(key, entryID, true);
    buffer.add(kav);
  }
  /**
   * Called when the buffer is full. It first sorts the buffer using the same
   * key comparator used by the index database. Then it merges all the
   * IDs for the same key together and writes each key and its list of IDs
   * to an intermediate binary file.
   * A list of deleted IDs is only present if we are replacing existing entries.
   *
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void flushBuffer() throws IOException
  {
    if (buffer.size() == 0)
    {
      return;
    }
    // Keys must be sorted before we can merge duplicates.
    IndexModComparator comparator;
    if (replaceExisting)
    {
      // The entry IDs may be out of order.
      // We must sort by key and ID.
      comparator = new IndexModComparator(indexer.getComparator(), true);
    }
    else
    {
      // The entry IDs are all new and are therefore already ordered.
      // We just need to sort by key.
      comparator = new IndexModComparator(indexer.getComparator(), false);
    }
    Collections.sort(buffer, comparator);
    // Start a new file.
    fileNumber++;
    String fileName = fileNamePrefix + String.valueOf(fileNumber);
    File file = new File(getFileForPath(
        importContext.getConfig().getBackendImportTempDirectory()),
                         fileName);
    BufferedOutputStream bufferedStream =
         new BufferedOutputStream(new FileOutputStream(file));
    DataOutputStream dataStream = new DataOutputStream(bufferedStream);
    // Reset the byte array output streams but preserve the underlying arrays.
    addBytesStream.reset();
    delBytesStream.reset();
    try
    {
      byte[] currentKey = null;
      for (IndexMod key : buffer)
      {
        byte[] keyString = key.key;
        if (!Arrays.equals(keyString,currentKey))
        {
          if (currentKey != null)
          {
            dataStream.writeInt(currentKey.length);
            dataStream.write(currentKey);
            dataStream.writeInt(addBytesStream.size());
            addBytesStream.writeTo(dataStream);
            if (replaceExisting)
            {
              dataStream.writeInt(delBytesStream.size());
              delBytesStream.writeTo(dataStream);
            }
          }
          currentKey = keyString;
          addBytesStream.reset();
          delBytesStream.reset();
        }
        if (key.isDelete)
        {
          delBytesDataStream.writeLong(key.value.longValue());
        }
        else
        {
          addBytesDataStream.writeLong(key.value.longValue());
        }
      }
      if (currentKey != null)
      {
        dataStream.writeInt(currentKey.length);
        dataStream.write(currentKey);
        dataStream.writeInt(addBytesStream.size());
        addBytesStream.writeTo(dataStream);
        if (replaceExisting)
        {
          dataStream.writeInt(delBytesStream.size());
          delBytesStream.writeTo(dataStream);
        }
      }
      buffer = new ArrayList<IndexMod>(bufferSize);
    }
    finally
    {
      dataStream.close();
    }
  }
  /**
   * Get a string that identifies this index builder.
   *
   * @return A string that identifies this index builder.
   */
  public String toString()
  {
    return indexer.toString();
  }
  void stopProcessing() throws IOException;
}