/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.backends.jeb;
import static org.opends.server.util.StaticUtils.getFileForPath;
import org.opends.server.types.*;
import java.util.*;
import java.io.*;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Transaction;
/**
* This class is used to create a VLV index for an import process.
* It is used as follows.
*
* startProcessing();
* processEntry(oldEntry, newEntry, entryID);
* processEntry(oldEntry, newEntry, entryID);
* ...
* stopProcessing();
* merge();
*
*/
public class VLVIndexBuilder implements IndexBuilder
{
  /**
   * The import context.
   */
  private ImportContext importContext;

  /**
   * The vlvIndex database.
   */
  private VLVIndex vlvIndex;

  /**
   * The add write buffer: sort values of entries to be inserted into the
   * index, mapped to their entry IDs. Kept ordered by the TreeMap.
   */
  TreeMap<SortValues, EntryID> addBuffer;

  /**
   * The delete write buffer: sort values of entries to be removed from the
   * index, mapped to their entry IDs. Kept ordered by the TreeMap.
   */
  TreeMap<SortValues, EntryID> delBuffer;

  /**
   * The write buffer size, expressed as the maximum combined number of
   * buffered add and delete records before the buffers are flushed.
   */
  private int bufferSize;

  /**
   * Current output file number.
   */
  private int fileNumber = 0;

  /**
   * A unique prefix for temporary files to prevent conflicts.
   */
  private String fileNamePrefix;

  /**
   * Indicates whether we are replacing existing data or not.
   */
  private boolean replaceExisting = false;

  // Scratch streams created at construction time. They are not referenced by
  // any other method of this class.
  private ByteArrayOutputStream addBytesStream = new ByteArrayOutputStream();
  private ByteArrayOutputStream delBytesStream = new ByteArrayOutputStream();
  private DataOutputStream addBytesDataStream;
  private DataOutputStream delBytesDataStream;

  /**
   * A file name filter to identify temporary files we have written.
   */
  private FilenameFilter filter = new FilenameFilter()
  {
    public boolean accept(File d, String name)
    {
      return name.startsWith(fileNamePrefix);
    }
  };

  /**
   * Construct a vlvIndex builder.
   *
   * @param importContext The import context.
   * @param vlvIndex The vlvIndex database we are writing.
   * @param bufferSize The amount of memory available for buffering.
   */
  public VLVIndexBuilder(ImportContext importContext,
                         VLVIndex vlvIndex, long bufferSize)
  {
    this.importContext = importContext;
    this.vlvIndex = vlvIndex;

    // Divide in long arithmetic BEFORE narrowing to int. A cast binds tighter
    // than division, so "(int)bufferSize/100" would truncate the long first
    // and yield a wrapped (possibly negative) limit for large buffer sizes.
    this.bufferSize = (int) Math.min(Integer.MAX_VALUE, bufferSize / 100);

    // The thread ID keeps temporary file names distinct between builders
    // created on different threads for the same index.
    long tid = Thread.currentThread().getId();
    fileNamePrefix = vlvIndex.getName() + "_" + tid + "_";

    // Deletions are only written out when the import both appends to
    // existing data and replaces existing entries.
    replaceExisting =
        importContext.getLDIFImportConfig().appendToExistingData() &&
        importContext.getLDIFImportConfig().replaceExistingEntries();

    addBytesDataStream = new DataOutputStream(addBytesStream);
    delBytesDataStream = new DataOutputStream(delBytesStream);
  }

  /**
   * {@inheritDoc}
   */
  public void startProcessing()
  {
    // Clean up any work files left over from a previous run.
    File tempDir = getFileForPath(
        importContext.getConfig().getImportTempDirectory());
    File[] files = tempDir.listFiles(filter);
    if (files != null)
    {
      for (File f : files)
      {
        // Best effort: a file that cannot be deleted is left in place.
        f.delete();
      }
    }

    addBuffer = new TreeMap<SortValues, EntryID>();
    delBuffer = new TreeMap<SortValues, EntryID>();
  }

  /**
   * {@inheritDoc}
   */
  public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
      throws DatabaseException, IOException, DirectoryException
  {
    SortValues newValues = new SortValues(entryID, newEntry,
                                          vlvIndex.sortOrder);

    // Update the vlvIndex for this entry.
    if (oldEntry != null && vlvIndex.shouldInclude(oldEntry))
    {
      // This is an entry being replaced: record the removal of its old
      // sort values.
      SortValues oldValues = new SortValues(entryID, oldEntry,
                                            vlvIndex.sortOrder);
      removeValues(oldValues, entryID);
    }

    if (vlvIndex.shouldInclude(newEntry))
    {
      insertValues(newValues, entryID);
    }
  }

  /**
   * {@inheritDoc}
   */
  public void stopProcessing() throws IOException
  {
    flushBuffer();
  }

  /**
   * Record the insertion of an entry ID.
   * @param sortValues The sort values.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void insertValues(SortValues sortValues, EntryID entryID)
      throws IOException
  {
    flushIfFull();
    addBuffer.put(sortValues, entryID);
  }

  /**
   * Record the deletion of an entry ID.
   * @param sortValues The sort values to remove.
   * @param entryID The entry ID.
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void removeValues(SortValues sortValues, EntryID entryID)
      throws IOException
  {
    flushIfFull();

    // The deletion must be ADDED to the delete buffer so that it is written
    // to the "_del" intermediate file on the next flush.
    delBuffer.put(sortValues, entryID);
  }

  /**
   * Flush the write buffers if the combined number of buffered add and
   * delete records has reached the buffer size.
   *
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void flushIfFull() throws IOException
  {
    if (addBuffer.size() + delBuffer.size() >= bufferSize)
    {
      flushBuffer();
    }
  }

  /**
   * Write the buffered records to new intermediate binary files in the
   * import temporary directory and then start fresh, empty buffers.
   * The add buffer is always written to a file with the suffix "_add".
   * The delete buffer is written to a file with the suffix "_del" only if
   * we are replacing existing entries. Nothing is written if both buffers
   * are empty.
   *
   * @throws IOException If an I/O error occurs while writing an intermediate
   * file.
   */
  private void flushBuffer() throws IOException
  {
    if (addBuffer.isEmpty() && delBuffer.isEmpty())
    {
      return;
    }

    // Start a new file.
    fileNumber++;
    File tempDir = getFileForPath(
        importContext.getConfig().getImportTempDirectory());

    String addFileName = fileNamePrefix + String.valueOf(fileNumber) + "_add";
    writeBuffer(new File(tempDir, addFileName), addBuffer);

    if (replaceExisting)
    {
      String delFileName =
          fileNamePrefix + String.valueOf(fileNumber) + "_del";
      writeBuffer(new File(tempDir, delFileName), delBuffer);
    }

    addBuffer = new TreeMap<SortValues, EntryID>();
    delBuffer = new TreeMap<SortValues, EntryID>();
  }

  /**
   * Write every record of a buffer to an intermediate file, in the key order
   * of the buffer. Each record is the entry ID as a long, followed for each
   * sort value by its length as an int and then its bytes. A null sort value
   * is written as a length of zero with no bytes.
   *
   * @param file The intermediate file to be written.
   * @param buffer The buffer whose keys are to be written.
   * @throws IOException If an I/O error occurs while writing the file.
   */
  private void writeBuffer(File file, TreeMap<SortValues, EntryID> buffer)
      throws IOException
  {
    DataOutputStream dataStream = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(file)));
    try
    {
      for (SortValues values : buffer.keySet())
      {
        dataStream.writeLong(values.getEntryID());
        for (AttributeValue value : values.getValues())
        {
          if (value != null)
          {
            byte[] valueBytes = value.getValueBytes();
            dataStream.writeInt(valueBytes.length);
            dataStream.write(valueBytes);
          }
          else
          {
            dataStream.writeInt(0);
          }
        }
      }
    }
    finally
    {
      // Closing the data stream also flushes and closes the underlying file.
      dataStream.close();
    }
  }

  /**
   * Get a string that identifies this vlvIndex builder.
   *
   * @return A string that identifies this vlvIndex builder.
   */
  public String toString()
  {
    return vlvIndex.toString() + " builder";
  }
}