From 039ee58ed643a74b0ebcacc41cf8af0c0de76c1d Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Mon, 17 Mar 2008 15:41:50 +0000
Subject: [PATCH] These changes remove the temporary file limitation from import-ldif. Several other changes were made also:
---
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/EntryContainer.java | 26
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 1096 ++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java | 45
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java | 354 +++++
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java | 238 ++
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java | 83 +
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java | 70
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/JebFormat.java | 24
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java | 2
/dev/null | 473 ------
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java | 104 +
opendj-sdk/opends/resource/config/config.ldif | 3
opendj-sdk/opends/src/messages/messages/jeb.properties | 29
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java | 405 +++++
opendj-sdk/opends/tests/unit-tests-testng/resource/config-changes.ldif | 15
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/RootContainer.java | 31
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java | 457 ++++++
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/Index.java | 90 +
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java | 359 +++++
opendj-sdk/opends/resource/schema/02-config.ldif | 18
opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml | 101 -
21 files changed, 3,324 insertions(+), 699 deletions(-)
diff --git a/opendj-sdk/opends/resource/config/config.ldif b/opendj-sdk/opends/resource/config/config.ldif
index fca2328..6505f36 100644
--- a/opendj-sdk/opends/resource/config/config.ldif
+++ b/opendj-sdk/opends/resource/config/config.ldif
@@ -181,10 +181,7 @@
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-subtree-delete-batch-size: 5000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-temp-directory: import-tmp
-ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
-ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index b5375b7..a3be337 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1085,16 +1085,6 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.220
- NAME 'ds-cfg-import-temp-directory'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
- SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.221
- NAME 'ds-cfg-import-buffer-size'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
- SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.222
NAME 'ds-cfg-import-queue-size'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -1155,11 +1145,6 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.26027.1.1.234
- NAME 'ds-cfg-import-pass-size'
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
- SINGLE-VALUE
- X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.235
NAME 'ds-cfg-replication-server-id'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
@@ -2241,13 +2226,10 @@
MAY ( ds-cfg-index-entry-limit $
ds-cfg-subtree-delete-size-limit $
ds-cfg-preload-time-limit $
- ds-cfg-import-temp-directory $
- ds-cfg-import-buffer-size $
ds-cfg-import-queue-size $
ds-cfg-import-thread-count $
ds-cfg-entries-compressed $
ds-cfg-deadlock-retry-limit $
- ds-cfg-import-pass-size $
ds-cfg-db-directory-permissions $
ds-cfg-db-cache-percent $
ds-cfg-subtree-delete-batch-size $
diff --git a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
index f33095b..ac512a8 100644
--- a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
+++ b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
@@ -31,7 +31,7 @@
xmlns:ldap="http://www.opends.org/admin-ldap"
xmlns:cli="http://www.opends.org/admin-cli">
<adm:synopsis>
- The
+ The
<adm:user-friendly-name />
uses the Berkeley DB Java Edition to store user-provided data in a local
repository.
@@ -246,73 +246,9 @@
</ldap:attribute>
</adm:profile>
</adm:property>
- <adm:property name="import-buffer-size" advanced="true">
- <adm:synopsis>
- Specifies the amount of memory that should be used as an internal
- buffer for index information when processing an LDIF import.
- </adm:synopsis>
- <adm:requires-admin-action>
- <adm:none>
- <adm:synopsis>
- Changes do not take effect for any import that may already
- be in progress.
- </adm:synopsis>
- </adm:none>
- </adm:requires-admin-action>
- <adm:default-behavior>
- <adm:defined>
- <adm:value>256mb</adm:value>
- </adm:defined>
- </adm:default-behavior>
- <adm:syntax>
- <adm:size lower-limit="10mb" />
- </adm:syntax>
- <adm:profile name="ldap">
- <ldap:attribute>
- <ldap:name>ds-cfg-import-buffer-size</ldap:name>
- </ldap:attribute>
- </adm:profile>
- </adm:property>
- <adm:property name="import-pass-size" advanced="true">
- <adm:synopsis>
- Specifies the maximum number of entries that should be imported in
- each import pass.
- </adm:synopsis>
- <adm:description>
- An import pass consists of the processing required to import a set
- of entries as well as the index post-processing required to index
- those entries. A value of zero for this property indicates that
- all entries should be processed in a single pass, which is the
- recommended configuration for most deployments, although a
- non-zero value may be required when importing a very large number
- of entries if the amount of memory required for index
- post-processing exceeds the total amount available to the server.
- </adm:description>
- <adm:requires-admin-action>
- <adm:none>
- <adm:synopsis>
- Changes do not take effect for any import that may already
- be in progress.
- </adm:synopsis>
- </adm:none>
- </adm:requires-admin-action>
- <adm:default-behavior>
- <adm:defined>
- <adm:value>0</adm:value>
- </adm:defined>
- </adm:default-behavior>
- <adm:syntax>
- <adm:integer lower-limit="0" upper-limit="2147483647" />
- </adm:syntax>
- <adm:profile name="ldap">
- <ldap:attribute>
- <ldap:name>ds-cfg-import-pass-size</ldap:name>
- </ldap:attribute>
- </adm:profile>
- </adm:property>
<adm:property name="import-queue-size" advanced="true">
<adm:synopsis>
- Specifies the size (in number of entries) of the queue that is
+ Specifies the size (in number of entries) of the queue that is
used to hold the entries read during an LDIF import.
</adm:synopsis>
<adm:requires-admin-action>
@@ -337,39 +273,6 @@
</ldap:attribute>
</adm:profile>
</adm:property>
- <adm:property name="import-temp-directory" mandatory="true">
- <adm:synopsis>
- Specifies the location of the directory that is used to hold
- temporary information during the index post-processing phase of an LDIF import.
- </adm:synopsis>
- <adm:description>
- The specified directory is only used while an import is in
- progress and the files created in this directory are deleted
- as they are processed. It may be an absolute path or one that is
- relative to the instance root directory.
- </adm:description>
- <adm:requires-admin-action>
- <adm:none>
- <adm:synopsis>
- Changes do not take effect for any import that may already
- be in progress.
- </adm:synopsis>
- </adm:none>
- </adm:requires-admin-action>
- <adm:default-behavior>
- <adm:defined>
- <adm:value>import-tmp</adm:value>
- </adm:defined>
- </adm:default-behavior>
- <adm:syntax>
- <adm:string />
- </adm:syntax>
- <adm:profile name="ldap">
- <ldap:attribute>
- <ldap:name>ds-cfg-import-temp-directory</ldap:name>
- </ldap:attribute>
- </adm:profile>
- </adm:property>
<adm:property name="import-thread-count" advanced="true">
<adm:synopsis>
Specifies the number of threads that is used for concurrent
diff --git a/opendj-sdk/opends/src/messages/messages/jeb.properties b/opendj-sdk/opends/src/messages/messages/jeb.properties
index 57fb4c6..20ac108 100644
--- a/opendj-sdk/opends/src/messages/messages/jeb.properties
+++ b/opendj-sdk/opends/src/messages/messages/jeb.properties
@@ -354,3 +354,32 @@
INFO_JEB_IMPORT_STARTING_173=%s starting import (build %s, R%d)
SEVERE_ERR_JEB_DIRECTORY_DOES_NOT_EXIST_174=The backend database directory \
'%s' does not exist
+SEVERE_ERR_JEB_IMPORT_LDIF_ABORT_175=The import was aborted because an \
+ uncaught exception was thrown during processing
+INFO_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE_176=Import LDIF environment close \
+ took %d seconds
+INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH_177=Begin substring buffer flush of %d \
+ elements. Buffer total access: %d buffer hits: %d
+INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED_178=Substring buffer flush \
+ completed in %d seconds
+INFO_JEB_IMPORT_LDIF_FINAL_CLEAN_179=Begin final cleaner run
+INFO_JEB_IMPORT_LDIF_CLEAN_180=Begin cleaner run
+INFO_JEB_IMPORT_LDIF_CLEANER_RUN_DONE_181=Cleaner run took %d seconds %d logs \
+ removed
+INFO_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS_182=Cleaner will remove %d logs
+INFO_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM_184=Available buffer memory %d bytes is \
+ below the minimum value of %d bytes. Setting available buffer memory to \
+ the minimum
+INFO_JEB_IMPORT_LDIF_MEMORY_INFO_185=Using DB cache bytes: %d available \
+ substring buffer bytes: %d
+INFO_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM_186=Available buffer memory %d \
+ bytes is below the minimum value of %d bytes allowed for a single import \
+ context. Setting context available buffer memory to the minimum
+INFO_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS_187=Checkpoints performed: %d
+INFO_JEB_IMPORT_LDIF_CLEANER_STATS_188=Cleaner runs: %d files deleted: %d \
+ entries read: %d IN nodes cleaned: %d
+INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS_189=Eviction in progress. Passes: \
+ %d nodes evicted: %d BIN nodes stripped: %d
+INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED_190=Eviction detected after importing \
+ %d entries
+
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java
index b67e351..451e58d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndex.java
@@ -1631,4 +1631,49 @@
builder.append(indexConfig.getAttribute().getNameOrOID());
return builder.toString();
}
+
+ /**
+ * Return the equality index.
+ *
+ * @return The equality index.
+ */
+ public Index getEqualityIndex() {
+ return equalityIndex;
+ }
+
+ /**
+ * Return the approximate index.
+ *
+ * @return The approximate index.
+ */
+ public Index getApproximateIndex() {
+ return approximateIndex;
+ }
+
+ /**
+ * Return the ordering index.
+ *
+ * @return The ordering index.
+ */
+ public Index getOrderingIndex() {
+ return orderingIndex;
+ }
+
+ /**
+ * Return the substring index.
+ *
+ * @return The substring index.
+ */
+ public Index getSubstringIndex() {
+ return substringIndex;
+ }
+
+ /**
+ * Return the presence index.
+ *
+ * @return The presence index.
+ */
+ public Index getPresenceIndex() {
+ return presenceIndex;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndexBuilder.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndexBuilder.java
deleted file mode 100644
index 86ec878..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/AttributeIndexBuilder.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- */
-package org.opends.server.backends.jeb;
-
-import org.opends.server.types.Entry;
-import static org.opends.server.util.StaticUtils.getFileForPath;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Transaction;
-
-import java.util.*;
-import java.io.ByteArrayOutputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-/**
- * This class is used to create an attribute index for an import process.
- * It is used as follows.
- * <pre>
- * startProcessing();
- * processEntry(entry);
- * processEntry(entry);
- * ...
- * stopProcessing();
- * merge();
- * </pre>
- */
-public class AttributeIndexBuilder implements IndexBuilder
-{
- /**
- * The directory in which temporary merge files are held.
- */
- private final File tempDir;
-
- /**
- * The index database.
- */
- private final Index index;
-
- /**
- * The indexer to generate the index keys.
- */
- private final Indexer indexer;
-
- /**
- * The write buffer.
- */
- private ArrayList<IndexMod> buffer;
-
- /**
- * The write buffer size.
- */
- private final int bufferSize;
-
- /**
- * Current output file number.
- */
- private int fileNumber = 0;
-
- /**
- * A unique prefix for temporary files to prevent conflicts.
- */
- private final String fileNamePrefix;
-
- /**
- * Indicates whether we are replacing existing data or not.
- */
- private final boolean replaceExisting;
-
-
- private final ByteArrayOutputStream addBytesStream =
- new ByteArrayOutputStream();
- private final ByteArrayOutputStream delBytesStream =
- new ByteArrayOutputStream();
-
- private final DataOutputStream addBytesDataStream;
- private final DataOutputStream delBytesDataStream;
-
- /**
- * A file name filter to identify temporary files we have written.
- */
- private final FilenameFilter filter = new FilenameFilter()
- {
- public boolean accept(File d, String name)
- {
- return name.startsWith(fileNamePrefix);
- }
- };
-
- /**
- * Construct an index builder.
- *
- * @param importContext The import context.
- * @param index The index database we are writing.
- * @param entryLimit The index entry limit.
- * @param bufferSize The amount of memory available for buffering.
- */
- public AttributeIndexBuilder(ImportContext importContext,
- Index index, int entryLimit, long bufferSize)
- {
- File parentDir = getFileForPath(importContext.getConfig()
- .getImportTempDirectory());
- this.tempDir = new File(parentDir,
- importContext.getConfig().getBackendId());
-
- this.index = index;
- this.indexer = index.indexer;
- this.bufferSize = (int)bufferSize/100;
- long tid = Thread.currentThread().getId();
- fileNamePrefix = index.getName() + "_" + tid + "_";
- replaceExisting =
- importContext.getLDIFImportConfig().appendToExistingData() &&
- importContext.getLDIFImportConfig().replaceExistingEntries();
- addBytesDataStream = new DataOutputStream(addBytesStream);
- delBytesDataStream = new DataOutputStream(delBytesStream);
- }
-
- /**
- * {@inheritDoc}
- */
- public void startProcessing()
- {
- // Clean up any work files left over from a previous run.
- File[] files = tempDir.listFiles(filter);
- if (files != null)
- {
- for (File f : files)
- {
- f.delete();
- }
- }
-
- buffer = new ArrayList<IndexMod>(bufferSize);
- }
-
- /**
- * {@inheritDoc}
- */
- public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
- throws DatabaseException, IOException
- {
- Transaction txn = null;
-
- // Update the index for this entry.
- if (oldEntry != null)
- {
- // This is an entry being replaced.
- TreeSet<byte[]> addKeys = new TreeSet<byte[]>(indexer.getComparator());
- TreeSet<byte[]> delKeys = new TreeSet<byte[]>(indexer.getComparator());
-
- indexer.replaceEntry(txn, oldEntry, newEntry, addKeys, delKeys);
-
- for (byte[] k : delKeys)
- {
- removeID(k, entryID);
- }
-
- for (byte[] k : addKeys)
- {
- insertID(k, entryID);
- }
- }
- else
- {
- // This is a new entry.
- TreeSet<byte[]> addKeys = new TreeSet<byte[]>(indexer.getComparator());
- indexer.indexEntry(txn, newEntry, addKeys);
- for (byte[] k : addKeys)
- {
- insertID(k, entryID);
- }
- }
-
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public void stopProcessing() throws IOException
- {
- flushBuffer();
- }
-
-
-
- /**
- * Get a statistic of the number of keys that reached the entry limit.
- *
- * @return The number of keys that reached the entry limit.
- */
- public int getEntryLimitExceededCount()
- {
- return index.getEntryLimitExceededCount();
- }
-
-
-
- /**
- * Record the insertion of an entry ID.
- * @param key The index key.
- * @param entryID The entry ID.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void insertID(byte[] key, EntryID entryID)
- throws IOException
- {
- if (buffer.size() >= bufferSize)
- {
- flushBuffer();
- }
-
- IndexMod kav = new IndexMod(key, entryID, false);
- buffer.add(kav);
- }
-
- /**
- * Record the deletion of an entry ID.
- * @param key The index key.
- * @param entryID The entry ID.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void removeID(byte[] key, EntryID entryID)
- throws IOException
- {
- if (buffer.size() >= bufferSize)
- {
- flushBuffer();
- }
-
- IndexMod kav = new IndexMod(key, entryID, true);
- buffer.add(kav);
- }
-
- /**
- * Called when the buffer is full. It first sorts the buffer using the same
- * key comparator used by the index database. Then it merges all the
- * IDs for the same key together and writes each key and its list of IDs
- * to an intermediate binary file.
- * A list of deleted IDs is only present if we are replacing existing entries.
- *
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void flushBuffer() throws IOException
- {
- if (buffer.size() == 0)
- {
- return;
- }
-
- // Keys must be sorted before we can merge duplicates.
- IndexModComparator comparator;
- if (replaceExisting)
- {
- // The entry IDs may be out of order.
- // We must sort by key and ID.
- comparator = new IndexModComparator(indexer.getComparator(), true);
- }
- else
- {
- // The entry IDs are all new and are therefore already ordered.
- // We just need to sort by key.
- comparator = new IndexModComparator(indexer.getComparator(), false);
- }
- Collections.sort(buffer, comparator);
-
- // Start a new file.
- fileNumber++;
- String fileName = fileNamePrefix + String.valueOf(fileNumber);
- File file = new File(tempDir, fileName);
- BufferedOutputStream bufferedStream =
- new BufferedOutputStream(new FileOutputStream(file));
- DataOutputStream dataStream = new DataOutputStream(bufferedStream);
-
- // Reset the byte array output streams but preserve the underlying arrays.
- addBytesStream.reset();
- delBytesStream.reset();
-
- try
- {
- byte[] currentKey = null;
- for (IndexMod key : buffer)
- {
- byte[] keyString = key.key;
- if (!Arrays.equals(keyString,currentKey))
- {
- if (currentKey != null)
- {
- dataStream.writeInt(currentKey.length);
- dataStream.write(currentKey);
- dataStream.writeInt(addBytesStream.size());
- addBytesStream.writeTo(dataStream);
- if (replaceExisting)
- {
- dataStream.writeInt(delBytesStream.size());
- delBytesStream.writeTo(dataStream);
- }
- }
-
- currentKey = keyString;
- addBytesStream.reset();
- delBytesStream.reset();
- }
-
- if (key.isDelete)
- {
- delBytesDataStream.writeLong(key.value.longValue());
- }
- else
- {
- addBytesDataStream.writeLong(key.value.longValue());
- }
-
- }
-
- if (currentKey != null)
- {
- dataStream.writeInt(currentKey.length);
- dataStream.write(currentKey);
- dataStream.writeInt(addBytesStream.size());
- addBytesStream.writeTo(dataStream);
- if (replaceExisting)
- {
- dataStream.writeInt(delBytesStream.size());
- delBytesStream.writeTo(dataStream);
- }
- }
-
- buffer = new ArrayList<IndexMod>(bufferSize);
- }
- finally
- {
- dataStream.close();
- }
- }
-
- /**
- * Get a string that identifies this index builder.
- *
- * @return A string that identifies this index builder.
- */
- public String toString()
- {
- return indexer.toString();
- }
-}
-
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
index 56d799a..e54ecd9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
@@ -74,6 +74,7 @@
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.types.DN;
+import org.opends.server.backends.jeb.importLDIF.Importer;
/**
* This is an implementation of a Directory Server Backend which stores entries
@@ -1166,50 +1167,31 @@
try
{
EnvironmentConfig envConfig =
- ConfigurableEnvironment.parseConfigEntry(cfg);
- /**
- envConfig.setConfigParam("je.env.runCleaner", "false");
- envConfig.setConfigParam("je.log.numBuffers", "2");
- envConfig.setConfigParam("je.log.bufferSize", "15000000");
- envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
- envConfig.setConfigParam("je.log.fileMax", "100000000");
- **/
-
- if (importConfig.appendToExistingData())
- {
- envConfig.setReadOnly(false);
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setTxnNoSync(true);
- envConfig.setConfigParam("je.env.isLocking", "true");
- envConfig.setConfigParam("je.env.runCheckpointer", "false");
- }
- else if(importConfig.clearBackend() || cfg.getBaseDN().size() <= 1)
- {
- // We have the writer lock on the environment, now delete the
- // environment and re-open it. Only do this when we are
- // importing to all the base DNs in the backend or if the backend only
- // have one base DN.
- File parentDirectory = getFileForPath(cfg.getDBDirectory());
- File backendDirectory = new File(parentDirectory, cfg.getBackendId());
- // If the backend does not exist the import will create it.
- if (backendDirectory.exists())
- {
- EnvManager.removeFiles(backendDirectory.getPath());
+ ConfigurableEnvironment.parseConfigEntry(cfg);
+ if(!importConfig.appendToExistingData()) {
+ if(importConfig.clearBackend() || cfg.getBaseDN().size() <= 1) {
+ // We have the writer lock on the environment, now delete the
+ // environment and re-open it. Only do this when we are
+ // importing to all the base DNs in the backend or if the backend only
+ // have one base DN.
+ File parentDirectory = getFileForPath(cfg.getDBDirectory());
+ File backendDirectory = new File(parentDirectory, cfg.getBackendId());
+ // If the backend does not exist the import will create it.
+ if (backendDirectory.exists()) {
+ EnvManager.removeFiles(backendDirectory.getPath());
+ }
}
-
- envConfig.setReadOnly(false);
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(false);
- envConfig.setTxnNoSync(false);
- envConfig.setConfigParam("je.env.isLocking", "false");
- envConfig.setConfigParam("je.env.runCheckpointer", "false");
}
-
+ envConfig.setReadOnly(false);
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(false);
+ envConfig.setTxnNoSync(false);
+ envConfig.setConfigParam("je.env.isLocking", "false");
+ envConfig.setConfigParam("je.env.runCheckpointer", "false");
+ Importer importer = new Importer(importConfig);
+ envConfig.setConfigParam("je.maxMemory", importer.getDBCacheSize());
rootContainer = initializeRootContainer(envConfig);
-
- ImportJob importJob = new ImportJob(importConfig);
- return importJob.importLDIF(rootContainer);
+ return importer.processImport(rootContainer);
}
catch (IOException ioe)
{
@@ -1263,7 +1245,13 @@
{
if (rootContainer != null)
{
+ long startTime = System.currentTimeMillis();
rootContainer.close();
+ long finishTime = System.currentTimeMillis();
+ long closeTime = (finishTime - startTime) / 1000;
+ Message msg =
+ INFO_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE.get(closeTime);
+ logError(msg);
rootContainer = null;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/EntryContainer.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/EntryContainer.java
index e885246..50aa260 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/EntryContainer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/EntryContainer.java
@@ -667,6 +667,16 @@
return attrIndexMap.get(attrType);
}
+
+ /**
+ * Return attribute index map.
+ *
+ * @return The attribute index map.
+ */
+ public Map<AttributeType, AttributeIndex> getAttributeIndexMap() {
+ return attrIndexMap;
+ }
+
/**
* Look for an VLV index for the given index name.
*
@@ -4657,4 +4667,20 @@
}
return matchedDN;
}
+
+ /**
+ * Get the exclusive lock.
+ *
+ */
+ public void lock() {
+ exclusiveLock.lock();
+ }
+
+ /**
+ * Unlock the exclusive lock.
+ */
+ public void unlock() {
+ exclusiveLock.unlock();
+ }
+
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
index 55ad746..e9425e6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ID2Entry.java
@@ -103,7 +103,7 @@
* @throws DirectoryException If a problem occurs while attempting to encode
* the entry.
*/
- private DatabaseEntry entryData(Entry entry)
+ public DatabaseEntry entryData(Entry entry)
throws DirectoryException
{
byte[] entryBytes;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
deleted file mode 100644
index 99c6f88..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
+++ /dev/null
@@ -1,1310 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- */
-package org.opends.server.backends.jeb;
-import org.opends.messages.Message;
-
-import com.sleepycat.je.*;
-
-import org.opends.server.types.DebugLogLevel;
-import org.opends.messages.JebMessages;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.DN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.LDIFImportResult;
-import org.opends.server.types.ResultCode;
-import org.opends.server.util.LDIFException;
-import org.opends.server.util.LDIFReader;
-import org.opends.server.util.StaticUtils;
-import org.opends.server.util.RuntimeInformation;
-import static org.opends.server.util.StaticUtils.getFileForPath;
-
-import static org.opends.server.util.DynamicConstants.*;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-import static org.opends.messages.JebMessages.
- WARN_JEB_IMPORT_ENTRY_EXISTS;
-import static org.opends.messages.JebMessages.
- ERR_JEB_IMPORT_PARENT_NOT_FOUND;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.messages.JebMessages.*;
-import org.opends.server.admin.std.server.LocalDBBackendCfg;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import org.opends.server.config.ConfigException;
-import org.opends.server.core.DirectoryServer;
-
-/**
- * Import from LDIF to a JE backend.
- */
-public class ImportJob implements Thread.UncaughtExceptionHandler
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
- /**
- * The JE backend configuration.
- */
- private LocalDBBackendCfg config;
-
- /**
- * The root container used for this import job.
- */
- private RootContainer rootContainer;
-
- /**
- * The LDIF import configuration.
- */
- private LDIFImportConfig ldifImportConfig;
-
- /**
- * The LDIF reader.
- */
- private LDIFReader reader;
-
- /**
- * Map of base DNs to their import context.
- */
- private LinkedHashMap<DN,ImportContext> importMap =
- new LinkedHashMap<DN, ImportContext>();
-
- /**
- * The maximum number of parent ID values that we will remember.
- */
- private static final int PARENT_ID_MAP_SIZE = 50;
-
- /**
- * Map of likely parent entry DNs to their entry IDs.
- */
- private HashMap<DN,EntryID> parentIDMap =
- new HashMap<DN,EntryID>(PARENT_ID_MAP_SIZE);
-
- /**
- * The number of entries imported.
- */
- private int importedCount;
-
- /**
- * The number of entries migrated.
- */
- private int migratedCount;
-
- /**
- * The number of merge passes.
- */
- private int mergePassNumber = 1;
-
-
- /**
- * The number of milliseconds between job progress reports.
- */
- private long progressInterval = 10000;
-
- /**
- * The progress report timer.
- */
- private Timer timer;
-
- private int entriesProcessed;
- private int importPassSize;
-
-
- /**
- * The import worker threads.
- */
- private CopyOnWriteArrayList<ImportThread> threads;
-
- /**
- * Create a new import job.
- *
- * @param ldifImportConfig The LDIF import configuration.
- */
- public ImportJob(LDIFImportConfig ldifImportConfig)
- {
- this.ldifImportConfig = ldifImportConfig;
- this.threads = new CopyOnWriteArrayList<ImportThread>();
- }
-
- /**
- * Import from LDIF file to one or more base DNs. Opens the database
- * environment and deletes existing data for each base DN unless we are
- * appending to existing data. Creates a temporary working directory,
- * processes the LDIF file, then merges the resulting intermediate
- * files to load the index databases.
- *
- * @param rootContainer The root container to import into.
- *
- * @return Information about the result of the import.
- *
- * @throws DatabaseException If an error occurs in the JE database.
- * @throws IOException If a problem occurs while opening the LDIF file for
- * reading, or while reading from the LDIF file.
- * @throws JebException If an error occurs in the JE backend.
- * @throws DirectoryException if a directory server related error occurs.
- * @throws ConfigException if a configuration related error occurs.
- */
- public LDIFImportResult importLDIF(RootContainer rootContainer)
- throws DatabaseException, IOException, JebException, DirectoryException,
- ConfigException
- {
-
- // Create an LDIF reader. Throws an exception if the file does not exist.
- reader = new LDIFReader(ldifImportConfig);
- this.rootContainer = rootContainer;
- this.config = rootContainer.getConfiguration();
- this.mergePassNumber = 1;
- this.entriesProcessed = 0;
- this.importPassSize = config.getImportPassSize();
- if (importPassSize <= 0)
- {
- importPassSize = Integer.MAX_VALUE;
- }
-
- Message message;
- long startTime;
-
- try
- {
- // Divide the total buffer size by the number of threads
- // and give that much to each thread.
- int importThreadCount = config.getImportThreadCount();
- long bufferSize = config.getImportBufferSize() /
- (importThreadCount*rootContainer.getBaseDNs().size());
-
- message = INFO_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
- BUILD_ID, REVISION_NUMBER);
- logError(message);
- message = INFO_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
- logError(message);
-
- RuntimeInformation.logInfo();
- if (debugEnabled())
- {
-
- message = INFO_JEB_IMPORT_BUFFER_SIZE.get(bufferSize);
- TRACER.debugInfo(message.toString());
- message = INFO_JEB_IMPORT_ENVIRONMENT_CONFIG.get(
- rootContainer.getEnvironmentConfig().toString());
- TRACER.debugInfo(message.toString());
- }
-
- for (EntryContainer entryContainer : rootContainer.getEntryContainers())
- {
- ImportContext importContext =
- getImportContext(entryContainer, bufferSize);
-
- if(importContext != null)
- {
- importMap.put(entryContainer.getBaseDN(), importContext);
- }
- }
-
- // Make a note of the time we started.
- startTime = System.currentTimeMillis();
-
- // Create a temporary work directory.
- File parentDir = getFileForPath(config.getImportTempDirectory());
- File tempDir = new File(parentDir, config.getBackendId());
- if(!tempDir.exists() && !tempDir.mkdirs())
- {
- Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(
- String.valueOf(tempDir));
- throw new IOException(msg.toString());
- }
-
- if (tempDir.listFiles() != null)
- {
- for (File f : tempDir.listFiles())
- {
- f.delete();
- }
- }
-
- startWorkerThreads();
- try
- {
- importedCount = 0;
- migratedCount = 0;
- migrateExistingEntries();
- processLDIF();
- migrateExcludedEntries();
- }
- finally
- {
- merge(false);
- tempDir.delete();
-
- for(ImportContext importContext : importMap.values())
- {
- DN baseDN = importContext.getBaseDN();
- EntryContainer srcEntryContainer =
- importContext.getSrcEntryContainer();
- if(srcEntryContainer != null)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("Deleteing old entry container for base DN " +
- "%s and renaming temp entry container", baseDN);
- }
- EntryContainer unregEC =
- rootContainer.unregisterEntryContainer(baseDN);
- //Make sure the unregistered EC for the base DN is the same as
- //the one in the import context.
- if(unregEC != srcEntryContainer)
- {
- if(debugEnabled())
- {
- TRACER.debugInfo("Current entry container used for base DN " +
- "%s is not the same as the source entry container used " +
- "during the migration process.", baseDN);
- }
- rootContainer.registerEntryContainer(baseDN, unregEC);
- continue;
- }
- srcEntryContainer.exclusiveLock.lock();
- srcEntryContainer.delete();
- srcEntryContainer.exclusiveLock.unlock();
- EntryContainer newEC = importContext.getEntryContainer();
- newEC.exclusiveLock.lock();
- newEC.setDatabasePrefix(baseDN.toNormalizedString());
- newEC.exclusiveLock.unlock();
- rootContainer.registerEntryContainer(baseDN, newEC);
- }
- }
- }
- }
- finally
- {
- reader.close();
- }
-
- long finishTime = System.currentTimeMillis();
- long importTime = (finishTime - startTime);
-
- float rate = 0;
- if (importTime > 0)
- {
- rate = 1000f*importedCount / importTime;
- }
-
- message = INFO_JEB_IMPORT_FINAL_STATUS.
- get(reader.getEntriesRead(), importedCount - migratedCount,
- reader.getEntriesIgnored(), reader.getEntriesRejected(),
- migratedCount, importTime/1000, rate);
- logError(message);
-
- message = INFO_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
- getEntryLimitExceededCount());
- logError(message);
-
- return new LDIFImportResult(reader.getEntriesRead(),
- reader.getEntriesRejected(),
- reader.getEntriesIgnored());
- }
-
- /**
- * Merge the intermediate files to load the index databases.
- *
- * @param moreData <CODE>true</CODE> if this is a intermediate merge or
- * <CODE>false</CODE> if this is a final merge.
- * @throws DatabaseException If an error occurs in the JE database.
- */
- private void merge(boolean moreData) throws DatabaseException
- {
- stopWorkerThreads();
-
- try
- {
- if (moreData)
- {
- Message message =
- INFO_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE.get(mergePassNumber++);
- logError(message);
- }
- else
- {
- Message message = INFO_JEB_IMPORT_BEGINNING_FINAL_MERGE.get();
- logError(message);
- }
-
-
- long mergeStartTime = System.currentTimeMillis();
-
- ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
-
- ArrayList<VLVIndexMergeThread> vlvIndexMergeThreads =
- new ArrayList<VLVIndexMergeThread>();
-
- // Create merge threads for each base DN.
- for (ImportContext importContext : importMap.values())
- {
- EntryContainer entryContainer = importContext.getEntryContainer();
-
- // For each configured attribute index.
- for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
- {
- int indexEntryLimit = config.getIndexEntryLimit();
- if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
- {
- indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
- }
-
- if (attrIndex.equalityIndex != null)
- {
- Index index = attrIndex.equalityIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.presenceIndex != null)
- {
- Index index = attrIndex.presenceIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.substringIndex != null)
- {
- Index index = attrIndex.substringIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.orderingIndex != null)
- {
- Index index = attrIndex.orderingIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- if (attrIndex.approximateIndex != null)
- {
- Index index = attrIndex.approximateIndex;
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig, index,
- indexEntryLimit);
- mergers.add(indexMergeThread);
- }
- }
-
- for(VLVIndex vlvIndex : entryContainer.getVLVIndexes())
- {
- VLVIndexMergeThread vlvIndexMergeThread =
- new VLVIndexMergeThread(config, ldifImportConfig, vlvIndex);
- vlvIndexMergeThread.setUncaughtExceptionHandler(this);
- vlvIndexMergeThreads.add(vlvIndexMergeThread);
- }
-
- // Id2Children index.
- Index id2Children = entryContainer.getID2Children();
- IndexMergeThread indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig,
- id2Children,
- config.getIndexEntryLimit());
- mergers.add(indexMergeThread);
-
- // Id2Subtree index.
- Index id2Subtree = entryContainer.getID2Subtree();
- indexMergeThread =
- new IndexMergeThread(config,
- ldifImportConfig,
- id2Subtree,
- config.getIndexEntryLimit());
- mergers.add(indexMergeThread);
- }
-
- // Run all the merge threads.
- for (IndexMergeThread imt : mergers)
- {
- imt.start();
- }
- for (VLVIndexMergeThread imt : vlvIndexMergeThreads)
- {
- imt.start();
- }
-
- // Wait for the threads to finish.
- for (IndexMergeThread imt : mergers)
- {
- try
- {
- imt.join();
- }
- catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
- // Wait for the threads to finish.
- for (VLVIndexMergeThread imt : vlvIndexMergeThreads)
- {
- try
- {
- imt.join();
- }
- catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
-
- long mergeEndTime = System.currentTimeMillis();
-
- if (moreData)
- {
- Message message = INFO_JEB_IMPORT_RESUMING_LDIF_PROCESSING.get(
- ((mergeEndTime-mergeStartTime)/1000));
- logError(message);
- }
- else
- {
- Message message = INFO_JEB_IMPORT_FINAL_MERGE_COMPLETED.get(
- ((mergeEndTime-mergeStartTime)/1000));
- logError(message);
- }
- }
- finally
- {
- if(moreData)
- {
- startWorkerThreads();
- }
- }
- }
-
- private void startWorkerThreads() throws DatabaseException
- {
- // Create one set of worker threads for each base DN.
- int importThreadCount = config.getImportThreadCount();
- for (ImportContext ic : importMap.values())
- {
- for (int i = 0; i < importThreadCount; i++)
- {
- ImportThread t = new ImportThread(ic, i);
- t.setUncaughtExceptionHandler(this);
- threads.add(t);
-
- t.start();
- }
- }
-
- // Start a timer for the progress report.
- timer = new Timer();
- TimerTask progressTask = new ImportJob.ProgressTask();
- timer.scheduleAtFixedRate(progressTask, progressInterval,
- progressInterval);
- }
-
- private void stopWorkerThreads()
- {
- if(threads.size() > 0)
- {
- // Wait for the queues to be drained.
- for (ImportContext ic : importMap.values())
- {
- while (ic.getQueue().size() > 0)
- {
- try
- {
- Thread.sleep(100);
- } catch (Exception e)
- {
- // No action needed.
- }
- }
- }
- }
-
- // Order the threads to stop.
- for (ImportThread t : threads)
- {
- t.stopProcessing();
- }
-
- // Wait for each thread to stop.
- for (ImportThread t : threads)
- {
- try
- {
- t.join();
- importedCount += t.getImportedCount();
- }
- catch (InterruptedException ie)
- {
- // No action needed?
- }
- }
-
- timer.cancel();
- }
-
- /**
- * Create a set of worker threads, one set for each base DN.
- * Read each entry from the LDIF and determine which
- * base DN the entry belongs to. Write the dn2id database, then put the
- * entry on the appropriate queue for the worker threads to consume.
- * Record the entry count for each base DN when all entries have been
- * processed.
- *
- * pass size was reached), false if the entire LDIF file has been read.
- *
- * @throws JebException If an error occurs in the JE backend.
- * @throws DatabaseException If an error occurs in the JE database.
- * @throws IOException If a problem occurs while opening the LDIF file for
- * reading, or while reading from the LDIF file.
- */
- private void processLDIF()
- throws JebException, DatabaseException, IOException
- {
- Message message = INFO_JEB_IMPORT_LDIF_START.get();
- logError(message);
-
- do
- {
- if (ldifImportConfig.isCancelled())
- {
- break;
- }
-
- if(threads.size() <= 0)
- {
- message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
- throw new JebException(message);
- }
- try
- {
- // Read the next entry.
- Entry entry = reader.readEntry();
-
- // Check for end of file.
- if (entry == null)
- {
- message = INFO_JEB_IMPORT_LDIF_END.get();
- logError(message);
-
- break;
- }
-
- // Route it according to base DN.
- ImportContext importContext = getImportConfig(entry.getDN());
-
- processEntry(importContext, entry);
-
- entriesProcessed++;
- if (entriesProcessed >= importPassSize)
- {
- merge(true);
- entriesProcessed = 0;
- }
- }
- catch (LDIFException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- catch (DirectoryException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- } while (true);
- }
-
- private void migrateExistingEntries()
- throws JebException, DatabaseException, DirectoryException
- {
- for(ImportContext importContext : importMap.values())
- {
- EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
- if(srcEntryContainer != null &&
- !importContext.getIncludeBranches().isEmpty())
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- LockMode lockMode = LockMode.DEFAULT;
- OperationStatus status;
-
- Message message = INFO_JEB_IMPORT_MIGRATION_START.get(
- "existing", String.valueOf(importContext.getBaseDN()));
- logError(message);
-
- Cursor cursor =
- srcEntryContainer.getDN2ID().openCursor(null,
- CursorConfig.READ_COMMITTED);
- try
- {
- status = cursor.getFirst(key, data, lockMode);
-
- while(status == OperationStatus.SUCCESS &&
- !ldifImportConfig.isCancelled())
- {
- if(threads.size() <= 0)
- {
- message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
- throw new JebException(message);
- }
-
- DN dn = DN.decode(new ASN1OctetString(key.getData()));
- if(!importContext.getIncludeBranches().contains(dn))
- {
- EntryID id = new EntryID(data);
- Entry entry = srcEntryContainer.getID2Entry().get(null, id);
- processEntry(importContext, entry);
-
- entriesProcessed++;
- migratedCount++;
- if (entriesProcessed >= importPassSize)
- {
- merge(true);
- entriesProcessed = 0;
- }
- status = cursor.getNext(key, data, lockMode);
- }
- else
- {
- // This is the base entry for a branch that will be included
- // in the import so we don't want to copy the branch to the new
- // entry container.
-
- /**
- * Advance the cursor to next entry at the same level in the DIT
- * skipping all the entries in this branch.
- * Set the next starting value to a value of equal length but
- * slightly greater than the previous DN. Since keys are compared
- * in reverse order we must set the first byte (the comma).
- * No possibility of overflow here.
- */
- byte[] begin =
- StaticUtils.getBytes("," + dn.toNormalizedString());
- begin[0] = (byte) (begin[0] + 1);
- key.setData(begin);
- status = cursor.getSearchKeyRange(key, data, lockMode);
- }
- }
- }
- finally
- {
- cursor.close();
- }
- }
- }
- }
-
- private void migrateExcludedEntries()
- throws JebException, DatabaseException
- {
- for(ImportContext importContext : importMap.values())
- {
- EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
- if(srcEntryContainer != null &&
- !importContext.getExcludeBranches().isEmpty())
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- LockMode lockMode = LockMode.DEFAULT;
- OperationStatus status;
-
- Message message = INFO_JEB_IMPORT_MIGRATION_START.get(
- "excluded", String.valueOf(importContext.getBaseDN()));
- logError(message);
-
- Cursor cursor =
- srcEntryContainer.getDN2ID().openCursor(null,
- CursorConfig.READ_COMMITTED);
- Comparator<byte[]> dn2idComparator =
- srcEntryContainer.getDN2ID().getComparator();
- try
- {
- for(DN excludedDN : importContext.getExcludeBranches())
- {
- byte[] suffix =
- StaticUtils.getBytes(excludedDN.toNormalizedString());
- key.setData(suffix);
- status = cursor.getSearchKeyRange(key, data, lockMode);
-
- if(status == OperationStatus.SUCCESS &&
- Arrays.equals(key.getData(), suffix))
- {
- // This is the base entry for a branch that was excluded in the
- // import so we must migrate all entries in this branch over to
- // the new entry container.
-
- byte[] end =
- StaticUtils.getBytes("," + excludedDN.toNormalizedString());
- end[0] = (byte) (end[0] + 1);
-
- while(status == OperationStatus.SUCCESS &&
- dn2idComparator.compare(key.getData(), end) < 0 &&
- !ldifImportConfig.isCancelled())
- {
- if(threads.size() <= 0)
- {
- message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
- throw new JebException(message);
- }
-
- EntryID id = new EntryID(data);
- Entry entry = srcEntryContainer.getID2Entry().get(null, id);
- processEntry(importContext, entry);
-
- entriesProcessed++;
- migratedCount++;
- if (entriesProcessed >= importPassSize)
- {
- merge(true);
- entriesProcessed = 0;
- }
- status = cursor.getNext(key, data, lockMode);
- }
- }
- }
- }
- finally
- {
- cursor.close();
- }
- }
- }
- }
-
- /**
- * Process an entry to be imported. Read dn2id to check if the entry already
- * exists, and write dn2id if it does not. Put the entry on the worker
- * thread queue.
- *
- * @param importContext The import context for this entry.
- * @param entry The entry to be imported.
- * @throws DatabaseException If an error occurs in the JE database.
- * @throws JebException If an error occurs in the JE backend.
- */
- private void processEntry(ImportContext importContext, Entry entry)
- throws JebException, DatabaseException
- {
- DN entryDN = entry.getDN();
- LDIFImportConfig ldifImportConfig = importContext.getLDIFImportConfig();
-
- Transaction txn = null;
- if (ldifImportConfig.appendToExistingData())
- {
- txn = importContext.getEntryContainer().beginTransaction();
- }
-
- DN2ID dn2id = importContext.getEntryContainer().getDN2ID();
- ID2Entry id2entry = importContext.getEntryContainer().getID2Entry();
-
- try
- {
- // See if the entry already exists.
- EntryID entryID = dn2id.get(txn, entryDN);
- if (entryID != null)
- {
- // See if we are allowed to replace the entry that exists.
- if (ldifImportConfig.appendToExistingData() &&
- ldifImportConfig.replaceExistingEntries())
- {
- // Read the existing entry contents.
- Entry oldEntry = id2entry.get(txn, entryID);
-
- // Attach the ID to the old entry.
- oldEntry.setAttachment(entryID);
-
- // Attach the old entry to the new entry.
- entry.setAttachment(oldEntry);
-
- // Put the entry on the queue.
- try
- {
- importContext.getQueue().put(entry);
- }
- catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
- else
- {
- // Reject the entry.
-
- Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- importContext.getLDIFReader().rejectLastEntry(msg);
- return;
- }
- }
- else
- {
- // Make sure the parent entry exists, unless this entry is a base DN.
- EntryID parentID = null;
- DN parentDN = importContext.getEntryContainer().
- getParentWithinBase(entryDN);
- if (parentDN != null)
- {
- parentID = getParentID(parentDN, dn2id, txn);
- if (parentID == null)
- {
- // Reject the entry.
- Message msg =
- ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
- importContext.getLDIFReader().rejectLastEntry(msg);
- return;
- }
- }
-
- // Assign a new entry identifier and write the new DN.
- entryID = rootContainer.getNextEntryID();
- dn2id.insert(txn, entryDN, entryID);
-
- // Construct a list of IDs up the DIT.
- ArrayList<EntryID> IDs;
- if (parentDN != null && importContext.getParentDN() != null &&
- parentDN.equals(importContext.getParentDN()))
- {
- // Reuse the previous values.
- IDs = new ArrayList<EntryID>(importContext.getIDs());
- IDs.set(0, entryID);
- }
- else
- {
- IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
- IDs.add(entryID);
- if (parentID != null)
- {
- IDs.add(parentID);
- EntryContainer ec = importContext.getEntryContainer();
- for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
- dn = ec.getParentWithinBase(dn))
- {
- // Read the ID from dn2id.
- EntryID nodeID = dn2id.get(txn, dn);
- IDs.add(nodeID);
- }
- }
- }
- importContext.setParentDN(parentDN);
- importContext.setIDs(IDs);
-
- // Attach the list of IDs to the entry.
- entry.setAttachment(IDs);
-
- // Put the entry on the queue.
- try
- {
- while(!importContext.getQueue().offer(entry, 1000,
- TimeUnit.MILLISECONDS))
- {
- if(threads.size() <= 0)
- {
- // All worker threads died. We must stop now.
- return;
- }
- }
- }
- catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
-
- if (txn != null)
- {
- EntryContainer.transactionCommit(txn);
- txn = null;
- }
- }
- finally
- {
- if (txn != null)
- {
- EntryContainer.transactionAbort(txn);
- }
- }
- }
-
- /**
- * Retrieves the entry ID for the entry with the given DN. This will use an
- * in-memory hash if possible, or will go to the database if it's not in
- * cache. This should only be used for cacheable operations (like getting the
- * entry ID for the parent entry) where the same parent ID is likely to be
- * used multiple times.
- *
- * @param parentDN The DN of the parent entry for which to retrieve the
- * corresponding entry ID.
- * @param dn2id The handle to the dn2id database to use if the parent DN
- * isn't found in the local cache.
- * @param txn The transaction to use when interacting with the dn2id
- * database.
- *
- * @return The entry ID for the entry with the given DN, or {@code null} if
- * no such entry exists.
- */
- private EntryID getParentID(DN parentDN, DN2ID dn2id, Transaction txn)
- throws DatabaseException
- {
- EntryID parentID = parentIDMap.get(parentDN);
- if (parentID != null)
- {
- return parentID;
- }
-
- parentID = dn2id.get(txn, parentDN);
- if (parentID != null)
- {
- if (parentIDMap.size() >= PARENT_ID_MAP_SIZE)
- {
- Iterator<DN> iterator = parentIDMap.keySet().iterator();
- iterator.next();
- iterator.remove();
- }
-
- parentIDMap.put(parentDN, parentID);
- }
-
- return parentID;
- }
-
- /**
- * Get a statistic of the number of keys that reached the entry limit.
- * @return The number of keys that reached the entry limit.
- */
- private int getEntryLimitExceededCount()
- {
- int count = 0;
- for (ImportContext ic : importMap.values())
- {
- count += ic.getEntryContainer().getEntryLimitExceededCount();
- }
- return count;
- }
-
- /**
- * Method invoked when the given thread terminates due to the given uncaught
- * exception. <p>Any exception thrown by this method will be ignored by the
- * Java Virtual Machine.
- *
- * @param t the thread
- * @param e the exception
- */
- public void uncaughtException(Thread t, Throwable e)
- {
- threads.remove(t);
- Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
- t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
- logError(msg);
- }
-
- /**
- * Determine the appropriate import context for an entry.
- *
- * @param dn The DN of an entry
- * @return The import context.
- * @throws DirectoryException If the entry DN does not match any
- * of the base DNs.
- */
- private ImportContext getImportConfig(DN dn) throws DirectoryException
- {
- ImportContext importContext = null;
- DN nodeDN = dn;
-
- while (importContext == null && nodeDN != null)
- {
- importContext = importMap.get(nodeDN);
- if (importContext == null)
- {
- nodeDN = nodeDN.getParentDNInSuffix();
- }
- }
-
- if (nodeDN == null)
- {
- // The entry should not have been given to this backend.
- Message message =
- JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
- throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
- }
-
- return importContext;
- }
-
- private ImportContext getImportContext(EntryContainer entryContainer,
- long bufferSize)
- throws DatabaseException, JebException, ConfigException
- {
- DN baseDN = entryContainer.getBaseDN();
- EntryContainer srcEntryContainer = null;
- List<DN> includeBranches = new ArrayList<DN>();
- List<DN> excludeBranches = new ArrayList<DN>();
-
- if(!ldifImportConfig.appendToExistingData() &&
- !ldifImportConfig.clearBackend())
- {
- for(DN dn : ldifImportConfig.getExcludeBranches())
- {
- if(baseDN.equals(dn))
- {
- // This entire base DN was explicitly excluded. Skip.
- return null;
- }
- if(baseDN.isAncestorOf(dn))
- {
- excludeBranches.add(dn);
- }
- }
-
- if(!ldifImportConfig.getIncludeBranches().isEmpty())
- {
- for(DN dn : ldifImportConfig.getIncludeBranches())
- {
- if(baseDN.isAncestorOf(dn))
- {
- includeBranches.add(dn);
- }
- }
-
- if(includeBranches.isEmpty())
- {
- // There are no branches in the explicitly defined include list under
- // this base DN. Skip this base DN alltogether.
-
- return null;
- }
-
- // Remove any overlapping include branches.
- Iterator<DN> includeBranchIterator = includeBranches.iterator();
- while(includeBranchIterator.hasNext())
- {
- DN includeDN = includeBranchIterator.next();
- boolean keep = true;
- for(DN dn : includeBranches)
- {
- if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
- {
- keep = false;
- break;
- }
- }
- if(!keep)
- {
- includeBranchIterator.remove();
- }
- }
-
- // Remvoe any exclude branches that are not are not under a include
- // branch since they will be migrated as part of the existing entries
- // outside of the include branches anyways.
- Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
- while(excludeBranchIterator.hasNext())
- {
- DN excludeDN = excludeBranchIterator.next();
- boolean keep = false;
- for(DN includeDN : includeBranches)
- {
- if(includeDN.isAncestorOf(excludeDN))
- {
- keep = true;
- break;
- }
- }
- if(!keep)
- {
- excludeBranchIterator.remove();
- }
- }
-
- if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
- includeBranches.get(0).equals(baseDN))
- {
- // This entire base DN is explicitly included in the import with
- // no exclude branches that we need to migrate. Just clear the entry
- // container.
- entryContainer.exclusiveLock.lock();
- entryContainer.clear();
- entryContainer.exclusiveLock.unlock();
- }
- else
- {
- // Create a temp entry container
- srcEntryContainer = entryContainer;
- entryContainer =
- rootContainer.openEntryContainer(baseDN,
- baseDN.toNormalizedString() +
- "_importTmp");
- }
- }
- }
-
- // Create an import context.
- ImportContext importContext = new ImportContext();
- importContext.setBufferSize(bufferSize);
- importContext.setConfig(config);
- importContext.setLDIFImportConfig(this.ldifImportConfig);
- importContext.setLDIFReader(reader);
-
- importContext.setBaseDN(baseDN);
- importContext.setEntryContainer(entryContainer);
- importContext.setSrcEntryContainer(srcEntryContainer);
- importContext.setBufferSize(bufferSize);
-
- // Create an entry queue.
- LinkedBlockingQueue<Entry> queue =
- new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
- importContext.setQueue(queue);
-
- // Set the include and exclude branches
- importContext.setIncludeBranches(includeBranches);
- importContext.setExcludeBranches(excludeBranches);
-
- return importContext;
- }
-
- /**
- * This class reports progress of the import job at fixed intervals.
- */
- private final class ProgressTask extends TimerTask
- {
- /**
- * The number of entries that had been read at the time of the
- * previous progress report.
- */
- private long previousCount = 0;
-
- /**
- * The time in milliseconds of the previous progress report.
- */
- private long previousTime;
-
- /**
- * The environment statistics at the time of the previous report.
- */
- private EnvironmentStats prevEnvStats;
-
- /**
- * The number of bytes in a megabyte.
- * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
- */
- private static final int bytesPerMegabyte = 1024*1024;
-
- /**
- * Create a new import progress task.
- * @throws DatabaseException If an error occurs in the JE database.
- */
- public ProgressTask() throws DatabaseException
- {
- previousTime = System.currentTimeMillis();
- prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
- }
-
- /**
- * The action to be performed by this timer task.
- */
- public void run()
- {
- long latestCount = reader.getEntriesRead() + migratedCount;
- long deltaCount = (latestCount - previousCount);
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
-
- if (deltaTime == 0)
- {
- return;
- }
-
- long numRead = reader.getEntriesRead();
- long numIgnored = reader.getEntriesIgnored();
- long numRejected = reader.getEntriesRejected();
- float rate = 1000f*deltaCount / deltaTime;
-
- Message message = INFO_JEB_IMPORT_PROGRESS_REPORT.get(
- numRead, numIgnored, numRejected, migratedCount, rate);
- logError(message);
-
- try
- {
- Runtime runtime = Runtime.getRuntime();
- long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
-
- EnvironmentStats envStats =
- rootContainer.getEnvironmentStats(new StatsConfig());
- long nCacheMiss =
- envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
-
- float cacheMissRate = 0;
- if (deltaCount > 0)
- {
- cacheMissRate = nCacheMiss/(float)deltaCount;
- }
-
- message = INFO_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
- freeMemory, cacheMissRate);
- logError(message);
-
- prevEnvStats = envStats;
- }
- catch (DatabaseException e)
- {
- // Unlikely to happen and not critical.
- }
-
-
- previousCount = latestCount;
- previousTime = latestTime;
- }
- }
-
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportThread.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportThread.java
deleted file mode 100644
index 90cc7e1..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportThread.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- */
-package org.opends.server.backends.jeb;
-
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.types.Entry;
-
-import com.sleepycat.je.Transaction;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.ArrayList;
-
-/**
- * A thread to process import entries from a queue. Multiple instances of
- * this class process entries from a single shared queue.
- */
-public class ImportThread extends DirectoryThread
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
-
- /**
- * The import context of this thread.
- */
- private ImportContext importContext;
-
- /**
- * The destination entry entryContainer for entries read from the queue.
- */
- private EntryContainer entryContainer;
-
- /**
- * The entry database of the destination entry entryContainer.
- */
- private ID2Entry id2entry;
-
- /**
- * The referral database of the destination entry entryContainer.
- */
- private DN2URI dn2uri;
-
- /**
- * A set of index builder objects to construct the index databases.
- */
- private ArrayList<IndexBuilder> builders = new ArrayList<IndexBuilder>();
-
- /**
- * This queue is the source of entries to be processed.
- */
- private BlockingQueue<Entry> queue;
-
- /**
- * The number of entries imported by this thread.
- */
- private int importedCount = 0;
-
- /**
- * Of the total number of entries imported by this thread, this is the
- * number of new entries inserted (as opposed to existing entries that have
- * been replaced).
- */
- private int entryInsertCount = 0;
-
- /**
- * A flag that is set when the thread has been told to stop processing.
- */
- private boolean stopRequested = false;
-
- /**
- * Create a new import thread.
- * @param importContext The import context of the thread.
- * @param threadNumber A number to identify this thread instance among
- * other instances of the same class.
- */
- public ImportThread(ImportContext importContext, int threadNumber)
- {
- super("Import Worker Thread " + threadNumber);
-
- this.importContext = importContext;
- entryContainer = importContext.getEntryContainer();
- queue = importContext.getQueue();
- id2entry = entryContainer.getID2Entry();
- dn2uri = entryContainer.getDN2URI();
- }
-
- /**
- * Get the number of entries imported by this thread.
- * @return The number of entries imported by this thread.
- */
- public int getImportedCount()
- {
- return importedCount;
- }
-
- /**
- * Tells the thread to stop processing.
- */
- public void stopProcessing()
- {
- stopRequested = true;
- }
-
- /**
- * Run the thread. Creates index builder objects for each index database,
- * then polls the queue until it is told to stop processing. Each entry
- * taken from the queue is written to the entry database and processed by
- * all the index builders.
- */
- public void run()
- {
- Entry entry;
-
- // Figure out how many indexes there will be.
- int nIndexes = 0;
-
- nIndexes += 2; // For id2children and id2subtree.
-
- for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
- {
- if (attrIndex.equalityIndex != null)
- {
- nIndexes++;
- }
- if (attrIndex.presenceIndex != null)
- {
- nIndexes++;
- }
- if (attrIndex.substringIndex != null)
- {
- nIndexes++;
- }
- if (attrIndex.orderingIndex != null)
- {
- nIndexes++;
- }
- if (attrIndex.approximateIndex != null)
- {
- nIndexes++;
- }
- }
-
- nIndexes += entryContainer.getVLVIndexes().size();
-
- // Divide the total buffer size by the number of threads
- // and give that much to each index.
- long indexBufferSize = importContext.getBufferSize() / nIndexes;
-
- // Create an index builder for each attribute index database.
- for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
- {
- int indexEntryLimit =
- importContext.getConfig().getIndexEntryLimit();
- if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
- {
- indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
- }
-
- if (attrIndex.equalityIndex != null)
- {
- AttributeIndexBuilder attributeIndexBuilder =
- new AttributeIndexBuilder(importContext,
- attrIndex.equalityIndex,
- indexEntryLimit,
- indexBufferSize);
- builders.add(attributeIndexBuilder);
- }
- if (attrIndex.presenceIndex != null)
- {
- AttributeIndexBuilder attributeIndexBuilder =
- new AttributeIndexBuilder(importContext,
- attrIndex.presenceIndex,
- indexEntryLimit,
- indexBufferSize);
- builders.add(attributeIndexBuilder);
- }
- if (attrIndex.substringIndex != null)
- {
- AttributeIndexBuilder attributeIndexBuilder =
- new AttributeIndexBuilder(importContext,
- attrIndex.substringIndex,
- indexEntryLimit,
- indexBufferSize);
- builders.add(attributeIndexBuilder);
- }
- if (attrIndex.orderingIndex != null)
- {
- AttributeIndexBuilder attributeIndexBuilder =
- new AttributeIndexBuilder(importContext, attrIndex.orderingIndex,
- indexEntryLimit,
- indexBufferSize);
- builders.add(attributeIndexBuilder);
- }
- if (attrIndex.approximateIndex != null)
- {
- AttributeIndexBuilder attributeIndexBuilder =
- new AttributeIndexBuilder(importContext, attrIndex.approximateIndex,
- indexEntryLimit,
- indexBufferSize);
- builders.add(attributeIndexBuilder);
- }
- }
-
- // Create an vlvIndex builder for each VLV index database.
- for (VLVIndex vlvIndex : entryContainer.getVLVIndexes())
- {
- VLVIndexBuilder vlvIndexBuilder =
- new VLVIndexBuilder(importContext, vlvIndex, indexBufferSize);
- builders.add(vlvIndexBuilder);
- }
-
- // Create an index builder for the children index.
- Index id2Children = entryContainer.getID2Children();
- AttributeIndexBuilder attributeIndexBuilder =
- new AttributeIndexBuilder(importContext, id2Children,
- importContext.getConfig().getIndexEntryLimit(),
- indexBufferSize);
- builders.add(attributeIndexBuilder);
-
- // Create an index builder for the subtree index.
- Index id2Subtree = entryContainer.getID2Subtree();
- attributeIndexBuilder =
- new AttributeIndexBuilder(importContext, id2Subtree,
- importContext.getConfig().getIndexEntryLimit(),
- indexBufferSize);
- builders.add(attributeIndexBuilder);
-
- for (IndexBuilder b : builders)
- {
- b.startProcessing();
- }
-
- try
- {
- do
- {
- try
- {
- entry = queue.poll(1000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- continue;
- }
-
- if (entry != null)
- {
- Transaction txn = null;
-
- Entry existingEntry = null;
- EntryID entryID = null;
-
- if (entry.getAttachment() instanceof Entry)
- {
- // Replace an existing entry.
- existingEntry = (Entry)entry.getAttachment();
- entryID = (EntryID)existingEntry.getAttachment();
-
- // Update the referral database for referral entries.
- dn2uri.replaceEntry(txn, existingEntry, entry);
- }
- else
- {
- // Insert a new entry.
- existingEntry = null;
- ArrayList ids = (ArrayList)entry.getAttachment();
- entryID = (EntryID)ids.get(0);
- entryInsertCount++;
-
- // Update the referral database for referral entries.
- dn2uri.addEntry(txn, entry);
- }
-
- id2entry.put(txn, entryID, entry);
-
- for (IndexBuilder b : builders)
- {
- b.processEntry(existingEntry, entry, entryID);
- }
-
- importedCount++;
- }
- } while (!stopRequested);
-
- for (IndexBuilder b : builders)
- {
- b.stopProcessing();
- }
-
- // Increment the entry count.
- importContext.incrEntryInsertCount(entryInsertCount);
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
-
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/Index.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/Index.java
index 0e8b3cb..748faeb 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/Index.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/Index.java
@@ -34,6 +34,8 @@
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
+import org.opends.server.backends.jeb.importLDIF.IntegerImportIDSet;
+import org.opends.server.backends.jeb.importLDIF.ImportIDSet;
import static org.opends.messages.JebMessages.*;
import java.util.*;
@@ -112,6 +114,7 @@
*/
private boolean rebuildRunning = false;
+
/**
* Create a new index object.
* @param name The name of the index database within the entryContainer.
@@ -258,6 +261,65 @@
return success;
}
+
+ /**
+ * Add the specified import ID set to the provided key. Used during
+ * substring buffer flushing.
+ *
+ * @param txn A transaction.
+ * @param key The key to add the set to.
+ * @param importIdSet The set of import IDs.
+ * @throws DatabaseException If an database error occurs.
+ */
+ public void insert(Transaction txn, DatabaseEntry key,
+ ImportIDSet importIdSet)
+ throws DatabaseException {
+
+ OperationStatus status;
+ DatabaseEntry data = new DatabaseEntry();
+ status = read(txn, key, data, LockMode.RMW);
+ if(status == OperationStatus.SUCCESS) {
+ ImportIDSet newImportIDSet = new IntegerImportIDSet();
+ if (newImportIDSet.merge(data.getData(), importIdSet, indexEntryLimit))
+ {
+ entryLimitExceededCount++;
+ }
+ data.setData(newImportIDSet.toDatabase());
+ } else if(status == OperationStatus.NOTFOUND) {
+ if(!importIdSet.isDefined()) {
+ entryLimitExceededCount++;
+ }
+ data.setData(importIdSet.toDatabase());
+ } else {
+ //Should never happen during import.
+ throw new DatabaseException();
+ }
+ put(txn,key, data);
+ }
+
+
+ /**
+ * Add the specified entry ID to the provided keys in the keyset.
+ *
+ * @param txn A transaction.
+ * @param keySet The set containing the keys.
+ * @param entryID The entry ID.
+ * @return <CODE>True</CODE> if the insert was successful.
+ * @throws DatabaseException If a database error occurs.
+ */
+ public synchronized
+ boolean insert(Transaction txn, Set<byte[]> keySet, EntryID entryID)
+ throws DatabaseException {
+ for(byte[] key : keySet) {
+ if(insertIDWithRMW(txn, new DatabaseEntry(key), new DatabaseEntry(),
+ entryID.getDatabaseEntry(), entryID) !=
+ OperationStatus.SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private OperationStatus insertIDWithRMW(Transaction txn, DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry entryIDData,
@@ -366,6 +428,25 @@
}
}
+ /**
+ * Delete specified entry ID from all keys in the provided key set.
+ *
+ * @param txn A Transaction.
+ * @param keySet A set of keys.
+ * @param entryID The entry ID to delete.
+ * @throws DatabaseException If a database error occurs.
+ */
+ public synchronized
+ void delete(Transaction txn, Set<byte[]> keySet, EntryID entryID)
+ throws DatabaseException {
+ setTrusted(txn, false);
+ for(byte[] key : keySet) {
+ removeIDWithRMW(txn, new DatabaseEntry(key),
+ new DatabaseEntry(), entryID);
+ }
+ setTrusted(txn, true);
+ }
+
private void removeIDWithRMW(Transaction txn, DatabaseEntry key,
DatabaseEntry data, EntryID entryID)
throws DatabaseException
@@ -833,6 +914,15 @@
}
/**
+ * Return entry limit.
+ *
+ * @return The entry limit.
+ */
+ public int getIndexEntryLimit() {
+ return this.indexEntryLimit;
+ }
+
+ /**
* Set the index trust state.
* @param txn A database transaction, or null if none is required.
* @param trusted True if this index should be trusted or false
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/IndexMergeThread.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/IndexMergeThread.java
deleted file mode 100644
index e2675a7..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/IndexMergeThread.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- */
-package org.opends.server.backends.jeb;
-import org.opends.messages.Message;
-
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import org.opends.server.types.LDIFImportConfig;
-
-import com.sleepycat.je.*;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-import java.util.WeakHashMap;
-
-import org.opends.server.types.DebugLogLevel;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.messages.JebMessages.*;
-import org.opends.server.admin.std.server.LocalDBBackendCfg;
-import static org.opends.server.util.StaticUtils.getFileForPath;
-
-/**
- * A thread to merge a set of intermediate files from an index builder
- * into an index database.
- */
-final class IndexMergeThread extends DirectoryThread
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
-
- /**
- * The buffer size to use when reading data from disk.
- */
- private static final int INPUT_STREAM_BUFFER_SIZE = 65536;
-
- /**
- * The configuration of the JE backend containing the index.
- */
- private LocalDBBackendCfg config;
-
- /**
- * The LDIF import configuration, which indicates whether we are
- * appending to existing data.
- */
- private LDIFImportConfig ldifImportConfig;
-
-
- /**
- * The indexer to generate and compare index keys.
- */
- private Indexer indexer;
-
- /**
- * The index database being written.
- */
- private Index index;
-
-
- /**
- * The index entry limit.
- */
- private int entryLimit;
-
- /**
- * Indicates whether we are replacing existing data or not.
- */
- private boolean replaceExisting = false;
-
- /**
- * A weak reference hash map used to cache byte arrays for holding DB keys.
- */
- private WeakHashMap<Integer,LinkedList<byte[]>> arrayMap =
- new WeakHashMap<Integer,LinkedList<byte[]>>();
-
-
- /**
- * A file name filter to identify temporary files we have written.
- */
- private FilenameFilter filter = new FilenameFilter()
- {
- public boolean accept(File d, String name)
- {
- return name.startsWith(index.getName());
- }
- };
-
- /**
- * Create a new index merge thread.
- * @param config The configuration of the JE backend containing the index.
- * @param ldifImportConfig The LDIF import configuration, which indicates
- * whether we are appending to existing data.
- * @param index The index database to be written.
- * @param entryLimit The configured index entry limit.
- */
- public IndexMergeThread(LocalDBBackendCfg config,
- LDIFImportConfig ldifImportConfig,
- Index index, int entryLimit)
- {
- super("Index Merge Thread " + index.getName());
-
- this.config = config;
- this.ldifImportConfig = ldifImportConfig;
- this.indexer = index.indexer;
- this.index = index;
- this.entryLimit = entryLimit;
- replaceExisting =
- ldifImportConfig.appendToExistingData() &&
- ldifImportConfig.replaceExistingEntries();
- }
-
- /**
- * Run this thread.
- */
- public void run()
- {
- try
- {
- merge();
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- }
- }
-
- /**
- * The merge phase builds the index from intermediate files written
- * during entry processing. Each line of an intermediate file has data for
- * one index key and the keys are in order. For each index key, the data from
- * each intermediate file containing a line for that key must be merged and
- * written to the index.
- * @throws Exception If an error occurs.
- */
- private void merge() throws Exception
- {
- // An ordered map of the current input keys from each file.
- OctetStringKeyComparator comparator =
- new OctetStringKeyComparator(indexer.getComparator());
- TreeMap<ASN1OctetString, MergeValue> inputs =
- new TreeMap<ASN1OctetString, MergeValue>(comparator);
-
- // Open all the files.
- File parentDir = getFileForPath(config.getImportTempDirectory());
- File tempDir = new File(parentDir, config.getBackendId());
- File[] files = tempDir.listFiles(filter);
-
- if (files == null || files.length == 0)
- {
- if (debugEnabled())
- {
- Message message = INFO_JEB_INDEX_MERGE_NO_DATA.get(index.getName());
- TRACER.debugInfo(message.toString());
- }
-
- if(!ldifImportConfig.appendToExistingData())
- {
- index.setTrusted(null, true);
- }
-
- return;
- }
-
- if (debugEnabled())
- {
- Message message = INFO_JEB_INDEX_MERGE_START.get(
- files.length, index.getName());
- TRACER.debugInfo(message.toString());
- }
-
- MergeReader[] readers = new MergeReader[files.length];
-
- Transaction txn = null;
- DatabaseEntry dbKey = new DatabaseEntry();
- DatabaseEntry dbData = new DatabaseEntry();
- byte[] mergedBytes = new byte[0];
- Longs merged = new Longs();
-
-
- try
- {
-
- for (int i = 0; i < files.length; i++)
- {
- // Open a reader for this file.
- BufferedInputStream bufferedStream =
- new BufferedInputStream(new FileInputStream(files[i]),
- INPUT_STREAM_BUFFER_SIZE);
- DataInputStream dis = new DataInputStream(bufferedStream);
- readers[i] = new MergeReader(dis);
-
- // Read a value from each file.
- readNext(inputs, readers, i);
- }
-
- // Process the lowest input value until done.
- try
- {
- while (true)
- {
- ASN1OctetString lowest = inputs.firstKey();
- MergeValue mv = inputs.remove(lowest);
-
- byte[] keyBytes = mv.getKey();
- dbKey.setData(keyBytes);
- List<Longs> addValues = mv.getAddValues();
- List<Longs> delValues = mv.getDelValues();
-
- writeMergedValue:
- {
- merged.clear();
- if (ldifImportConfig.appendToExistingData())
- {
- if (index.read(txn, dbKey, dbData, LockMode.RMW) ==
- OperationStatus.SUCCESS)
- {
- if (dbData.getSize() == 8 &&
- (dbData.getData()[0] & 0x80) == 0x80)
- {
- // Entry limit already exceeded. Just update the
- // undefined size assuming no overlap will occur between
- // the add values and the longs in the DB.
- long undefinedSize =
- JebFormat.entryIDUndefinedSizeFromDatabase(dbData.getData());
-
- for(Longs l : addValues)
- {
- undefinedSize += l.size();
- }
-
- if(replaceExisting)
- {
- for(Longs l : delValues)
- {
- undefinedSize -= l.size();
- }
- }
-
- byte[] undefinedSizeBytes =
- JebFormat.entryIDUndefinedSizeToDatabase(undefinedSize);
- dbData.setData(undefinedSizeBytes);
- index.put(txn, dbKey, dbData);
- break writeMergedValue;
- }
- merged.decode(dbData.getData());
- }
- }
-
- for (Longs l : addValues)
- {
- merged.addAll(l);
- }
-
- if (replaceExisting)
- {
- for (Longs l : delValues)
- {
- merged.deleteAll(l);
- }
- }
-
- if (merged.size() > entryLimit)
- {
- index.incEntryLimitExceededCount();
- if(index.getMaintainCount())
- {
- byte[] undefinedSizeBytes =
- JebFormat.entryIDUndefinedSizeToDatabase(merged.size());
- dbData.setData(undefinedSizeBytes);
- index.put(txn, dbKey, dbData);
- }
- else
- {
- index.writeKey(txn, dbKey, new EntryIDSet());
- }
- }
- else
- {
- mergedBytes = merged.encode(mergedBytes);
-
- dbData.setData(mergedBytes);
- dbData.setSize(merged.encodedSize());
- index.put(txn, dbKey, dbData);
- }
-
- LinkedList<byte[]> arrayList = arrayMap.get(keyBytes.length);
- if (arrayList == null)
- {
- arrayList = new LinkedList<byte[]>();
- arrayMap.put(keyBytes.length, arrayList);
- }
-
- arrayList.add(keyBytes);
- }
-
- for (int r : mv.getReaders())
- {
- readNext(inputs, readers, r);
- }
- }
- }
- catch (NoSuchElementException e)
- {
- }
-
- if(!ldifImportConfig.appendToExistingData())
- {
- index.setTrusted(txn, true);
- }
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- throw e;
- }
- finally
- {
- // Close the readers.
- if (readers != null)
- {
- for (MergeReader r : readers)
- {
- if (r != null)
- {
- r.dataInputStream.close();
- }
- }
- }
-
- // Delete all the files.
- if (files != null)
- {
- for (File f : files)
- {
- f.delete();
- }
- }
- }
-
- if (debugEnabled())
- {
- Message message = INFO_JEB_INDEX_MERGE_COMPLETE.get(index.getName());
- TRACER.debugInfo(message.toString());
- }
- }
-
- /**
- * Reads the next line from one of the merge input files.
- * @param inputs The ordered map of current input keys.
- * @param readers The array of input readers.
- * @param reader The index of the input reader we wish to read from.
- * @throws IOException If an I/O error occurs while reading the input file.
- */
- private void readNext(TreeMap<ASN1OctetString, MergeValue> inputs,
- MergeReader[] readers, int reader)
- throws IOException
- {
- MergeReader mergeReader = readers[reader];
- DataInputStream dataInputStream = mergeReader.dataInputStream;
- int len;
- try
- {
- len = dataInputStream.readInt();
- }
- catch (EOFException e)
- {
- // End of file.
- return;
- }
-
- byte[] keyBytes;
- LinkedList<byte[]> arrayList = arrayMap.get(len);
- if (arrayList == null)
- {
- keyBytes = new byte[len];
- arrayList = new LinkedList<byte[]>();
- arrayMap.put(len, arrayList);
- }
- else if (arrayList.isEmpty())
- {
- keyBytes = new byte[len];
- }
- else
- {
- keyBytes = arrayList.removeFirst();
- }
-
- dataInputStream.readFully(keyBytes);
-
- Longs addData = mergeReader.addData;
- addData.decode(dataInputStream);
-
- Longs delData = mergeReader.delData;
- if (replaceExisting)
- {
- delData.decode(dataInputStream);
- }
-
- // If this key is not yet in the ordered map then insert it,
- // otherwise merge the data into the existing data for the key.
-
- ASN1OctetString mk = new ASN1OctetString(keyBytes);
- MergeValue mv = inputs.get(mk);
- if (mv == null)
- {
- mv = new MergeValue(readers.length, entryLimit);
- mv.setKey(keyBytes);
- inputs.put(mk, mv);
- }
- else
- {
- arrayList.add(keyBytes);
- }
-
- mv.mergeData(reader, addData, delData);
- }
-
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/JebFormat.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/JebFormat.java
index 631ad53..7a63e5d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/JebFormat.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/JebFormat.java
@@ -365,6 +365,30 @@
}
/**
+ * Decode a integer array using the specified byte array read from DB.
+ *
+ * @param bytes The byte array.
+ * @return An integer array.
+ */
+ public static int[] intArrayFromDatabaseBytes(byte[] bytes) {
+ byte[] decodedBytes = bytes;
+
+ int count = decodedBytes.length / 8;
+ int[] entryIDList = new int[count];
+ for (int pos = 0, i = 0; i < count; i++) {
+ int v = 0;
+ pos +=4;
+ v |= (decodedBytes[pos++] & 0xFFL) << 24;
+ v |= (decodedBytes[pos++] & 0xFFL) << 16;
+ v |= (decodedBytes[pos++] & 0xFFL) << 8;
+ v |= (decodedBytes[pos++] & 0xFFL);
+ entryIDList[i] = v;
+ }
+
+ return entryIDList;
+ }
+
+ /**
* Encode an entry ID value to its database representation.
* @param id The entry ID value to be encoded.
* @return The encoded database value of the entry ID.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/RootContainer.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/RootContainer.java
index 21feae2..ee30ca5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/RootContainer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/RootContainer.java
@@ -26,17 +26,14 @@
*/
package org.opends.server.backends.jeb;
import org.opends.messages.Message;
-
import com.sleepycat.je.config.EnvironmentParams;
import com.sleepycat.je.config.ConfigParam;
import com.sleepycat.je.*;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.*;
import java.io.File;
import java.io.FilenameFilter;
-
import org.opends.server.monitors.DatabaseEnvironmentMonitor;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DN;
@@ -48,7 +45,6 @@
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.core.DirectoryServer;
import org.opends.server.config.ConfigException;
-
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -79,6 +75,9 @@
*/
private Environment env;
+ //Used to force a checkpoint during import.
+ private CheckpointConfig importForceCheckPoint = new CheckpointConfig();
+
/**
* The backend configuration.
*/
@@ -129,6 +128,7 @@
this.compressedSchema = null;
config.addLocalDBChangeListener(this);
+ importForceCheckPoint.setForce(true);
}
/**
@@ -1029,4 +1029,27 @@
messages);
return ccr;
}
+
+ /**
+ * Force a checkpoint.
+ *
+ * @throws DatabaseException If a database error occurs.
+ */
+ public void importForceCheckPoint() throws DatabaseException {
+ env.checkpoint(importForceCheckPoint);
+ }
+
+ /**
+ * Run the cleaner and return the number of files cleaned.
+ *
+ * @return The number of logs cleaned.
+ * @throws DatabaseException If a database error occurs.
+ */
+ public int cleanedLogFiles() throws DatabaseException {
+ int cleaned, totalCleaned = 0;
+ while((cleaned = env.cleanLog()) > 0) {
+ totalCleaned += cleaned;
+ }
+ return totalCleaned;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/VLVIndexBuilder.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/VLVIndexBuilder.java
deleted file mode 100644
index 1fd5903..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/VLVIndexBuilder.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- */
-package org.opends.server.backends.jeb;
-
-import static org.opends.server.util.StaticUtils.getFileForPath;
-import org.opends.server.types.*;
-
-import java.util.*;
-import java.io.*;
-
-import com.sleepycat.je.DatabaseException;
-
-/**
- * This class is used to create an VLV vlvIndex for an import process.
- * It is used as follows.
- * <pre>
- * startProcessing();
- * processEntry(entry);
- * processEntry(entry);
- * ...
- * stopProcessing();
- * merge();
- * </pre>
- */
-public class VLVIndexBuilder implements IndexBuilder
-{
- /**
- * The directory in which temporary merge files are held.
- */
- private final File tempDir;
-
- /**
- * The vlvIndex database.
- */
- private final VLVIndex vlvIndex;
-
- /**
- * The add write buffer.
- */
- private TreeMap<SortValues,EntryID> addBuffer;
-
- /**
- * The delete write buffer.
- */
- private TreeMap<SortValues,EntryID> delBuffer;
-
- /**
- * The write buffer size.
- */
- private final int bufferSize;
-
- /**
- * Current output file number.
- */
- private int fileNumber = 0;
-
- /**
- * A unique prefix for temporary files to prevent conflicts.
- */
- private final String fileNamePrefix;
-
- /**
- * Indicates whether we are replacing existing data or not.
- */
- private final boolean replaceExisting;
-
- /**
- * A file name filter to identify temporary files we have written.
- */
- private final FilenameFilter filter = new FilenameFilter()
- {
- public boolean accept(File d, String name)
- {
- return name.startsWith(fileNamePrefix);
- }
- };
-
- /**
- * Construct an vlvIndex builder.
- *
- * @param importContext The import context.
- * @param vlvIndex The vlvIndex database we are writing.
- * @param bufferSize The amount of memory available for buffering.
- */
- public VLVIndexBuilder(ImportContext importContext,
- VLVIndex vlvIndex, long bufferSize)
- {
- File parentDir = getFileForPath(importContext.getConfig()
- .getImportTempDirectory());
- this.tempDir = new File(parentDir,
- importContext.getConfig().getBackendId());
-
- this.vlvIndex = vlvIndex;
- this.bufferSize = (int)bufferSize/100;
- long tid = Thread.currentThread().getId();
- this.fileNamePrefix = vlvIndex.getName() + "_" + tid + "_";
- this.replaceExisting =
- importContext.getLDIFImportConfig().appendToExistingData() &&
- importContext.getLDIFImportConfig().replaceExistingEntries();
- }
-
- /**
- * {@inheritDoc}
- */
- public void startProcessing()
- {
- // Clean up any work files left over from a previous run.
- File[] files = tempDir.listFiles(filter);
- if (files != null)
- {
- for (File f : files)
- {
- f.delete();
- }
- }
-
- addBuffer = new TreeMap<SortValues,EntryID>();
- delBuffer = new TreeMap<SortValues, EntryID>();
- }
-
- /**
- * {@inheritDoc}
- */
- public void processEntry(Entry oldEntry, Entry newEntry, EntryID entryID)
- throws DatabaseException, IOException, DirectoryException
- {
- SortValues newValues = new SortValues(entryID, newEntry,
- vlvIndex.sortOrder);
- // Update the vlvIndex for this entry.
- if (oldEntry != null)
- {
- if(vlvIndex.shouldInclude(oldEntry))
- {
- // This is an entry being replaced.
- SortValues oldValues = new SortValues(entryID, oldEntry,
- vlvIndex.sortOrder);
- removeValues(oldValues, entryID);
- }
-
- }
-
- if(vlvIndex.shouldInclude(newEntry))
- {
- insertValues(newValues, entryID);
- }
- }
-
-
-
- /**
- * {@inheritDoc}
- */
- public void stopProcessing() throws IOException
- {
- flushBuffer();
- }
-
- /**
- * Record the insertion of an entry ID.
- * @param sortValues The sort values.
- * @param entryID The entry ID.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void insertValues(SortValues sortValues, EntryID entryID)
- throws IOException
- {
- if (addBuffer.size() + delBuffer.size() >= bufferSize)
- {
- flushBuffer();
- }
-
- addBuffer.put(sortValues, entryID);
- }
-
- /**
- * Record the deletion of an entry ID.
- * @param sortValues The sort values to remove.
- * @param entryID The entry ID.
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void removeValues(SortValues sortValues, EntryID entryID)
- throws IOException
- {
- if (addBuffer.size() + delBuffer.size() >= bufferSize)
- {
- flushBuffer();
- }
-
- delBuffer.remove(sortValues);
- }
-
- /**
- * Called when the buffer is full. It first sorts the buffer using the same
- * key comparator used by the vlvIndex database. Then it merges all the
- * IDs for the same key together and writes each key and its list of IDs
- * to an intermediate binary file.
- * A list of deleted IDs is only present if we are replacing existing entries.
- *
- * @throws IOException If an I/O error occurs while writing an intermediate
- * file.
- */
- private void flushBuffer() throws IOException
- {
- if (addBuffer.size() + delBuffer.size() == 0)
- {
- return;
- }
-
- // Start a new file.
- fileNumber++;
- String fileName = fileNamePrefix + String.valueOf(fileNumber) + "_add";
- File file = new File(tempDir, fileName);
- BufferedOutputStream bufferedStream =
- new BufferedOutputStream(new FileOutputStream(file));
- DataOutputStream dataStream = new DataOutputStream(bufferedStream);
-
- try
- {
- for (SortValues values : addBuffer.keySet())
- {
- dataStream.writeLong(values.getEntryID());
- for(AttributeValue value : values.getValues())
- {
- if(value != null)
- {
- byte[] valueBytes = value.getValueBytes();
- dataStream.writeInt(valueBytes.length);
- dataStream.write(valueBytes);
- }
- else
- {
- dataStream.writeInt(0);
- }
- }
- }
- }
- finally
- {
- dataStream.close();
- }
-
- if (replaceExisting)
- {
- fileName = fileNamePrefix + String.valueOf(fileNumber) + "_del";
- file = new File(tempDir, fileName);
- bufferedStream =
- new BufferedOutputStream(new FileOutputStream(file));
- dataStream = new DataOutputStream(bufferedStream);
-
- try
- {
-
- for (SortValues values : delBuffer.keySet())
- {
- dataStream.writeLong(values.getEntryID());
- for(AttributeValue value : values.getValues())
- {
- byte[] valueBytes = value.getValueBytes();
- dataStream.writeInt(valueBytes.length);
- dataStream.write(valueBytes);
- }
- }
- }
- finally
- {
- dataStream.close();
- }
- }
-
- addBuffer = new TreeMap<SortValues,EntryID>();
- delBuffer = new TreeMap<SortValues, EntryID>();
- }
-
- /**
- * Get a string that identifies this vlvIndex builder.
- *
- * @return A string that identifies this vlvIndex builder.
- */
- public String toString()
- {
- return vlvIndex.toString() + " builder";
- }
-}
-
-
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/VLVIndexMergeThread.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/VLVIndexMergeThread.java
deleted file mode 100644
index 645023a..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/VLVIndexMergeThread.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- */
-package org.opends.server.backends.jeb;
-import org.opends.messages.Message;
-
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import org.opends.server.admin.std.server.LocalDBBackendCfg;
-import org.opends.server.types.*;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import static org.opends.server.util.StaticUtils.getFileForPath;
-import static org.opends.messages.JebMessages.
- INFO_JEB_INDEX_MERGE_NO_DATA;
-import static org.opends.messages.JebMessages.
- INFO_JEB_INDEX_MERGE_START;
-import static org.opends.messages.JebMessages.
- INFO_JEB_INDEX_MERGE_COMPLETE;
-import java.util.*;
-import java.io.*;
-
-import com.sleepycat.je.Transaction;
-
-/**
- * A thread to merge a set of intermediate files from an vlvIndex builder
- * into an vlvIndex database.
- */
-class VLVIndexMergeThread extends DirectoryThread
-{
- /**
- * The tracer object for the debug logger.
- */
- private static final DebugTracer TRACER = getTracer();
-
-
- /**
- * The buffer size to use when reading data from disk.
- */
- private static final int INPUT_STREAM_BUFFER_SIZE = 65536;
-
- /**
- * The configuration of the JE backend containing the vlvIndex.
- */
- private LocalDBBackendCfg config;
-
- /**
- * The LDIF import configuration, which indicates whether we are
- * appending to existing data.
- */
- private LDIFImportConfig ldifImportConfig;
-
- /**
- * The vlvIndex database being written.
- */
- private VLVIndex vlvIndex;
-
- /**
- * Indicates whether we are replacing existing data or not.
- */
- private boolean replaceExisting = false;
-
- private List<DataInputStream> addDataStreams;
- private List<DataInputStream> delDataStreams;
-
- /**
- * A weak reference hash map used to cache last sort values read from files.
- */
- private HashMap<DataInputStream,SortValues> lastAddValues =
- new HashMap<DataInputStream,SortValues>();
-
- private HashMap<DataInputStream,SortValues> lastDelValues =
- new HashMap<DataInputStream,SortValues>();
-
-
- /**
- * A file name filter to identify temporary files we have written.
- */
- private FilenameFilter filter = new FilenameFilter()
- {
- public boolean accept(File d, String name)
- {
- return name.startsWith(vlvIndex.getName());
- }
- };
-
- /**
- * Create a new vlvIndex merge thread.
- * @param config The configuration of the JE backend containing the vlvIndex.
- * @param ldifImportConfig The LDIF import configuration, which indicates
- * whether we are appending to existing data.
- * @param vlvIndex The vlvIndex database to be written.
- */
- public VLVIndexMergeThread(LocalDBBackendCfg config,
- LDIFImportConfig ldifImportConfig,
- VLVIndex vlvIndex)
- {
- super("Index Merge Thread " + vlvIndex.getName());
-
- this.config = config;
- this.ldifImportConfig = ldifImportConfig;
- this.vlvIndex = vlvIndex;
- replaceExisting =
- ldifImportConfig.appendToExistingData() &&
- ldifImportConfig.replaceExistingEntries();
- addDataStreams = new ArrayList<DataInputStream>();
- delDataStreams = new ArrayList<DataInputStream>();
- lastAddValues = new HashMap<DataInputStream, SortValues>();
- lastDelValues = new HashMap<DataInputStream, SortValues>();
- }
-
- /**
- * Run this thread.
- */
- public void run()
- {
- try
- {
- merge();
- }
- catch (Exception e)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
-
- throw new RuntimeException(e);
- }
- }
-
- /**
- * The merge phase builds the vlvIndex from intermediate files written
- * during entry processing. Each line of an intermediate file has data for
- * one vlvIndex key and the keys are in order. For each vlvIndex key, the data
- * from each intermediate file containing a line for that key must be merged
- * and written to the vlvIndex.
- * @throws Exception If an error occurs.
- */
- public void merge() throws Exception
- {
- // Open all the files.
- File parentDir = getFileForPath(config.getImportTempDirectory());
- File tempDir = new File(parentDir, config.getBackendId());
- File[] files = tempDir.listFiles(filter);
-
- if (files == null || files.length == 0)
- {
- Message message = INFO_JEB_INDEX_MERGE_NO_DATA.get(vlvIndex.getName());
- logError(message);
- return;
- }
-
- if (debugEnabled())
- {
- Message message = INFO_JEB_INDEX_MERGE_START.get(
- files.length, vlvIndex.getName());
- TRACER.debugInfo(message.toString());
- }
-
- Transaction txn = null;
-
- try
- {
- for (int i = 0; i < files.length; i++)
- {
- // Open a reader for this file.
- BufferedInputStream bufferedStream =
- new BufferedInputStream(new FileInputStream(files[i]),
- INPUT_STREAM_BUFFER_SIZE);
- DataInputStream dis = new DataInputStream(bufferedStream);
- if(files[i].getName().endsWith("_add"))
- {
- addDataStreams.add(dis);
- }
- else if(files[i].getName().endsWith("_del"))
- {
- delDataStreams.add(dis);
- }
- }
-
- while(true)
- {
- SortValuesSet currentSet = null;
- SortValues maxKey = null;
- // Get a set by using the smallest sort values
- SortValues addValue = readNextAdd(maxKey);
-
- // Process deletes first for this set
- if(replaceExisting)
- {
- SortValues delValue = readNextDel(maxKey);
- if(delValue != null)
- {
- if(currentSet == null)
- {
- if(addValue == null || delValue.compareTo(addValue) < 0)
- {
- // Set the current set using the del value.
- currentSet = vlvIndex.getSortValuesSet(txn,
- delValue.getEntryID(),
- delValue.getValues());
- }
- else
- {
- // Set the current set using the add value.
- currentSet = vlvIndex.getSortValuesSet(txn,
- addValue.getEntryID(),
- addValue.getValues());
- }
- maxKey = currentSet.getKeySortValues();
- }
- }
-
- while(delValue != null)
- {
- currentSet.remove(delValue.getEntryID(), delValue.getValues());
- delValue = readNextDel(maxKey);
- }
- }
-
- if(addValue != null)
- {
- if(currentSet == null)
- {
- currentSet = vlvIndex.getSortValuesSet(txn, addValue.getEntryID(),
- addValue.getValues());
- maxKey = currentSet.getKeySortValues();
- }
-
- while(addValue != null)
- {
- currentSet.add(addValue.getEntryID(), addValue.getValues());
- if(currentSet.size() > vlvIndex.getSortedSetCapacity())
- {
- // Need to split the set as it has exceeded the entry limit.
- SortValuesSet splitSortValuesSet =
- currentSet.split(currentSet.size() / 2);
- // Find where the set split and see if the last added values
- // is before or after the split.
- SortValues newKey = currentSet.getKeySortValues();
-
- if(debugEnabled())
- {
- TRACER.debugInfo("SortValuesSet with key %s has reached" +
- " the entry size of %d. Spliting into two sets with " +
- " keys %s and %s.", maxKey, currentSet.size(), newKey,
- maxKey);
- }
-
- if(addValue.compareTo(newKey) < 0)
- {
- // The last added values is before the split so we have to
- // keep adding to it.
- vlvIndex.putSortValuesSet(txn, splitSortValuesSet);
- maxKey = newKey;
- }
- else
- {
- // The last added values is after the split so we can add to
- // the newly split set.
- vlvIndex.putSortValuesSet(txn, currentSet);
- currentSet = splitSortValuesSet;
- }
- }
- addValue = readNextAdd(maxKey);
- }
- }
-
- // We should have made all the modifications to this set. Store it back
- // to database.
- vlvIndex.putSortValuesSet(txn, currentSet);
-
- if(maxKey == null)
- {
- // If we reached here, we should have processed all the sets and
- // there should be nothing left to add or delete.
- break;
- }
- }
-
- if(!ldifImportConfig.appendToExistingData())
- {
- vlvIndex.setTrusted(txn, true);
- }
- }
- finally
- {
- for(DataInputStream stream : addDataStreams)
- {
- stream.close();
- }
-
- for(DataInputStream stream : delDataStreams)
- {
- stream.close();
- }
-
- // Delete all the files.
- if (files != null)
- {
- for (File f : files)
- {
- f.delete();
- }
- }
- }
-
- if (debugEnabled())
- {
- Message message = INFO_JEB_INDEX_MERGE_COMPLETE.get(vlvIndex.getName());
- TRACER.debugInfo(message.toString());
- }
- }
-
- /**
- * Reads the next sort values from the files that is smaller then the max.
- * @throws IOException If an I/O error occurs while reading the input file.
- */
- private SortValues readNextAdd(SortValues maxValues)
- throws IOException
- {
- for(DataInputStream dataInputStream : addDataStreams)
- {
- if(lastAddValues.get(dataInputStream) == null)
- {
- try
- {
- SortKey[] sortKeys = vlvIndex.sortOrder.getSortKeys();
- EntryID id = new EntryID(dataInputStream.readLong());
- AttributeValue[] attrValues =
- new AttributeValue[sortKeys.length];
- for(int i = 0; i < sortKeys.length; i++)
- {
- SortKey sortKey = sortKeys[i];
- int length = dataInputStream.readInt();
- if(length > 0)
- {
- byte[] valueBytes = new byte[length];
- if(length == dataInputStream.read(valueBytes, 0, length))
- {
- attrValues[i] =
- new AttributeValue(sortKey.getAttributeType(),
- new ASN1OctetString(valueBytes));
- }
- }
-
- }
- lastAddValues.put(dataInputStream,
- new SortValues(id, attrValues, vlvIndex.sortOrder));
- }
- catch (EOFException e)
- {
- continue;
- }
- }
- }
-
- Map.Entry<DataInputStream, SortValues> smallestEntry = null;
- for(Map.Entry<DataInputStream, SortValues> entry :
- lastAddValues.entrySet())
- {
- if(smallestEntry == null ||
- entry.getValue().compareTo(smallestEntry.getValue()) < 0)
- {
- smallestEntry = entry;
- }
- }
-
- if(smallestEntry != null)
- {
- SortValues smallestValues = smallestEntry.getValue();
- if(maxValues == null || smallestValues.compareTo(maxValues) <= 0)
- {
- lastAddValues.remove(smallestEntry.getKey());
- return smallestValues;
- }
- }
-
- return null;
- }
-
- /**
- * Reads the next sort values from the files that is smaller then the max.
- * @throws IOException If an I/O error occurs while reading the input file.
- */
- private SortValues readNextDel(SortValues maxValues)
- throws IOException
- {
- for(DataInputStream dataInputStream : delDataStreams)
- {
- if(lastDelValues.get(dataInputStream) == null)
- {
- try
- {
- EntryID id = new EntryID(dataInputStream.readLong());
- AttributeValue[] attrValues =
- new AttributeValue[vlvIndex.sortOrder.getSortKeys().length];
- int i = 0;
- for(SortKey sortKey : vlvIndex.sortOrder.getSortKeys())
- {
- int length = dataInputStream.readInt();
- if(length > 0)
- {
- byte[] valueBytes = new byte[length];
- if(length == dataInputStream.read(valueBytes, 0, length))
- {
- attrValues[i] =
- new AttributeValue(sortKey.getAttributeType(),
- new ASN1OctetString(valueBytes));
- }
- }
- }
- lastDelValues.put(dataInputStream,
- new SortValues(id, attrValues,
- vlvIndex.sortOrder));
- }
- catch (EOFException e)
- {
- continue;
- }
- }
- }
-
- Map.Entry<DataInputStream, SortValues> smallestEntry = null;
- for(Map.Entry<DataInputStream, SortValues> entry :
- lastDelValues.entrySet())
- {
- if(smallestEntry == null ||
- entry.getValue().compareTo(smallestEntry.getValue()) < 0)
- {
- smallestEntry = entry;
- }
- }
-
- if(smallestEntry != null)
- {
- SortValues smallestValues = smallestEntry.getValue();
- if(maxValues == null || smallestValues.compareTo(maxValues) <= 0)
- {
- lastDelValues.remove(smallestEntry.getKey());
- return smallestValues;
- }
- }
-
- return null;
- }
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
new file mode 100644
index 0000000..10dbf07
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/BufferManager.java
@@ -0,0 +1,359 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+
+
+package org.opends.server.backends.jeb.importLDIF;
+
+import org.opends.server.types.Entry;
+import org.opends.server.backends.jeb.Index;
+import org.opends.server.backends.jeb.EntryID;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.dbi.MemoryBudget;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import org.opends.messages.Message;
+import static org.opends.messages.JebMessages.*;
+import java.util.*;
+
+
+/**
+ * Manages a shared cache among worker threads that caches substring
+ * key/value pairs to avoid DB cache access. Once the cache is above it's
+ * memory usage limit, it will start slowly flushing keys (similar to the
+ * JEB eviction process) until it is under the limit.
+ */
+
+public class BufferManager {
+
+ //Memory usage counter.
+ private long memoryUsage=0;
+
+ //Memory limit.
+ private long memoryLimit;
+
+ //Next element in the cache to start flushing at during next flushAll cycle.
+ private KeyHashElement nextElem;
+
+ //Extra bytes to flushAll.
+ private final int extraBytes = 1024 * 1024;
+
+ //Counters for statistics, total is number of accesses, hit is number of
+ //keys found in cache.
+ private long total=0, hit=0;
+
+ //Actual map used to buffer keys.
+ private TreeMap<KeyHashElement, KeyHashElement> elementMap =
+ new TreeMap<KeyHashElement, KeyHashElement>();
+
+ //Overhead values determined from using JHAT. They appear to be the same
+ //for both 32 and 64 bit machines. Close enough.
+ private final static int TREEMAP_ENTRY_OVERHEAD = 29;
+ private final static int KEY_ELEMENT_OVERHEAD = 28;
+
+ //Count of number of worker threads.
+ private int importThreadCount;
+
+ //Used to prevent memory flush
+ private boolean limitFlush = true;
+
+
+ /**
+ * Create buffer manager instance.
+ *
+ * @param memoryLimit The memory limit.
+ * @param importThreadCount The count of import worker threads.
+ */
+ public BufferManager(long memoryLimit, int importThreadCount) {
+ this.memoryLimit = memoryLimit;
+ this.nextElem = null;
+ this.importThreadCount = importThreadCount;
+ }
+
+ /**
+ * Insert an entry ID into the buffer using the both the specified index and
+ * entry to build a key set. Will flush the buffer if over the memory limit.
+ *
+ * @param index The index to use.
+ * @param entry The entry used to build the key set.
+ * @param entryID The entry ID to insert into the key set.
+ * @param txn A transaction.
+ * @throws DatabaseException If a problem happened during a flushAll cycle.
+ */
+ void insert(Index index, Entry entry,
+ EntryID entryID, Transaction txn)
+ throws DatabaseException {
+ int entryLimit = index.getIndexEntryLimit();
+ Set<byte[]> keySet = new HashSet<byte[]>();
+ index.indexer.indexEntry(txn, entry, keySet);
+ synchronized(elementMap) {
+ for(byte[] key : keySet) {
+ KeyHashElement elem = new KeyHashElement(key, index, entryID);
+ total++;
+ if(!elementMap.containsKey(elem)) {
+ elementMap.put(elem, elem);
+ memoryUsage += TREEMAP_ENTRY_OVERHEAD + elem.getMemorySize();
+ } else {
+ KeyHashElement curElem = elementMap.get(elem);
+ if(curElem.isDefined()) {
+ int oldSize = curElem.getMemorySize();
+ curElem.addEntryID(entryID, entryLimit);
+ int newSize = curElem.getMemorySize();
+ //Might have moved from defined to undefined.
+ memoryUsage += (newSize - oldSize);
+ hit++;
+ }
+ }
+ }
+ //If over the memory limit and import hasn't completed
+ //flush some keys from the cache to make room.
+ if((memoryUsage > memoryLimit) && limitFlush) {
+ flushUntilUnderLimit();
+ }
+ }
+ }
+
+ /**
+ * Flush the buffer to DB until the buffer is under the memory limit.
+ *
+ * @throws DatabaseException If a problem happens during an index insert.
+ */
+ private void flushUntilUnderLimit() throws DatabaseException {
+ Iterator<KeyHashElement> iter;
+ if(nextElem == null) {
+ iter = elementMap.keySet().iterator();
+ } else {
+ iter = elementMap.tailMap(nextElem).keySet().iterator();
+ }
+ while((memoryUsage + extraBytes > memoryLimit) && limitFlush) {
+ if(iter.hasNext()) {
+ KeyHashElement curElem = iter.next();
+ //Never flush undefined elements.
+ if(curElem.isDefined()) {
+ Index index = curElem.getIndex();
+ index.insert(null, new DatabaseEntry(curElem.getKey()),
+ curElem.getIDSet());
+ memoryUsage -= TREEMAP_ENTRY_OVERHEAD + curElem.getMemorySize();
+ if(limitFlush) {
+ iter.remove();
+ }
+ }
+ } else {
+ //Wrapped around, start at the first element.
+ nextElem = elementMap.firstKey();
+ iter = elementMap.keySet().iterator();
+ }
+ }
+ //Start at this element next flushAll cycle.
+ nextElem = iter.next();
+ }
+
+ /**
+ * Called from main thread to prepare for final buffer flush at end of
+ * ldif load.
+ */
+ void prepareFlush() {
+ limitFlush=false;
+ Message msg =
+ INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH.get(elementMap.size(), total, hit);
+ logError(msg);
+ }
+
+ /**
+ * Writes all of the buffer elements to DB. The specific id is used to
+ * share the buffer among the worker threads so this function can be
+ * multi-threaded.
+ *
+ * @param id The thread id.
+ * @throws DatabaseException If an error occurred during the insert.
+ */
+ void flushAll(int id) throws DatabaseException {
+ TreeSet<KeyHashElement> tSet =
+ new TreeSet<KeyHashElement>(elementMap.keySet());
+ Iterator<KeyHashElement> iter = tSet.iterator();
+ int i=0;
+ while(iter.hasNext()) {
+ KeyHashElement curElem = iter.next();
+ Index index = curElem.getIndex();
+ //Each thread handles a piece of the buffer based on its thread id.
+ if((i % importThreadCount) == id) {
+ index.insert(null, new DatabaseEntry(curElem.getKey()),
+ curElem.getIDSet());
+ }
+ i++;
+ }
+ }
+
+ /**
+ * Class used to represent an element in the buffer.
+ */
+ class KeyHashElement implements Comparable {
+
+ //Bytes representing the key.
+ private byte[] key;
+
+ //Hash code returned from the System.identityHashCode method on the index
+ //object.
+ private int indexHashCode;
+
+ //Index related to the element.
+ private Index index;
+
+ //The set of IDs related to the key.
+ private ImportIDSet importIDSet;
+
+ /**
+ * Create instance of an element for the specified key and index, the add
+ * the specified entry ID to the ID set.
+ *
+ * @param key The key.
+ * @param index The index.
+ * @param entryID The entry ID to start off with.
+ */
+ public KeyHashElement(byte[] key, Index index, EntryID entryID) {
+ this.key = key;
+ this.index = index;
+ //Use the integer set for right now. This is good up to 2G number of
+ //entries. There is also a LongImportSet, but it currently isn't used.
+ this.importIDSet = new IntegerImportIDSet(entryID);
+ //Used if there when there are conflicts if two or more indexes have
+ //the same key.
+ this.indexHashCode = System.identityHashCode(index);
+ }
+
+ /**
+ * Add an entry ID to the set.
+ *
+ * @param entryID The entry ID to add.
+ * @param entryLimit The entry limit
+ */
+ void addEntryID(EntryID entryID, int entryLimit) {
+ importIDSet.addEntryID(entryID, entryLimit);
+ }
+
+ /**
+ * Return the index.
+ *
+ * @return The index.
+ */
+ Index getIndex(){
+ return index;
+ }
+
+ /**
+ * Return the key.
+ *
+ * @return The key.
+ */
+ byte[] getKey() {
+ return key;
+ }
+
+ /**
+ * Return the ID set.
+ * @return The import ID set.
+ */
+ ImportIDSet getIDSet() {
+ return importIDSet;
+ }
+
+ /**
+ * Return if the ID set is defined or not.
+ *
+ * @return <CODE>True</CODE> if the ID set is defined.
+ */
+ boolean isDefined() {
+ return importIDSet.isDefined();
+ }
+
+ /**
+ * Compare the bytes of two keys.
+ *
+ * @param a Key a.
+ * @param b Key b.
+ * @return 0 if the keys are equal, -1 if key a is less than key b, 1 if
+ * key a is greater than key b.
+ */
+ private int compare(byte[] a, byte[] b) {
+ int i;
+ for (i = 0; i < a.length && i < b.length; i++) {
+ if (a[i] > b[i]) {
+ return 1;
+ }
+ else if (a[i] < b[i]) {
+ return -1;
+ }
+ }
+ if (a.length == b.length) {
+ return 0;
+ }
+ if (a.length > b.length){
+ return 1;
+ }
+ else {
+ return -1;
+ }
+ }
+
+ /**
+ * Compare the specified object to the current object. If the keys are
+ * equal, then the indexHashCode value is used as a tie-breaker.
+ *
+ * @param o The object representing a KeyHashElement.
+ * @return 0 if the objects are equal, -1 if the current object is less
+ * than the specified object, 1 otherwise.
+ */
+ public int compareTo(Object o) {
+ if (o == null) {
+ throw new NullPointerException();
+ }
+ KeyHashElement inElem = (KeyHashElement) o;
+ int keyCompare = compare(key, inElem.key);
+ if(keyCompare == 0) {
+ if(indexHashCode == inElem.indexHashCode) {
+ return 0;
+ } else if(indexHashCode < inElem.indexHashCode) {
+ return -1;
+ } else {
+ return 1;
+ }
+ } else {
+ return keyCompare;
+ }
+ }
+
+ /**
+ * Return the current total memory size of the element.
+ * @return The memory size estimate of a KeyHashElement.
+ */
+ int getMemorySize() {
+ return KEY_ELEMENT_OVERHEAD +
+ MemoryBudget.byteArraySize(key.length) +
+ importIDSet.getMemorySize();
+ }
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportContext.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java
similarity index 60%
rename from opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportContext.java
rename to opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java
index 5a8cd61..3924313 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportContext.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/DNContext.java
@@ -24,24 +24,27 @@
*
* Copyright 2006-2008 Sun Microsystems, Inc.
*/
-package org.opends.server.backends.jeb;
+package org.opends.server.backends.jeb.importLDIF;
import org.opends.server.types.DN;
-import org.opends.server.types.Entry;
import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.AttributeType;
import org.opends.server.util.LDIFReader;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
+import org.opends.server.backends.jeb.*;
+
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
/**
* This class represents the import context for a destination base DN.
*/
-public class ImportContext
-{
+public class DNContext {
/**
* The destination base DN.
@@ -84,15 +87,35 @@
private EntryContainer srcEntryContainer;
/**
- * The amount of buffer memory available in bytes.
- */
- private long bufferSize;
-
- /**
- * A queue of entries that have been read from the LDIF and are ready
+ * A queue of elements that have been read from the LDIF and are ready
* to be imported.
*/
- private BlockingQueue<Entry> queue;
+
+ private BlockingQueue<WorkElement> workQueue;
+
+
+ //This currently isn't used.
+ private ArrayList<VLVIndex> vlvIndexes = new ArrayList<VLVIndex>();
+
+ /**
+ * The maximum number of parent ID values that we will remember.
+ */
+ private static final int PARENT_ID_MAP_SIZE = 100;
+
+ /**
+ * Map of likely parent entry DNs to their entry IDs.
+ */
+ private HashMap<DN,EntryID> parentIDMap =
+ new HashMap<DN,EntryID>(PARENT_ID_MAP_SIZE);
+
+ //Map of pending DNs added to the work queue. Used to check if a parent
+ //entry has been added, but isn't in the dn2id DB.
+ private ConcurrentHashMap<DN,DN> pendingMap =
+ new ConcurrentHashMap<DN, DN>() ;
+
+ //Used to synchronize the parent ID map, since multiple worker threads
+ //can be accessing it.
+ private Object synchObject = new Object();
/**
* The number of LDAP entries added to the database, used to update the
@@ -114,22 +137,28 @@
*/
private ArrayList<EntryID> IDs;
- /**
- * Get the import entry queue.
- * @return The import entry queue.
- */
- public BlockingQueue<Entry> getQueue()
- {
- return queue;
- }
+ //The buffer manager used to hold the substring cache.
+ private BufferManager bufferManager;
+
/**
- * Set the import entry queue.
- * @param queue The import entry queue.
+ * Get the work queue.
+ *
+ * @return The work queue.
*/
- public void setQueue(BlockingQueue<Entry> queue)
- {
- this.queue = queue;
+ public BlockingQueue<WorkElement> getWorkQueue() {
+ return workQueue;
+ }
+
+
+ /**
+ * Set the work queue to the specified work queue.
+ *
+ * @param workQueue The work queue.
+ */
+ public void
+ setWorkQueue(BlockingQueue<WorkElement> workQueue) {
+ this.workQueue = workQueue;
}
/**
@@ -242,24 +271,6 @@
}
/**
- * Get the available buffer size in bytes.
- * @return The available buffer size.
- */
- public long getBufferSize()
- {
- return bufferSize;
- }
-
- /**
- * Set the available buffer size in bytes.
- * @param bufferSize The available buffer size in bytes.
- */
- public void setBufferSize(long bufferSize)
- {
- this.bufferSize = bufferSize;
- }
-
- /**
* Get the number of new LDAP entries imported into the entry database.
* @return The number of new LDAP entries imported into the entry database.
*/
@@ -384,4 +395,141 @@
}
}
-}
+
+ /**
+ * Return the attribute type attribute index map.
+ *
+ * @return The attribute type attribute index map.
+ */
+ public Map<AttributeType, AttributeIndex> getAttrIndexMap() {
+ return entryContainer.getAttributeIndexMap();
+ }
+
+ /**
+ * Set all the indexes to trusted.
+ *
+ * @throws DatabaseException If the trusted value cannot be updated in the
+ * index DB.
+ */
+ public void setIndexesTrusted() throws DatabaseException {
+ entryContainer.getID2Children().setTrusted(null,true);
+ entryContainer.getID2Subtree().setTrusted(null, true);
+ for(AttributeIndex attributeIndex :
+ entryContainer.getAttributeIndexes()) {
+ Index index;
+ if((index = attributeIndex.getEqualityIndex()) != null) {
+ index.setTrusted(null, true);
+ }
+ if((index=attributeIndex.getPresenceIndex()) != null) {
+ index.setTrusted(null, true);
+ }
+ if((index=attributeIndex.getSubstringIndex()) != null) {
+ index.setTrusted(null, true);
+ }
+ if((index=attributeIndex.getOrderingIndex()) != null) {
+ index.setTrusted(null, true);
+ }
+ if((index=attributeIndex.getApproximateIndex()) != null) {
+ index.setTrusted(null, true);
+ }
+ }
+ }
+
+
+ /**
+ * Get the Entry ID of the parent entry.
+ * @param parentDN The parent DN.
+ * @param dn2id The DN2ID DB.
+ * @param txn A database transaction,
+ * @return The entry ID of the parent entry.
+ * @throws DatabaseException If a DB error occurs.
+ */
+ public
+ EntryID getParentID(DN parentDN, DN2ID dn2id, Transaction txn)
+ throws DatabaseException {
+ EntryID parentID;
+ synchronized(synchObject) {
+ parentID = parentIDMap.get(parentDN);
+ if (parentID != null) {
+ return parentID;
+ }
+ }
+ int i=0;
+ //If the parent is in the pending map, another thread is working on the
+ //parent entry; wait until that thread is done with the parent.
+ while(isPending(parentDN)) {
+ try {
+ Thread.sleep(50);
+ if(i == 3) {
+ return null;
+ }
+ i++;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ parentID = dn2id.get(txn, parentDN);
+ //If the parent is in dn2id, add it to the cache.
+ if (parentID != null) {
+ synchronized(synchObject) {
+ if (parentIDMap.size() >= PARENT_ID_MAP_SIZE) {
+ Iterator<DN> iterator = parentIDMap.keySet().iterator();
+ iterator.next();
+ iterator.remove();
+ }
+ parentIDMap.put(parentDN, parentID);
+ }
+ }
+ return parentID;
+ }
+
+ /**
+ * Check if the parent DN is in the pending map.
+ *
+ * @param parentDN The DN of the parent.
+ * @return <CODE>True</CODE> if the parent is in the pending map.
+ */
+ private boolean isPending(DN parentDN) {
+ boolean ret = false;
+ if(pendingMap.containsKey(parentDN)) {
+ ret = true;
+ }
+ return ret;
+ }
+
+ /**
+ * Add specified DN to the pending map.
+ *
+ * @param dn The DN to add to the map.
+ */
+ public void addPending(DN dn) {
+ pendingMap.putIfAbsent(dn, dn);
+ }
+
+ /**
+ * Remove the specified DN from the pending map.
+ *
+ * @param dn The DN to remove from the map.
+ */
+ public void removePending(DN dn) {
+ pendingMap.remove(dn);
+ }
+
+ /**
+ * Set the substring buffer manager to the specified buffer manager.
+ *
+ * @param bufferManager The buffer manager.
+ */
+ public void setBufferManager(BufferManager bufferManager) {
+ this.bufferManager = bufferManager;
+ }
+
+ /**
+ * Return the buffer manager.
+ *
+ * @return The buffer manager.
+ */
+ public BufferManager getBufferManager() {
+ return bufferManager;
+ }
+ }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
new file mode 100644
index 0000000..63bf8fd
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
@@ -0,0 +1,83 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.backends.jeb.importLDIF;
+
+import org.opends.server.backends.jeb.EntryID;
+
+/**
+ * Interface defining and import ID set.
+ */
+public interface ImportIDSet {
+
+ /**
+ * Add an entry ID to the set.
+ *
+ * @param entryID The entry ID to add.
+ * @param entryLimit The entry limit.
+ */
+ public void addEntryID(EntryID entryID, int entryLimit);
+
+ /**
+ * Return if a set is defined or not.
+ *
+ * @return <CODE>True</CODE> if a set is defined.
+ */
+ public boolean isDefined();
+
+ /**
+ * Return the memory size of a set.
+ *
+ * @return The sets current memory size.
+ */
+ public int getMemorySize();
+
+ /**
+ * Convert a set to a byte array suitable for saving to DB.
+ *
+ * @return A byte array representing the set.
+ */
+ public byte[] toDatabase();
+
+ /**
+ * Return the size of the set.
+ *
+ * @return The size of the ID set.
+ */
+ public int size();
+
+ /**
+ * Merge a byte array read from DB with a ID set.
+ *
+ * @param dbBytes The byte array read from DB.
+ * @param bufImportIDSet The import ID set to merge.
+ * @param entryLimit The entry limit.
+ * @return <CODE>True</CODE> if the merged set is undefined.
+ */
+ public boolean merge(byte[] dbBytes, ImportIDSet bufImportIDSet,
+ int entryLimit);
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
new file mode 100644
index 0000000..ce98383
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -0,0 +1,1096 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.backends.jeb.importLDIF;
+
+import org.opends.server.types.*;
+import org.opends.server.loggers.debug.DebugTracer;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import org.opends.server.admin.std.server.LocalDBBackendCfg;
+import org.opends.server.util.LDIFReader;
+import org.opends.server.util.StaticUtils;
+import org.opends.server.util.LDIFException;
+import org.opends.server.util.RuntimeInformation;
+import static org.opends.server.util.DynamicConstants.BUILD_ID;
+import static org.opends.server.util.DynamicConstants.REVISION_NUMBER;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.backends.jeb.*;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.messages.Message;
+import org.opends.messages.JebMessages;
+import static org.opends.messages.JebMessages.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.io.IOException;
+
+import com.sleepycat.je.*;
+
+/**
+ * Performs a LDIF import.
+ */
+
+public class Importer implements Thread.UncaughtExceptionHandler {
+
+
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
+ * The JE backend configuration.
+ */
+ private LocalDBBackendCfg config;
+
+ /**
+ * The root container used for this import job.
+ */
+ private RootContainer rootContainer;
+
+ /**
+ * The LDIF import configuration.
+ */
+ private LDIFImportConfig ldifImportConfig;
+
+ /**
+ * The LDIF reader.
+ */
+ private LDIFReader reader;
+
+ /**
+ * Map of base DNs to their import context.
+ */
+ private LinkedHashMap<DN, DNContext> importMap =
+ new LinkedHashMap<DN, DNContext>();
+
+
+ /**
+ * The number of entries migrated.
+ */
+ private int migratedCount;
+
+ /**
+ * The number of entries imported.
+ */
+ private int importedCount;
+
+ /**
+ * The number of milliseconds between job progress reports.
+ */
+ private long progressInterval = 10000;
+
+ /**
+ * The progress report timer.
+ */
+ private Timer timer;
+
+ //Thread array.
+ private CopyOnWriteArrayList<WorkThread> threads;
+
+ //Progress task.
+ private ProgressTask pTask;
+
+ //Number of entries import before checking if cleaning is needed after
+ //eviction has been detected.
+ private static final int entryCleanInterval = 250000;
+
+ //Minimum buffer amount to give to a buffer manager.
+ private static final long minBuffer = 1024 * 1024;
+
+ //Total available memory for the buffer managers.
+ private long totalAvailBufferMemory = 0;
+
+ //Memory size to be used for the DB cache in string format.
+ private String dbCacheSizeStr;
+
+ //Used to do an initial clean after eviction has been detected.
+ private boolean firstClean=false;
+
+ //A thread threw an Runtime exception stop the import.
+ private boolean unCaughtExceptionThrown = false;
+
+ /**
+ * Create a new import job with the specified ldif import config.
+ *
+ * @param ldifImportConfig The LDIF import config.
+ */
+ public Importer(LDIFImportConfig ldifImportConfig)
+ {
+ this.ldifImportConfig = ldifImportConfig;
+ this.threads = new CopyOnWriteArrayList<WorkThread>();
+ calcMemoryLimits();
+ }
+
+ /**
+ * Start the worker threads.
+ *
+ * @throws DatabaseException If a DB problem occurs.
+ */
+ private void startWorkerThreads()
+ throws DatabaseException {
+
+ int importThreadCount = config.getImportThreadCount();
+ //Figure out how much buffer memory to give to each context.
+ int contextCount = importMap.size();
+ long memoryPerContext = totalAvailBufferMemory / contextCount;
+ //Below min, use the min value.
+ if(memoryPerContext < minBuffer) {
+ Message msg =
+ INFO_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
+ minBuffer);
+ logError(msg);
+ memoryPerContext = minBuffer;
+ }
+ // Create one set of worker threads/buffer managers for each base DN.
+ for (DNContext context : importMap.values()) {
+ BufferManager bufferManager = new BufferManager(memoryPerContext,
+ importThreadCount);
+ context.setBufferManager(bufferManager);
+ for (int i = 0; i < importThreadCount; i++) {
+ WorkThread t = new WorkThread(context.getWorkQueue(), i,
+ bufferManager, rootContainer);
+ t.setUncaughtExceptionHandler(this);
+ threads.add(t);
+ t.start();
+ }
+ }
+ // Start a timer for the progress report.
+ timer = new Timer();
+ TimerTask progressTask = new ProgressTask();
+ //Used to get at extra functionality such as eviction detected.
+ pTask = (ProgressTask) progressTask;
+ timer.scheduleAtFixedRate(progressTask, progressInterval,
+ progressInterval);
+
+ }
+
+
+ /**
+ * Import a ldif using the specified root container.
+ *
+ * @param rootContainer The root container.
+ * @return A LDIF result.
+ * @throws DatabaseException If a DB error occurs.
+ * @throws IOException If a IO error occurs.
+ * @throws org.opends.server.backends.jeb.JebException If a JEB error occurs.
+ * @throws DirectoryException If a directory error occurs.
+ * @throws ConfigException If a configuration has an error.
+ */
+ public LDIFImportResult processImport(RootContainer rootContainer)
+ throws DatabaseException, IOException, JebException, DirectoryException,
+ ConfigException {
+
+ // Create an LDIF reader. Throws an exception if the file does not exist.
+ reader = new LDIFReader(ldifImportConfig);
+ this.rootContainer = rootContainer;
+ this.config = rootContainer.getConfiguration();
+
+ Message message;
+ long startTime;
+ try {
+ int importThreadCount = config.getImportThreadCount();
+ message = INFO_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
+ BUILD_ID, REVISION_NUMBER);
+ logError(message);
+ message = INFO_JEB_IMPORT_THREAD_COUNT.get(importThreadCount);
+ logError(message);
+ RuntimeInformation.logInfo();
+ for (EntryContainer entryContainer : rootContainer.getEntryContainers()) {
+ DNContext DNContext = getImportContext(entryContainer);
+ if(DNContext != null) {
+ importMap.put(entryContainer.getBaseDN(), DNContext);
+ }
+ }
+ // Make a note of the time we started.
+ startTime = System.currentTimeMillis();
+ startWorkerThreads();
+ try {
+ importedCount = 0;
+ migratedCount = 0;
+ migrateExistingEntries();
+ processLDIF();
+ migrateExcludedEntries();
+ } finally {
+ if(!unCaughtExceptionThrown) {
+ cleanUp();
+ switchContainers();
+ }
+ }
+ }
+ finally {
+ reader.close();
+ }
+ importProlog(startTime);
+ return new LDIFImportResult(reader.getEntriesRead(),
+ reader.getEntriesRejected(),
+ reader.getEntriesIgnored());
+ }
+
+ /**
+ * Switch containers if the migrated entries were written to the temporary
+ * container.
+ *
+ * @throws DatabaseException If a DB problem occurs.
+ * @throws JebException If a JEB problem occurs.
+ */
+ private void switchContainers() throws DatabaseException, JebException {
+
+ for(DNContext importContext : importMap.values()) {
+ DN baseDN = importContext.getBaseDN();
+ EntryContainer srcEntryContainer =
+ importContext.getSrcEntryContainer();
+ if(srcEntryContainer != null) {
+ if (debugEnabled()) {
+ TRACER.debugInfo("Deleteing old entry container for base DN " +
+ "%s and renaming temp entry container", baseDN);
+ }
+ EntryContainer unregEC =
+ rootContainer.unregisterEntryContainer(baseDN);
+ //Make sure the unregistered EC for the base DN is the same as
+ //the one in the import context.
+ if(unregEC != srcEntryContainer) {
+ if(debugEnabled()) {
+ TRACER.debugInfo("Current entry container used for base DN " +
+ "%s is not the same as the source entry container used " +
+ "during the migration process.", baseDN);
+ }
+ rootContainer.registerEntryContainer(baseDN, unregEC);
+ continue;
+ }
+ srcEntryContainer.lock();
+ srcEntryContainer.delete();
+ srcEntryContainer.unlock();
+ EntryContainer newEC = importContext.getEntryContainer();
+ newEC.lock();
+ newEC.setDatabasePrefix(baseDN.toNormalizedString());
+ newEC.unlock();
+ rootContainer.registerEntryContainer(baseDN, newEC);
+ }
+ }
+ }
+
+ /**
+ * Create and log messages at the end of the successful import.
+ *
+ * @param startTime The time the import started.
+ */
+ private void importProlog(long startTime) {
+ Message message;
+ long finishTime = System.currentTimeMillis();
+ long importTime = (finishTime - startTime);
+
+ float rate = 0;
+ if (importTime > 0)
+ {
+ rate = 1000f*importedCount / importTime;
+ }
+
+ message = INFO_JEB_IMPORT_FINAL_STATUS.
+ get(reader.getEntriesRead(), importedCount,
+ reader.getEntriesIgnored(), reader.getEntriesRejected(),
+ migratedCount, importTime/1000, rate);
+ logError(message);
+
+ message = INFO_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
+ getEntryLimitExceededCount());
+ logError(message);
+
+ }
+
+
+ /**
+ * Run the cleaner if it is needed.
+ *
+ * @param entriesRead The number of entries read so far.
+ * @param evictEntryNumber The number of entries to run the cleaner after
+ * being read.
+ * @throws DatabaseException If a DB problem occurs.
+ */
+ private void
+ runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
+ throws DatabaseException {
+ if(!firstClean || (entriesRead % evictEntryNumber) == 0) {
+ //Make sure work queue is empty before starting.
+ drainWorkQueue();
+ Message msg = INFO_JEB_IMPORT_LDIF_CLEAN.get();
+ runCleaner(msg);
+ if(!firstClean) {
+ firstClean=true;
+ }
+ }
+ }
+
+ /**
+ * Run the cleaner, pausing the task thread output.
+ *
+ * @param header Message to be printed before cleaning.
+ * @throws DatabaseException If a DB problem occurs.
+ */
+ private void runCleaner(Message header) throws DatabaseException {
+ Message msg;
+ long startTime = System.currentTimeMillis();
+ //Need to force a checkpoint.
+ rootContainer.importForceCheckPoint();
+ logError(header);
+ pTask.setPause(true);
+ //Actually clean the files.
+ int cleaned = rootContainer.cleanedLogFiles();
+ //This checkpoint removes the files if any were cleaned.
+ if(cleaned > 0) {
+ msg = INFO_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
+ logError(msg);
+ rootContainer.importForceCheckPoint();
+ }
+ pTask.setPause(false);
+ long finishTime = System.currentTimeMillis();
+ long cleanTime = (finishTime - startTime) / 1000;
+ msg = INFO_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
+ logError(msg);
+ }
+
+ /**
+ * Process a LDIF reader.
+ *
+ * @throws JebException If a JEB problem occurs.
+ * @throws DatabaseException If a DB problem occurs.
+ * @throws IOException If an IO exception occurs.
+ */
+ private void
+ processLDIF() throws JebException, DatabaseException, IOException {
+ Message message = INFO_JEB_IMPORT_LDIF_START.get();
+ logError(message);
+ do {
+ if (ldifImportConfig.isCancelled()) {
+ break;
+ }
+ if(threads.size() <= 0) {
+ message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
+ throw new JebException(message);
+ }
+ if(unCaughtExceptionThrown) {
+ abortImport();
+ }
+ try {
+ // Read the next entry.
+ Entry entry = reader.readEntry();
+ // Check for end of file.
+ if (entry == null) {
+ message = INFO_JEB_IMPORT_LDIF_END.get();
+ logError(message);
+
+ break;
+ }
+ // Route it according to base DN.
+ DNContext DNContext = getImportConfig(entry.getDN());
+ processEntry(DNContext, entry);
+ //If the progress task has noticed eviction proceeding, start running
+ //the cleaner.
+ if(pTask.isEvicting()) {
+ runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
+ }
+ } catch (LDIFException e) {
+ if (debugEnabled()) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ } catch (DirectoryException e) {
+ if (debugEnabled()) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ } catch (DatabaseException e) {
+ if (debugEnabled()) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ } while (true);
+ }
+
+ /**
+ * Process an entry using the specified import context.
+ *
+ * @param DNContext The import context.
+ * @param entry The entry to process.
+ */
+ private void processEntry(DNContext DNContext, Entry entry) {
+ //Add this DN to the pending map.
+ DNContext.addPending(entry.getDN());
+ addEntryQueue(DNContext, entry);
+ }
+
+ /**
+ * Add work item to specified import context's queue.
+ * @param context The import context.
+ * @param item The work item to add.
+ * @return <CODE>True</CODE> if the the work item was added to the queue.
+ */
+ private boolean
+ addQueue(DNContext context, WorkElement item) {
+ try {
+ while(!context.getWorkQueue().offer(item, 1000,
+ TimeUnit.MILLISECONDS)) {
+ if(threads.size() <= 0) {
+ // All worker threads died. We must stop now.
+ return false;
+ }
+ }
+ } catch (InterruptedException e) {
+ if (debugEnabled()) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * Wait until the work queue is empty.
+ */
+ private void drainWorkQueue() {
+ if(threads.size() > 0) {
+ for (DNContext context : importMap.values()) {
+ while (context.getWorkQueue().size() > 0) {
+ try {
+ Thread.sleep(100);
+ } catch (Exception e) {
+ // No action needed.
+ }
+ }
+ }
+ }
+ }
+
+ private void abortImport() throws JebException {
+ //Stop work threads telling them to skip substring flush.
+ stopWorkThreads(false, true);
+ timer.cancel();
+ Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
+ throw new JebException(message);
+ }
+
+ /**
+ * Stop work threads.
+ *
+ * @param flushBuffer Flag telling threads that it should do substring flush.
+ * @param abort <CODE>True</CODE> if stop work threads was called from an
+ * abort.
+ * @throws JebException if a Jeb error occurs.
+ */
+ private void
+ stopWorkThreads(boolean flushBuffer, boolean abort) throws JebException {
+ for (WorkThread t : threads) {
+ if(!flushBuffer) {
+ t.setFlush(false);
+ }
+ t.stopProcessing();
+ }
+ // Wait for each thread to stop.
+ for (WorkThread t : threads) {
+ try {
+ if(!abort && unCaughtExceptionThrown) {
+ timer.cancel();
+ Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
+ throw new JebException(message);
+ }
+ t.join();
+ importedCount += t.getImportedCount();
+ } catch (InterruptedException ie) {
+ // No action needed?
+ }
+ }
+ }
+
+ /**
+ * Clean up after a successful import.
+ *
+ * @throws DatabaseException If a DB error occurs.
+ * @throws JebException If a Jeb error occurs.
+ */
+ private void cleanUp() throws DatabaseException, JebException {
+ Message msg;
+ //Drain the work queue.
+ drainWorkQueue();
+ //Prepare the buffer managers to flush.
+ for(DNContext context : importMap.values()) {
+ context.getBufferManager().prepareFlush();
+ }
+ pTask.setPause(true);
+ long startTime = System.currentTimeMillis();
+ stopWorkThreads(true, false);
+ long finishTime = System.currentTimeMillis();
+ long flushTime = (finishTime - startTime) / 1000;
+ msg = INFO_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
+ logError(msg);
+ timer.cancel();
+ for(DNContext context : importMap.values()) {
+ context.setIndexesTrusted();
+ }
+ msg = INFO_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
+ //Run the cleaner.
+ runCleaner(msg);
+ }
+
+ /**
+ * Uncaught exception handler.
+ *
+ * @param t The thread working when the exception was thrown.
+ * @param e The exception.
+ */
+ public void uncaughtException(Thread t, Throwable e) {
+ unCaughtExceptionThrown = true;
+ threads.remove(t);
+ Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
+ t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
+ logError(msg);
+ }
+
+ /**
+ * Get the entry limit exceeded counts from the indexes.
+ *
+ * @return Count of the index with entry limit exceeded values.
+ */
+ private int getEntryLimitExceededCount() {
+ int count = 0;
+ for (DNContext ic : importMap.values())
+ {
+ count += ic.getEntryContainer().getEntryLimitExceededCount();
+ }
+ return count;
+ }
+
+ /**
+ * Return an import context related to the specified DN.
+ * @param dn The dn.
+ * @return An import context.
+ * @throws DirectoryException If an directory error occurs.
+ */
+ private DNContext getImportConfig(DN dn) throws DirectoryException {
+ DNContext DNContext = null;
+ DN nodeDN = dn;
+
+ while (DNContext == null && nodeDN != null) {
+ DNContext = importMap.get(nodeDN);
+ if (DNContext == null)
+ {
+ nodeDN = nodeDN.getParentDNInSuffix();
+ }
+ }
+
+ if (nodeDN == null) {
+ // The entry should not have been given to this backend.
+ Message message =
+ JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
+ throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
+ }
+
+ return DNContext;
+ }
+
+ /**
+ * Creates an import context for the specified entry container.
+ *
+ * @param entryContainer The entry container.
+ * @return Import context to use during import.
+ * @throws DatabaseException If a database error occurs.
+ * @throws JebException If a JEB error occurs.
+ * @throws ConfigException If a configuration contains error.
+ */
+ private DNContext getImportContext(EntryContainer entryContainer)
+ throws DatabaseException, JebException, ConfigException {
+ DN baseDN = entryContainer.getBaseDN();
+ EntryContainer srcEntryContainer = null;
+ List<DN> includeBranches = new ArrayList<DN>();
+ List<DN> excludeBranches = new ArrayList<DN>();
+
+ if(!ldifImportConfig.appendToExistingData() &&
+ !ldifImportConfig.clearBackend())
+ {
+ for(DN dn : ldifImportConfig.getExcludeBranches())
+ {
+ if(baseDN.equals(dn))
+ {
+ // This entire base DN was explicitly excluded. Skip.
+ return null;
+ }
+ if(baseDN.isAncestorOf(dn))
+ {
+ excludeBranches.add(dn);
+ }
+ }
+
+ if(!ldifImportConfig.getIncludeBranches().isEmpty())
+ {
+ for(DN dn : ldifImportConfig.getIncludeBranches())
+ {
+ if(baseDN.isAncestorOf(dn))
+ {
+ includeBranches.add(dn);
+ }
+ }
+
+ if(includeBranches.isEmpty())
+ {
+ // There are no branches in the explicitly defined include list under
+ // this base DN. Skip this base DN alltogether.
+
+ return null;
+ }
+
+ // Remove any overlapping include branches.
+ Iterator<DN> includeBranchIterator = includeBranches.iterator();
+ while(includeBranchIterator.hasNext())
+ {
+ DN includeDN = includeBranchIterator.next();
+ boolean keep = true;
+ for(DN dn : includeBranches)
+ {
+ if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
+ {
+ keep = false;
+ break;
+ }
+ }
+ if(!keep)
+ {
+ includeBranchIterator.remove();
+ }
+ }
+
+ // Remvoe any exclude branches that are not are not under a include
+ // branch since they will be migrated as part of the existing entries
+ // outside of the include branches anyways.
+ Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
+ while(excludeBranchIterator.hasNext())
+ {
+ DN excludeDN = excludeBranchIterator.next();
+ boolean keep = false;
+ for(DN includeDN : includeBranches)
+ {
+ if(includeDN.isAncestorOf(excludeDN))
+ {
+ keep = true;
+ break;
+ }
+ }
+ if(!keep)
+ {
+ excludeBranchIterator.remove();
+ }
+ }
+
+ if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
+ includeBranches.get(0).equals(baseDN))
+ {
+ // This entire base DN is explicitly included in the import with
+ // no exclude branches that we need to migrate. Just clear the entry
+ // container.
+ entryContainer.lock();
+ entryContainer.clear();
+ entryContainer.unlock();
+ }
+ else
+ {
+ // Create a temp entry container
+ srcEntryContainer = entryContainer;
+ entryContainer =
+ rootContainer.openEntryContainer(baseDN,
+ baseDN.toNormalizedString() +
+ "_importTmp");
+ }
+ }
+ }
+
+ // Create an import context.
+ DNContext DNContext = new DNContext();
+ DNContext.setConfig(config);
+ DNContext.setLDIFImportConfig(this.ldifImportConfig);
+ DNContext.setLDIFReader(reader);
+
+ DNContext.setBaseDN(baseDN);
+ DNContext.setEntryContainer(entryContainer);
+ DNContext.setSrcEntryContainer(srcEntryContainer);
+
+ //Create queue.
+ LinkedBlockingQueue<WorkElement> works =
+ new LinkedBlockingQueue<WorkElement>
+ (config.getImportQueueSize());
+ DNContext.setWorkQueue(works);
+
+ // Set the include and exclude branches
+ DNContext.setIncludeBranches(includeBranches);
+ DNContext.setExcludeBranches(excludeBranches);
+
+ return DNContext;
+ }
+
+ /**
+ * Add specified context and entry to the work queue.
+ *
+ * @param context The context related to the entry DN.
+ * @param entry The entry to work on.
+ * @return <CODE>True</CODE> if the element was added to the work queue.
+ */
+ private boolean
+ addEntryQueue(DNContext context, Entry entry) {
+ WorkElement element =
+ WorkElement.decode(entry, context);
+ return addQueue(context, element);
+ }
+
+ /**
+ * Calculate the memory usage for the substring buffer and the DB cache.
+ */
+ private void calcMemoryLimits() {
+ Message msg;
+ Runtime runtime = Runtime.getRuntime();
+ long freeMemory = runtime.freeMemory();
+ long maxMemory = runtime.maxMemory();
+ long totMemory = runtime.totalMemory();
+ long totFreeMemory = (freeMemory + (maxMemory - totMemory));
+ long dbCacheLimit = (totFreeMemory * 40) / 100;
+ dbCacheSizeStr = Long.toString(dbCacheLimit);
+ totalAvailBufferMemory = (totFreeMemory * 10) / 100;
+ if(totalAvailBufferMemory < (10 * minBuffer)) {
+ msg =
+ INFO_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
+ (10 * minBuffer));
+ logError(msg);
+ totalAvailBufferMemory = (10 * minBuffer);
+ }
+ msg=INFO_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
+ totalAvailBufferMemory);
+ logError(msg);
+ }
+
+ /**
+ * Return the string representation of the DB cache size.
+ *
+ * @return DB cache size string.
+ */
+ public String getDBCacheSize() {
+ return dbCacheSizeStr;
+ }
+
+ /**
+ * Migrate any existing entries.
+ *
+ * @throws JebException If a JEB error occurs.
+ * @throws DatabaseException If a DB error occurs.
+ * @throws DirectoryException If a directory error occurs.
+ */
+ private void migrateExistingEntries()
+ throws JebException, DatabaseException, DirectoryException {
+ for(DNContext context : importMap.values()) {
+ EntryContainer srcEntryContainer = context.getSrcEntryContainer();
+ if(srcEntryContainer != null &&
+ !context.getIncludeBranches().isEmpty()) {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ OperationStatus status;
+ Message message = INFO_JEB_IMPORT_MIGRATION_START.get(
+ "existing", String.valueOf(context.getBaseDN()));
+ logError(message);
+ Cursor cursor =
+ srcEntryContainer.getDN2ID().openCursor(null,
+ CursorConfig.READ_COMMITTED);
+ try {
+ status = cursor.getFirst(key, data, lockMode);
+ while(status == OperationStatus.SUCCESS &&
+ !ldifImportConfig.isCancelled()) {
+ if(threads.size() <= 0) {
+ message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
+ throw new JebException(message);
+ }
+ DN dn = DN.decode(new ASN1OctetString(key.getData()));
+ if(!context.getIncludeBranches().contains(dn)) {
+ EntryID id = new EntryID(data);
+ Entry entry = srcEntryContainer.getID2Entry().get(null, id);
+ processEntry(context, entry);
+ migratedCount++;
+ status = cursor.getNext(key, data, lockMode);
+ } else {
+ // This is the base entry for a branch that will be included
+ // in the import so we don't want to copy the branch to the new
+ // entry container.
+
+ /**
+ * Advance the cursor to next entry at the same level in the DIT
+ * skipping all the entries in this branch.
+ * Set the next starting value to a value of equal length but
+ * slightly greater than the previous DN. Since keys are compared
+ * in reverse order we must set the first byte (the comma).
+ * No possibility of overflow here.
+ */
+ byte[] begin =
+ StaticUtils.getBytes("," + dn.toNormalizedString());
+ begin[0] = (byte) (begin[0] + 1);
+ key.setData(begin);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+ }
+ }
+ } finally {
+ cursor.close();
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Migrate excluded entries.
+ *
+ * @throws JebException If a JEB error occurs.
+ * @throws DatabaseException If a DB error occurs.
+ * @throws DirectoryException If a directory error occurs.
+ */
+ private void migrateExcludedEntries()
+ throws JebException, DatabaseException, DirectoryException {
+ for(DNContext importContext : importMap.values()) {
+ EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
+ if(srcEntryContainer != null &&
+ !importContext.getExcludeBranches().isEmpty()) {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ OperationStatus status;
+ Message message = INFO_JEB_IMPORT_MIGRATION_START.get(
+ "excluded", String.valueOf(importContext.getBaseDN()));
+ logError(message);
+ Cursor cursor =
+ srcEntryContainer.getDN2ID().openCursor(null,
+ CursorConfig.READ_COMMITTED);
+ Comparator<byte[]> dn2idComparator =
+ srcEntryContainer.getDN2ID().getComparator();
+ try {
+ for(DN excludedDN : importContext.getExcludeBranches()) {
+ byte[] suffix =
+ StaticUtils.getBytes(excludedDN.toNormalizedString());
+ key.setData(suffix);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+ if(status == OperationStatus.SUCCESS &&
+ Arrays.equals(key.getData(), suffix)) {
+ // This is the base entry for a branch that was excluded in the
+ // import so we must migrate all entries in this branch over to
+ // the new entry container.
+ byte[] end =
+ StaticUtils.getBytes("," + excludedDN.toNormalizedString());
+ end[0] = (byte) (end[0] + 1);
+
+ while(status == OperationStatus.SUCCESS &&
+ dn2idComparator.compare(key.getData(), end) < 0 &&
+ !ldifImportConfig.isCancelled()) {
+ if(threads.size() <= 0) {
+ message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
+ throw new JebException(message);
+ }
+ EntryID id = new EntryID(data);
+ Entry entry = srcEntryContainer.getID2Entry().get(null, id);
+ processEntry(importContext, entry);
+ migratedCount++;
+ status = cursor.getNext(key, data, lockMode);
+ }
+ }
+ }
+ }
+ finally
+ {
+ cursor.close();
+ }
+ }
+ }
+ }
+
+
+ /**
+ * This class reports progress of the import job at fixed intervals.
+ */
+ private final class ProgressTask extends TimerTask
+ {
+ /**
+ * The number of entries that had been read at the time of the
+ * previous progress report.
+ */
+ private long previousCount = 0;
+
+ /**
+ * The time in milliseconds of the previous progress report.
+ */
+ private long previousTime;
+
+ /**
+ * The environment statistics at the time of the previous report.
+ */
+ private EnvironmentStats prevEnvStats;
+
+ /**
+ * The number of bytes in a megabyte.
+ * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB).
+ */
+ public static final int bytesPerMegabyte = 1024*1024;
+
+ //Determines if the ldif is being read.
+ private boolean ldifRead = false;
+
+ //Determines if eviction has been detected.
+ private boolean evicting = false;
+
+ //Entry count when eviction was detected.
+ private long evictionEntryCount = 0;
+
+ //Suspend output.
+ private boolean pause = false;
+
+ /**
+ * Create a new import progress task.
+ * @throws DatabaseException If an error occurs in the JE database.
+ */
+ public ProgressTask() throws DatabaseException
+ {
+ previousTime = System.currentTimeMillis();
+ prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig());
+ }
+
+ /**
+ * Return if reading the LDIF file.
+ */
+ public void ldifRead() {
+ ldifRead=true;
+ }
+
+ /**
+ * Return value of evicting flag.
+ *
+ * @return <CODE>True</CODE> if eviction is detected.
+ */
+ public boolean isEvicting() {
+ return evicting;
+ }
+
+ /**
+ * Return count of entries when eviction was detected.
+ *
+ * @return The entry count when eviction was detected.
+ */
+ public long getEvictionEntryCount() {
+ return evictionEntryCount;
+ }
+
+ /**
+ * Suspend output if true.
+ *
+ * @param v The value to set the suspend value to.
+ */
+ public void setPause(boolean v) {
+ pause=v;
+ }
+
+ /**
+ * The action to be performed by this timer task.
+ */
+ public void run() {
+ long latestCount = reader.getEntriesRead() + 0;
+ long deltaCount = (latestCount - previousCount);
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ Message message;
+ if (deltaTime == 0) {
+ return;
+ }
+ if(pause) {
+ return;
+ }
+ if(!ldifRead) {
+ long numRead = reader.getEntriesRead();
+ long numIgnored = reader.getEntriesIgnored();
+ long numRejected = reader.getEntriesRejected();
+ float rate = 1000f*deltaCount / deltaTime;
+ message = INFO_JEB_IMPORT_PROGRESS_REPORT.get(
+ numRead, numIgnored, numRejected, 0, rate);
+ logError(message);
+ }
+ try
+ {
+ Runtime runtime = Runtime.getRuntime();
+ long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
+ EnvironmentStats envStats =
+ rootContainer.getEnvironmentStats(new StatsConfig());
+ long nCacheMiss =
+ envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
+
+ float cacheMissRate = 0;
+ if (deltaCount > 0) {
+ cacheMissRate = nCacheMiss/(float)deltaCount;
+ }
+ message = INFO_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(
+ freeMemory, cacheMissRate);
+ logError(message);
+ long evictPasses = envStats.getNEvictPasses();
+ long evictNodes = envStats.getNNodesExplicitlyEvicted();
+ long evictBinsStrip = envStats.getNBINsStripped();
+ int cleanerRuns = envStats.getNCleanerRuns();
+ int cleanerDeletions = envStats.getNCleanerDeletions();
+ int cleanerEntriesRead = envStats.getNCleanerEntriesRead();
+ int cleanerINCleaned = envStats.getNINsCleaned();
+ int checkPoints = envStats.getNCheckpoints();
+ if(evictPasses != 0) {
+ if(!evicting) {
+ evicting=true;
+ if(!ldifRead) {
+ evictionEntryCount=reader.getEntriesRead();
+ message =
+ INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED.get(evictionEntryCount);
+ logError(message);
+ }
+ }
+ message =
+ INFO_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(evictPasses,
+ evictNodes, evictBinsStrip);
+ logError(message);
+ }
+ if(cleanerRuns != 0) {
+ message = INFO_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
+ cleanerDeletions, cleanerEntriesRead, cleanerINCleaned);
+ logError(message);
+ }
+ if(checkPoints > 1) {
+ message = INFO_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
+ logError(message);
+ }
+ prevEnvStats = envStats;
+ } catch (DatabaseException e) {
+ // Unlikely to happen and not critical.
+ }
+ previousCount = latestCount;
+ previousTime = latestTime;
+ }
+ }
+}
+
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java
new file mode 100644
index 0000000..79658a0
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/IntegerImportIDSet.java
@@ -0,0 +1,354 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.backends.jeb.importLDIF;
+
+import org.opends.server.backends.jeb.EntryID;
+import org.opends.server.backends.jeb.JebFormat;
+import com.sleepycat.je.dbi.MemoryBudget;
+
+/**
+ * An import ID set backed by an array of ints.
+ */
+public class IntegerImportIDSet implements ImportIDSet {
+
+ //Gleamed from JHAT. The same for 32/64 bit.
+ private final static int THIS_OVERHEAD = 17;
+
+ /**
+ * The internal array where elements are stored.
+ */
+ private int[] array = null;
+
+
+ /**
+ * The number of valid elements in the array.
+ */
+ private int count = 0;
+
+
+ //Boolean to keep track if the instance is defined or not.
+ private boolean isDefined=true;
+
+ /**
+ * Create an empty import set.
+ */
+ public IntegerImportIDSet() {
+ }
+
+ /**
+ * Create an import set and add the specified entry ID to it.
+ *
+ * @param id The entry ID.
+ */
+ public IntegerImportIDSet(EntryID id) {
+ this.array = new int[1];
+ this.array[0] = (int) id.longValue();
+ count=1;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isDefined() {
+ return isDefined;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getMemorySize() {
+ if(array != null) {
+ return THIS_OVERHEAD + MemoryBudget.byteArraySize(array.length * 4);
+ } else {
+ return THIS_OVERHEAD;
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean merge(byte[] dBbytes, ImportIDSet importIdSet, int limit) {
+ boolean incrLimitCount=false;
+ boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80);
+
+ if(dbUndefined) {
+ isDefined=false;
+ } else if(!importIdSet.isDefined()) {
+ isDefined=false;
+ incrLimitCount=true;
+ } else {
+ array = JebFormat.intArrayFromDatabaseBytes(dBbytes);
+ if(array.length + importIdSet.size() > limit) {
+ isDefined=false;
+ incrLimitCount=true;
+ count = 0;
+ } else {
+ count = array.length;
+ addAll((IntegerImportIDSet) importIdSet);
+ }
+ }
+ return incrLimitCount;
+ }
+
+ /**
+ * Add all of the specified import ID set to the import set.
+ *
+ * @param that The import ID set to add.
+ */
+ private void addAll(IntegerImportIDSet that) {
+ resize(this.count+that.count);
+
+ if (that.count == 0)
+ {
+ return;
+ }
+
+ // Optimize for the case where the two sets are sure to have no overlap.
+ if (this.count == 0 || that.array[0] > this.array[this.count-1])
+ {
+ System.arraycopy(that.array, 0, this.array, this.count, that.count);
+ count += that.count;
+ return;
+ }
+
+ if (this.array[0] > that.array[that.count-1])
+ {
+ System.arraycopy(this.array, 0, this.array, that.count, this.count);
+ System.arraycopy(that.array, 0, this.array, 0, that.count);
+ count += that.count;
+ return;
+ }
+
+ int destPos = binarySearch(this.array, this.count, that.array[0]);
+ if (destPos < 0)
+ {
+ destPos = -(destPos+1);
+ }
+
+ // Make space for the copy.
+ int aCount = this.count - destPos;
+ int aPos = destPos + that.count;
+ int aEnd = aPos + aCount;
+ System.arraycopy(this.array, destPos, this.array, aPos, aCount);
+
+ // Optimize for the case where there is no overlap.
+ if (this.array[aPos] > that.array[that.count-1])
+ {
+ System.arraycopy(that.array, 0, this.array, destPos, that.count);
+ count += that.count;
+ return;
+ }
+
+ int bPos;
+ for ( bPos = 0; aPos < aEnd && bPos < that.count; )
+ {
+ if ( this.array[aPos] < that.array[bPos] )
+ {
+ this.array[destPos++] = this.array[aPos++];
+ }
+ else if ( this.array[aPos] > that.array[bPos] )
+ {
+ this.array[destPos++] = that.array[bPos++];
+ }
+ else
+ {
+ this.array[destPos++] = this.array[aPos++];
+ bPos++;
+ }
+ }
+
+ // Copy any remainder.
+ int aRemain = aEnd - aPos;
+ if (aRemain > 0)
+ {
+ System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
+ destPos += aRemain;
+ }
+
+ int bRemain = that.count - bPos;
+ if (bRemain > 0)
+ {
+ System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
+ destPos += bRemain;
+ }
+
+ count = destPos;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public int size() {
+ return count;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addEntryID(EntryID entryID, int limit) {
+ if(!isDefined()) {
+ return;
+ }
+ if(isDefined() && ((count + 1) > limit)) {
+ isDefined = false;
+ array = null;
+ count = 0;
+ } else {
+ add((int)entryID.longValue());
+ }
+ }
+
+ /**
+ * Add the specified integer to the import set.
+ *
+ * @param v The integer value to add.
+ * @return <CODE>True</CODE> if the value was added.
+ */
+ private boolean add(int v) {
+ resize(count+1);
+
+ if (count == 0 || v > array[count-1])
+ {
+ array[count++] = v;
+ return true;
+ }
+
+ int pos = binarySearch(array, count, v);
+ if (pos >=0)
+ {
+ return false;
+ }
+
+ // For a negative return value r, the index -(r+1) gives the array
+ // index at which the specified value can be inserted to maintain
+ // the sorted order of the array.
+ pos = -(pos+1);
+
+ System.arraycopy(array, pos, array, pos+1, count-pos);
+ array[pos] = v;
+ count++;
+ return true;
+ }
+
+ /**
+ * Perform binary search for the specified key in the specified array.
+ *
+ * @param a The array to search in.
+ * @param count The max value in the array.
+ * @param key The key value.
+ * @return Position in array key is found or a negative if it wasn't found.
+ */
+ private static int binarySearch(int[] a, int count, int key) {
+ int low = 0;
+ int high = count-1;
+
+ while (low <= high)
+ {
+ int mid = (low + high) >> 1;
+ int midVal = a[mid];
+
+ if (midVal < key)
+ low = mid + 1;
+ else if (midVal > key)
+ high = mid - 1;
+ else
+ return mid; // key found
+ }
+ return -(low + 1); // key not found.
+ }
+
+ /**
+ * Resize the array to the specified size if needed.
+ *
+ * @param size The required size.
+ */
+ private void resize(int size) {
+ if (array == null)
+ {
+ array = new int[size];
+ }
+ else if (array.length < size)
+ {
+ // Expand the size of the array in powers of two.
+ int newSize = array.length == 0 ? 1 : array.length;
+ do
+ {
+ newSize *= 2;
+ } while (newSize < size);
+
+ int[] newBytes = new int[newSize];
+ System.arraycopy(array, 0, newBytes, 0, count);
+ array = newBytes;
+ }
+
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public byte[] toDatabase() {
+ if(isDefined) {
+ return encode(null);
+ } else {
+ return JebFormat.entryIDUndefinedSizeToDatabase(Long.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Encode the integer array to a byte array suitable for writing to DB.
+ *
+ * @param bytes The byte array to use in the encoding.
+ * @return A byte array suitable to write to DB.
+ */
+ private byte[] encode(byte[] bytes) {
+ int encodedSize = count * 8;
+ if (bytes == null || bytes.length < encodedSize) {
+ bytes = new byte[encodedSize];
+ }
+
+ for (int pos = 0, i = 0; i < count; i++) {
+ long v = (long)array[i] & 0x00ffffffffL;
+ bytes[pos++] = (byte) ((v >>> 56) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 48) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 40) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 32) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 24) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 16) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 8) & 0xFF);
+ bytes[pos++] = (byte) (v & 0xFF);
+ }
+
+ return bytes;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java
new file mode 100644
index 0000000..a5a8507
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/LongImportIDSet.java
@@ -0,0 +1,405 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2006-2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.backends.jeb.importLDIF;
+
+import com.sleepycat.je.dbi.MemoryBudget;
+import org.opends.server.util.RuntimeInformation;
+import org.opends.server.backends.jeb.EntryID;
+import org.opends.server.backends.jeb.JebFormat;
+
+
+/**
+ * A import ID set backed by an array of longs.
+ */
+public class LongImportIDSet implements ImportIDSet {
+
+
+ //Overhead values gleamed from JHAT.
+ private final static int LONGS_OVERHEAD;
+ private final static int LONGS_OVERHEAD_32 = 25;
+ private final static int LONGS_OVERHEAD_64 = 25;
+
+ /**
+ * The internal array where elements are stored.
+ */
+ private long[] array = null;
+
+
+ /**
+ * The number of valid elements in the array.
+ */
+ private int count = 0;
+
+
+ //Boolean to keep track if the instance is defined or not.
+ boolean isDefined=true;
+
+ static {
+ if(RuntimeInformation.is64Bit()) {
+ LONGS_OVERHEAD = LONGS_OVERHEAD_64;
+ } else {
+ LONGS_OVERHEAD = LONGS_OVERHEAD_32;
+ }
+ }
+
+ /**
+ * Create an empty instance.
+ */
+ public LongImportIDSet() {
+ }
+
+ /**
+ * Create instance and add specified entry ID to the set.
+ *
+ * @param id The entry ID.
+ */
+ public LongImportIDSet(EntryID id) {
+ this.array = new long[1];
+ this.array[0] = id.longValue();
+ count=1;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isDefined() {
+ return isDefined;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getMemorySize() {
+ if(array != null) {
+ return LONGS_OVERHEAD + MemoryBudget.byteArraySize(array.length * 8);
+ } else {
+ return LONGS_OVERHEAD;
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean merge(byte[] DBbytes, ImportIDSet importIdSet, int limit) {
+ boolean incrLimitCount=false;
+ boolean dbUndefined = ((DBbytes[0] & 0x80) == 0x80);
+
+ if(dbUndefined) {
+ isDefined=false;
+ } else if(!importIdSet.isDefined()) {
+ isDefined=false;
+ incrLimitCount=true;
+ } else {
+ array = JebFormat.entryIDListFromDatabase(DBbytes);
+ if(array.length + importIdSet.size() > limit) {
+ isDefined=false;
+ incrLimitCount=true;
+ } else {
+ count = array.length;
+ addAll((LongImportIDSet) importIdSet);
+ }
+ }
+ return incrLimitCount;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addEntryID(EntryID entryID, int limit) {
+ if(!isDefined()) {
+ return;
+ }
+ if(isDefined() && ((count + 1) > limit)) {
+ isDefined = false;
+ array = null;
+ count = 0;
+ } else {
+ add(entryID.longValue());
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public byte[] toDatabase() {
+ if (isDefined()) return encode(null);
+ else return JebFormat.entryIDUndefinedSizeToDatabase(Long.MAX_VALUE);
+ }
+
+ /**
+ * Decodes a set from a byte array.
+ * @param bytes The encoded value.
+ */
+ void decode(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ count = 0;
+ return;
+ }
+
+ int count = bytes.length / 8;
+ resize(count);
+
+ for (int pos = 0, i = 0; i < count; i++)
+ {
+ long v = 0;
+ v |= (bytes[pos++] & 0xFFL) << 56;
+ v |= (bytes[pos++] & 0xFFL) << 48;
+ v |= (bytes[pos++] & 0xFFL) << 40;
+ v |= (bytes[pos++] & 0xFFL) << 32;
+ v |= (bytes[pos++] & 0xFFL) << 24;
+ v |= (bytes[pos++] & 0xFFL) << 16;
+ v |= (bytes[pos++] & 0xFFL) << 8;
+ v |= (bytes[pos++] & 0xFFL);
+ array[i] = v;
+ }
+ this.count = count;
+ }
+
+
+ /**
+ * Encode this value into a byte array.
+ * @param bytes The array into which the value will be encoded. If the
+ * provided array is null, or is not big enough, a new array will be
+ * allocated.
+ * @return The encoded array. If the provided array was bigger than needed
+ * to encode the value then the provided array is returned and the number
+ * of bytes of useful data is given by the encodedSize method.
+ */
+ byte[] encode(byte[] bytes) {
+ int encodedSize = count * 8;
+ if (bytes == null || bytes.length < encodedSize)
+ {
+ bytes = new byte[encodedSize];
+ }
+
+ for (int pos = 0, i = 0; i < count; i++)
+ {
+ long v = array[i];
+ bytes[pos++] = (byte) ((v >>> 56) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 48) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 40) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 32) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 24) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 16) & 0xFF);
+ bytes[pos++] = (byte) ((v >>> 8) & 0xFF);
+ bytes[pos++] = (byte) (v & 0xFF);
+ }
+
+ return bytes;
+ }
+
+
+
+ /**
+ * This is very much like Arrays.binarySearch except that it searches only
+ * an initial portion of the provided array.
+ * @param a The array to be searched.
+ * @param count The number of initial elements in the array to be searched.
+ * @param key The element to search for.
+ * @return See Arrays.binarySearch.
+ */
+ private static int binarySearch(long[] a, int count, long key) {
+ int low = 0;
+ int high = count-1;
+
+ while (low <= high)
+ {
+ int mid = (low + high) >> 1;
+ long midVal = a[mid];
+
+ if (midVal < key)
+ low = mid + 1;
+ else if (midVal > key)
+ high = mid - 1;
+ else
+ return mid; // key found
+ }
+ return -(low + 1); // key not found.
+ }
+
+
+
+ /**
+ * Add a new value to the set.
+ * @param v The value to be added.
+ * @return true if the value was added, false if it was already present
+ * in the set.
+ */
+ private boolean add(long v) {
+ resize(count+1);
+
+ if (count == 0 || v > array[count-1])
+ {
+ array[count++] = v;
+ return true;
+ }
+
+ int pos = binarySearch(array, count, v);
+ if (pos >=0)
+ {
+ return false;
+ }
+
+ // For a negative return value r, the index -(r+1) gives the array
+ // index at which the specified value can be inserted to maintain
+ // the sorted order of the array.
+ pos = -(pos+1);
+
+ System.arraycopy(array, pos, array, pos+1, count-pos);
+ array[pos] = v;
+ count++;
+ return true;
+ }
+
+ /**
+ * Adds all the elements of a provided set to this set if they are not
+ * already present.
+ * @param that The set of elements to be added.
+ */
+ private void addAll(LongImportIDSet that) {
+ resize(this.count+that.count);
+
+ if (that.count == 0)
+ {
+ return;
+ }
+
+ // Optimize for the case where the two sets are sure to have no overlap.
+ if (this.count == 0 || that.array[0] > this.array[this.count-1])
+ {
+ System.arraycopy(that.array, 0, this.array, this.count, that.count);
+ count += that.count;
+ return;
+ }
+
+ if (this.array[0] > that.array[that.count-1])
+ {
+ System.arraycopy(this.array, 0, this.array, that.count, this.count);
+ System.arraycopy(that.array, 0, this.array, 0, that.count);
+ count += that.count;
+ return;
+ }
+
+ int destPos = binarySearch(this.array, this.count, that.array[0]);
+ if (destPos < 0)
+ {
+ destPos = -(destPos+1);
+ }
+
+ // Make space for the copy.
+ int aCount = this.count - destPos;
+ int aPos = destPos + that.count;
+ int aEnd = aPos + aCount;
+ System.arraycopy(this.array, destPos, this.array, aPos, aCount);
+
+ // Optimize for the case where there is no overlap.
+ if (this.array[aPos] > that.array[that.count-1])
+ {
+ System.arraycopy(that.array, 0, this.array, destPos, that.count);
+ count += that.count;
+ return;
+ }
+
+ int bPos;
+ for ( bPos = 0; aPos < aEnd && bPos < that.count; )
+ {
+ if ( this.array[aPos] < that.array[bPos] )
+ {
+ this.array[destPos++] = this.array[aPos++];
+ }
+ else if ( this.array[aPos] > that.array[bPos] )
+ {
+ this.array[destPos++] = that.array[bPos++];
+ }
+ else
+ {
+ this.array[destPos++] = this.array[aPos++];
+ bPos++;
+ }
+ }
+
+ // Copy any remainder.
+ int aRemain = aEnd - aPos;
+ if (aRemain > 0)
+ {
+ System.arraycopy(this.array, aPos, this.array, destPos, aRemain);
+ destPos += aRemain;
+ }
+
+ int bRemain = that.count - bPos;
+ if (bRemain > 0)
+ {
+ System.arraycopy(that.array, bPos, this.array, destPos, bRemain);
+ destPos += bRemain;
+ }
+
+ count = destPos;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public int size() {
+ return count;
+ }
+
+
+ /**
+ * Ensures capacity of the internal array for a given number of elements.
+ * @param size The internal array will be guaranteed to be at least this
+ * size.
+ */
+ private void resize(int size) {
+ if (array == null)
+ {
+ array = new long[size];
+ }
+ else if (array.length < size)
+ {
+ // Expand the size of the array in powers of two.
+ int newSize = array.length == 0 ? 1 : array.length;
+ do
+ {
+ newSize *= 2;
+ } while (newSize < size);
+
+ long[] newBytes = new long[newSize];
+ System.arraycopy(array, 0, newBytes, 0, count);
+ array = newBytes;
+ }
+ }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java
new file mode 100644
index 0000000..bd03f81
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkElement.java
@@ -0,0 +1,104 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.backends.jeb.importLDIF;
+
+import org.opends.server.types.Entry;
+
+/**
+ * A work element passed on the work queue.
+ */
+public class WorkElement {
+
+ //The entry to import.
+ private Entry entry;
+
+ //Used in replace mode, this is the entry to replace.
+ private Entry existingEntry;
+
+ //The context related to the entry.
+ private DNContext context;
+
+ /**
+ * Create a work element instance.
+ *
+ * @param entry The entry to import.
+ * @param context The context related to the entry.
+ */
+ private WorkElement(Entry entry, DNContext context ) {
+ this.entry = entry;
+ this.context = context;
+ }
+
+ /**
+ * Static to create an work element.
+ *
+ * @param entry The entry to import.
+ * @param context The context related to the entry.
+ * @return A work element to put on the queue.
+ */
+ public static
+ WorkElement decode(Entry entry, DNContext context ) {
+ return new WorkElement(entry, context);
+ }
+
+ /**
+ * Return the entry to import.
+ *
+ * @return The entry to import.
+ */
+ public Entry getEntry() {
+ return entry;
+ }
+
+ /**
+ * Return the context related to the entry.
+ *
+ * @return The context.
+ */
+ public DNContext getContext() {
+ return context;
+ }
+
+ /**
+ * Return an existing entry, used during replace mode.
+ *
+ * @return An existing entry.
+ */
+ public Entry getExistingEntry() {
+ return existingEntry;
+ }
+
+ /**
+ * Set the existing entry.
+ *
+ * @param existingEntry The existing entry to set.
+ */
+ public void setExistingEntry(Entry existingEntry) {
+ this.existingEntry = existingEntry;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
new file mode 100644
index 0000000..f8c11c2
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/jeb/importLDIF/WorkThread.java
@@ -0,0 +1,457 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2008 Sun Microsystems, Inc.
+ */
+
+package org.opends.server.backends.jeb.importLDIF;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.types.*;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.backends.jeb.*;
+import org.opends.messages.Message;
+import static org.opends.messages.JebMessages.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.*;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Transaction;
+
+/**
+ * A thread to process import entries from a queue. Multiple instances of
+ * this class process entries from a single shared queue.
+ */
+public class WorkThread extends DirectoryThread {
+
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ /*
+ * Work queue of work items.
+ */
+ private BlockingQueue<WorkElement> workQueue;
+
+
+ /**
+ * The number of entries imported by this thread.
+ */
+ private int importedCount = 0;
+
+ //Root container.
+ private RootContainer rootContainer;
+
+ /**
+ * A flag that is set when the thread has been told to stop processing.
+ */
+ private boolean stopRequested = false;
+
+ //The thread number related to a thread.
+ private int threadNumber;
+
+ //The substring buffer manager to use.
+ private BufferManager bufferMgr;
+
+ //Flag set when substring buffer should be flushed.
+ private boolean flushBuffer = true;
+
+ /**
+ * Create a work thread instance using the specified parameters.
+ *
+ * @param workQueue The work queue to pull work off of.
+ * @param threadNumber The thread number.
+ * @param bufferMgr The buffer manager to use.
+ * @param rootContainer The root container.
+ */
+ public WorkThread(BlockingQueue<WorkElement> workQueue, int threadNumber,
+ BufferManager bufferMgr,
+ RootContainer rootContainer) {
+ super("Import Worker Thread " + threadNumber);
+ this.threadNumber = threadNumber;
+ this.workQueue = workQueue;
+ this.bufferMgr = bufferMgr;
+ this.rootContainer = rootContainer;
+ }
+
+ /**
+ * Get the number of entries imported by this thread.
+ * @return The number of entries imported by this thread.
+ */
+ int getImportedCount() {
+ return importedCount;
+ }
+
+ /**
+ * Tells the thread to stop processing.
+ */
+ void stopProcessing() {
+ stopRequested = true;
+ }
+
+
+ /**
+ * Tells thread to flush substring buffer.
+ *
+ * @param flush Set to false if substring flush should be skipped.
+ */
+ void setFlush(boolean flush) {
+ this.flushBuffer = flush;
+ }
+ /**
+ * Run the thread. Read from item from queue and give it to the
+ * buffer manage, unless told to stop. Once stopped, ask buffer manager
+ * to flush and exit.
+ *
+ */
+ public void run()
+ {
+ try {
+ do {
+ try {
+ WorkElement element = workQueue.poll(1000, TimeUnit.MILLISECONDS);
+ if(element != null) {
+ process(element);
+ }
+ }
+ catch (InterruptedException e) {
+ if (debugEnabled()) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ }
+ } while (!stopRequested);
+ if(flushBuffer) {
+ bufferMgr.flushAll(threadNumber);
+ }
+ } catch (Exception e) {
+ if (debugEnabled()) {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Process a work element.
+ *
+ * @param element The work elemenet to process.
+ *
+ * @throws DatabaseException If a database error occurs.
+ * @throws DirectoryException If a directory error occurs.
+ * @throws JebException If a JEB error occurs.
+ */
+ private void process(WorkElement element)
+ throws DatabaseException, DirectoryException, JebException {
+ Transaction txn = null;
+ EntryID entryID;
+ if((entryID = processDN2ID(element, txn)) == null)
+ return;
+ if(!processParent(element, entryID, txn))
+ return;
+ if(!processID2Entry(element, entryID, txn))
+ return;
+ procesID2SCEntry(element, entryID, txn);
+ processIndexesEntry(element, entryID, txn);
+ }
+
+ /**
+ * Delete all indexes related to the specified entry ID using the specified
+ * entry to generate the keys.
+ *
+ * @param element The work element.
+ * @param existingEntry The existing entry to replace.
+ * @param entryID The entry ID to remove from the keys.
+ * @param txn A transaction.
+ * @throws DatabaseException If a database error occurs.
+ */
+ private void
+ processIndexesEntryDelete(WorkElement element, Entry existingEntry,
+ EntryID entryID, Transaction txn)
+ throws DatabaseException {
+ DNContext context = element.getContext();
+ Map<AttributeType, AttributeIndex> attrIndexMap =
+ context.getAttrIndexMap();
+ for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
+ attrIndexMap.entrySet()) {
+ AttributeType attrType = mapEntry.getKey();
+ if(existingEntry.hasAttribute(attrType)) {
+ AttributeIndex attributeIndex = mapEntry.getValue();
+ Index index;
+ if((index=attributeIndex.getEqualityIndex()) != null) {
+ delete(index, existingEntry, entryID, txn);
+ }
+ if((index=attributeIndex.getPresenceIndex()) != null) {
+ delete(index, existingEntry, entryID, txn);
+ }
+ if((index=attributeIndex.getSubstringIndex()) != null) {
+ delete(index, existingEntry, entryID, txn);
+ }
+ if((index=attributeIndex.getOrderingIndex()) != null) {
+ delete(index, existingEntry, entryID, txn);
+ }
+ if((index=attributeIndex.getApproximateIndex()) != null) {
+ delete(index, existingEntry, entryID, txn);
+ }
+ }
+ }
+ }
+
+ /**
+ * Process all indexes using the specified entry ID.
+ *
+ * @param element The work element.
+ * @param entryID The entry ID to process.
+ * @param txn A transaction.
+ * @throws DatabaseException If an database error occurs.
+ */
+ private void
+ processIndexesEntry(WorkElement element, EntryID entryID, Transaction txn)
+ throws DatabaseException {
+ Entry entry = element.getEntry();
+ DNContext context = element.getContext();
+ LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
+ if (ldifImportConfig.appendToExistingData() &&
+ ldifImportConfig.replaceExistingEntries()) {
+ Entry existingEntry = element.getExistingEntry();
+ processIndexesEntryDelete(element, existingEntry, entryID, txn);
+ }
+ Map<AttributeType, AttributeIndex> attrIndexMap =
+ context.getAttrIndexMap();
+ for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
+ attrIndexMap.entrySet()) {
+ AttributeType attrType = mapEntry.getKey();
+ if(entry.hasAttribute(attrType)) {
+ AttributeIndex attributeIndex = mapEntry.getValue();
+ Index index;
+ if((index=attributeIndex.getEqualityIndex()) != null) {
+ insert(index, entry, entryID, txn);
+ }
+ if((index=attributeIndex.getPresenceIndex()) != null) {
+ insert(index, entry, entryID, txn);
+ }
+ if((index=attributeIndex.getSubstringIndex()) != null) {
+ bufferMgr.insert(index,entry, entryID, txn);
+ }
+ if((index=attributeIndex.getOrderingIndex()) != null) {
+ insert(index, entry, entryID, txn);
+ }
+ if((index=attributeIndex.getApproximateIndex()) != null) {
+ insert(index, entry, entryID, txn);
+ }
+ }
+ }
+ }
+
+ /**
+ * Process id2children/id2subtree indexes for the specified entry ID.
+ *
+ * @param element The work element.
+ * @param entryID The entry ID to process.
+ * @param txn A transaction.
+ * @throws DatabaseException If an database error occurs.
+ */
+ private void
+ procesID2SCEntry(WorkElement element, EntryID entryID,
+ Transaction txn) throws DatabaseException {
+ Entry entry = element.getEntry();
+ DNContext context = element.getContext();
+ LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
+ if (ldifImportConfig.appendToExistingData() &&
+ ldifImportConfig.replaceExistingEntries()) {
+ return;
+ }
+ Index id2children = context.getEntryContainer().getID2Children();
+ Index id2subtree = context.getEntryContainer().getID2Subtree();
+ insert(id2children, entry, entryID, txn);
+ insert(id2subtree, entry, entryID, txn);
+ }
+
+ /**
+ * Insert specified entry ID into the specified index using the entry to
+ * generate the keys.
+ *
+ * @param index The index to insert into.
+ * @param entry The entry to generate the keys from.
+ * @param entryID The entry ID to insert.
+ * @param txn A transaction.
+ * @return <CODE>True</CODE> if insert succeeded.
+ * @throws DatabaseException If a database error occurs.
+ */
+ private boolean
+ insert(Index index, Entry entry, EntryID entryID,
+ Transaction txn) throws DatabaseException {
+ Set<byte[]> keySet = new HashSet<byte[]>();
+ index.indexer.indexEntry(txn, entry, keySet);
+ return index.insert(txn, keySet, entryID);
+ }
+
+ /**
+ * Delete specified entry ID into the specified index using the entry to
+ * generate the keys.
+ *
+ * @param index The index to insert into.
+ * @param entry The entry to generate the keys from.
+ * @param entryID The entry ID to insert.
+ * @param txn A transaction.
+ * @throws DatabaseException If a database error occurs.
+ */
+ private void
+ delete(Index index, Entry entry, EntryID entryID,
+ Transaction txn) throws DatabaseException {
+ Set<byte[]> keySet = new HashSet<byte[]>();
+ index.indexer.indexEntry(txn, entry, keySet);
+ index.delete(txn, keySet, entryID);
+ }
+
+ /**
+ * Insert entry from work element into id2entry DB.
+ *
+ * @param element The work element containing the entry.
+ * @param entryID The entry ID to use as the key.
+ * @param txn A transaction.
+ * @return <CODE>True</CODE> If the insert succeeded.
+ * @throws DatabaseException If a database error occurs.
+ * @throws DirectoryException If a directory error occurs.
+ */
+ private boolean
+ processID2Entry(WorkElement element, EntryID entryID, Transaction txn)
+ throws DatabaseException, DirectoryException {
+ boolean ret;
+ Entry entry = element.getEntry();
+ DNContext context = element.getContext();
+ ID2Entry id2entry = context.getEntryContainer().getID2Entry();
+ DN2URI dn2uri = context.getEntryContainer().getDN2URI();
+ ret=id2entry.put(txn, entryID, entry);
+ if(ret) {
+ importedCount++;
+ LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
+ if (ldifImportConfig.appendToExistingData() &&
+ ldifImportConfig.replaceExistingEntries()) {
+ Entry existingEntry = element.getExistingEntry();
+ dn2uri.replaceEntry(txn, existingEntry, entry);
+ } else {
+ ret= dn2uri.addEntry(txn, entry);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Process entry from work element checking if it's parent exists.
+ *
+ * @param element The work element containing the entry.
+ * @param entryID The entry ID to use as the key.
+ * @param txn A transaction.
+ * @return <CODE>True</CODE> If the insert succeeded.
+ * @throws DatabaseException If a database error occurs.
+ */
+ private boolean
+ processParent(WorkElement element, EntryID entryID, Transaction txn)
+ throws DatabaseException {
+ Entry entry = element.getEntry();
+ DNContext context = element.getContext();
+ LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
+ if (ldifImportConfig.appendToExistingData() &&
+ ldifImportConfig.replaceExistingEntries()) {
+ return true;
+ }
+ EntryID parentID = null;
+ DN entryDN = entry.getDN();
+ DN parentDN = context.getEntryContainer().getParentWithinBase(entryDN);
+ DN2ID dn2id = context.getEntryContainer().getDN2ID();
+ if (parentDN != null) {
+ parentID = context.getParentID(parentDN, dn2id, txn);
+ if (parentID == null) {
+ dn2id.remove(txn, entryDN);
+ Message msg =
+ ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
+ context.getLDIFReader().rejectLastEntry(msg);
+ return false;
+ }
+ }
+ ArrayList<EntryID> IDs;
+ if (parentDN != null && context.getParentDN() != null &&
+ parentDN.equals(context.getParentDN())) {
+ IDs = new ArrayList<EntryID>(context.getIDs());
+ IDs.set(0, entryID);
+ }
+ else {
+ IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
+ IDs.add(entryID);
+ if (parentID != null)
+ {
+ IDs.add(parentID);
+ EntryContainer ec = context.getEntryContainer();
+ for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
+ dn = ec.getParentWithinBase(dn)) {
+ EntryID nodeID = dn2id.get(txn, dn);
+ IDs.add(nodeID);
+ }
+ }
+ }
+ context.setParentDN(parentDN);
+ context.setIDs(IDs);
+ entry.setAttachment(IDs);
+ return true;
+ }
+
+ /**
+ * Process the a entry from the work element into the dn2id DB.
+ *
+ * @param element The work element containing the entry.
+ * @param txn A transaction.
+ * @return An entry ID.
+ * @throws DatabaseException If a database error occurs.
+ * @throws JebException If a JEB error occurs.
+ */
+ private EntryID
+ processDN2ID(WorkElement element, Transaction txn)
+ throws DatabaseException, JebException {
+ Entry entry = element.getEntry();
+ DNContext context = element.getContext();
+ DN2ID dn2id = context.getEntryContainer().getDN2ID();
+ LDIFImportConfig ldifImportConfig = context.getLDIFImportConfig();
+ DN entryDN = entry.getDN();
+ EntryID entryID = dn2id.get(txn, entryDN);
+ if (entryID != null) {
+ if (ldifImportConfig.appendToExistingData() &&
+ ldifImportConfig.replaceExistingEntries()) {
+ ID2Entry id2entry = context.getEntryContainer().getID2Entry();
+ Entry existingEntry = id2entry.get(txn, entryID);
+ element.setExistingEntry(existingEntry);
+ } else {
+ Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+ context.getLDIFReader().rejectLastEntry(msg);
+ entryID = null;
+ }
+ } else {
+ entryID = rootContainer.getNextEntryID();
+ dn2id.insert(txn, entryDN, entryID);
+ }
+ context.removePending(entryDN);
+ return entryID;
+ }
+}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/resource/config-changes.ldif b/opendj-sdk/opends/tests/unit-tests-testng/resource/config-changes.ldif
index 3fe188f..011bb87 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/resource/config-changes.ldif
+++ b/opendj-sdk/opends/tests/unit-tests-testng/resource/config-changes.ldif
@@ -422,10 +422,7 @@
ds-cfg-index-entry-limit: 1
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-temp-directory: import-tmp
-ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
-ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -659,10 +656,7 @@
ds-cfg-index-entry-limit: 10
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-temp-directory: importTmp
-ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
-ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -836,10 +830,7 @@
ds-cfg-index-entry-limit: 10
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-temp-directory: importTmp
-ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
-ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -1008,10 +999,7 @@
ds-cfg-index-entry-limit: 10
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-temp-directory: importTmp
-ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
-ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
@@ -1214,10 +1202,7 @@
ds-cfg-index-entry-limit: 13
ds-cfg-subtree-delete-size-limit: 100000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-temp-directory: importTmp
-ds-cfg-import-buffer-size: 256 megabytes
ds-cfg-import-queue-size: 100
-ds-cfg-import-pass-size: 0
ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-deadlock-retry-limit: 10
--
Gitblit v1.10.0