From 0d9383e9bdcfc20e808968f4b7fe6c1ac0f48fa6 Mon Sep 17 00:00:00 2001
From: dugan <dugan@localhost>
Date: Mon, 17 Aug 2009 00:23:12 +0000
Subject: [PATCH] These changes allow import-ldif to support multiple suffixes and fix some problems with the include/exclude options.
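
The JEB LDIF importer can now load multiple suffixes in a single pass,
migrating existing and excluded branches when the include/exclude
branch options are used. The import-thread-count and import-queue-size
backend options are deprecated in favor of a new threadCount option on
import-ldif, which defaults to twice the number of available
processors. DN validation can be deferred to the later part of the
import, and ImportIDSet now carries its index entry limit and
maintain-count settings instead of receiving them on every call.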
---
opends/resource/schema/02-config.ldif | 7
opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java | 127 +
opends/src/server/org/opends/server/backends/jeb/SubstringIndexer.java | 12
opends/src/messages/messages/tools.properties | 10
opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java | 635 +++++++++--
opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java | 1904 +++++++++++++++++++++++++---------
opends/src/server/org/opends/server/types/LDIFImportConfig.java | 34
opends/resource/config/config.ldif | 1
opends/src/server/org/opends/server/backends/jeb/Index.java | 56
opends/src/server/org/opends/server/protocols/asn1/ASN1OutputStreamWriter.java | 3
opends/src/server/org/opends/server/tools/ImportLDIF.java | 74
opends/tests/unit-tests-testng/resource/config-changes.ldif | 5
opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml | 40
opends/src/server/org/opends/server/backends/jeb/BackendImpl.java | 50
opends/src/server/org/opends/server/util/LDIFReader.java | 33
opends/tests/unit-tests-testng/src/server/org/opends/server/backends/jeb/TestImportJob.java | 22
opends/src/messages/messages/jeb.properties | 5
opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java | 167 +-
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 2
opends/src/server/org/opends/server/tasks/ImportTask.java | 16
opends/src/server/org/opends/server/config/ConfigConstants.java | 12
21 files changed, 2292 insertions(+), 923 deletions(-)
diff --git a/opends/resource/config/config.ldif b/opends/resource/config/config.ldif
index 20dafcd..dddfa54 100644
--- a/opends/resource/config/config.ldif
+++ b/opends/resource/config/config.ldif
@@ -182,7 +182,6 @@
ds-cfg-db-directory-permissions: 700
ds-cfg-index-entry-limit: 4000
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-compact-encoding: true
ds-cfg-db-cache-percent: 10
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 979bd43..cbbb1cf 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -910,6 +910,11 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.222
+ NAME 'ds-cfg-import-queue-size'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.184
NAME 'ds-task-restart-server'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
@@ -2470,6 +2475,7 @@
MAY ( ds-cfg-index-entry-limit $
ds-cfg-preload-time-limit $
ds-cfg-import-thread-count $
+ ds-cfg-import-queue-size $
ds-cfg-entries-compressed $
ds-cfg-db-directory-permissions $
ds-cfg-db-cache-percent $
@@ -4042,6 +4048,7 @@
ds-cfg-ndb-num-connections $
ds-cfg-deadlock-retry-limit $
ds-cfg-import-thread-count $
+ ds-cfg-import-queue-size $
ds-cfg-index-entry-limit )
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.197
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
index 139295a..2ef8b8e 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/LocalDBBackendConfiguration.xml
@@ -222,15 +222,45 @@
</ldap:attribute>
</adm:profile>
</adm:property>
+ <adm:property name="import-queue-size" advanced="true">
+ <adm:synopsis>
+ This parameter has been deprecated in OpenDS 2.1 and will be removed
+ in OpenDS 3.0. It is only being kept for ease of migration and is ignored
+ in OpenDS versions after 2.0.
+ </adm:synopsis>
+ <adm:requires-admin-action>
+ <adm:none>
+ <adm:synopsis>
+ This parameter has been deprecated in OpenDS 2.1 and will be removed
+ in OpenDS 3.0. It is only being kept for ease of migration and is ignored
+ in OpenDS versions after 2.0.
+ </adm:synopsis>
+ </adm:none>
+ </adm:requires-admin-action>
+ <adm:default-behavior>
+ <adm:defined>
+ <adm:value>100</adm:value>
+ </adm:defined>
+ </adm:default-behavior>
+ <adm:syntax>
+ <adm:integer lower-limit="1" upper-limit="2147483647" />
+ </adm:syntax>
+ <adm:profile name="ldap">
+ <ldap:attribute>
+ <ldap:name>ds-cfg-import-queue-size</ldap:name>
+ </ldap:attribute>
+ </adm:profile>
+ </adm:property>
<adm:property name="import-thread-count" advanced="true">
<adm:synopsis>
- Specifies the number of threads that is used for concurrent
- processing during an LDIF import.
+ This parameter has been deprecated in OpenDS 2.1 and will be removed
+ in OpenDS 3.0. It is only being kept for ease of migration and is ignored
+ in OpenDS versions after 2.0.
</adm:synopsis>
<adm:description>
- This should generally be a small multiple (for example, 2x) of the number
- of CPUs in the system for a traditional system, or equal to the
- number of CPU strands for a CMT system.
+ This parameter has been deprecated in OpenDS 2.1 and will be removed
+ in OpenDS 3.0. It is only being kept for ease of migration and is ignored
+ in OpenDS versions after 2.0.
</adm:description>
<adm:requires-admin-action>
<adm:none>
diff --git a/opends/src/messages/messages/jeb.properties b/opends/src/messages/messages/jeb.properties
index 905e304..8b8846d 100644
--- a/opends/src/messages/messages/jeb.properties
+++ b/opends/src/messages/messages/jeb.properties
@@ -178,10 +178,8 @@
NOTICE_JEB_EXPORT_PROGRESS_REPORT_88=Exported %d records and skipped %d (recent \
rate %.1f/sec)
NOTICE_JEB_IMPORT_THREAD_COUNT_89=Import Thread Count: %d threads
-
SEVERE_ERR_IMPORT_LDIF_LACK_MEM_90=Insufficient free memory to perform import. \
At least %dMB of free memory is required
-
INFO_JEB_IMPORT_LDIF_PROCESSING_TIME_91=LDIF processing took %d seconds
INFO_JEB_IMPORT_INDEX_PROCESSING_TIME_92=Index processing took %d seconds
NOTICE_JEB_IMPORT_CLOSING_DATABASE_93=Flushing data to disk
@@ -371,3 +369,6 @@
buffer count %d and read ahead cache size %d
NOTICE_JEB_IMPORT_LDIF_BUFF_SIZE_LESS_DEFAULT_197=Setting phase one buffer \
size to minimal %d bytes and proceeding
+SEVERE_ERR_DATABASE_ERROR_198=Database error during backend operation: %s
+SEVERE_ERR_IMPORT_LDIF_INVALID_THREAD_COUNT_199=Invalid thread count value %d, \
+value must be greater than or equal to 1
diff --git a/opends/src/messages/messages/tools.properties b/opends/src/messages/messages/tools.properties
index d6e10aa..9b254cf 100644
--- a/opends/src/messages/messages/tools.properties
+++ b/opends/src/messages/messages/tools.properties
@@ -2508,6 +2508,10 @@
INFO_LDIFIMPORT_DESCRIPTION_TEMP_DIRECTORY_1683=Path to temporary directory \
for index scratch files during LDIF import
INFO_LDIFIMPORT_TEMP_DIR_PLACEHOLDER_1684={directory}
-INFO_LDIFIMPORT_DESCRIPTION_DN_CHECK_PHASE_2_1685=Perform DN validation \
- during second phase of LDIF import
-
+INFO_LDIFIMPORT_DESCRIPTION_DN_VALIDATION_1685=Perform DN validation \
+ during the later part of the LDIF import
+INFO_LDIFIMPORT_DESCRIPTION_THREAD_COUNT_1686=Number of threads used to \
+ read LDIF file during import
+INFO_LDIFIMPORT_THREAD_COUNT_PLACEHOLDER_1687={count}
+SEVERE_ERR_LDIFIMPORT_CANNOT_PARSE_THREAD_COUNT_1688=The value %s for \
+threadCount cannot be parsed: %s
diff --git a/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java b/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
index 7f8e84c..5fc9578 100644
--- a/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
+++ b/opends/src/server/org/opends/server/backends/jeb/BackendImpl.java
@@ -68,13 +68,10 @@
import static org.opends.server.util.ServerConstants.*;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
-import org.opends.server.admin.std.server.LocalDBIndexCfg;
import org.opends.server.admin.Configuration;
import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.api.ExtensibleIndexer;
import org.opends.server.types.DN;
-import org.opends.server.api.ExtensibleMatchingRule;
/**
* This is an implementation of a Directory Server Backend which stores entries
* locally in a Berkeley DB JE database.
@@ -1133,46 +1130,9 @@
envConfig.setTxnNoSync(false);
envConfig.setConfigParam("je.env.isLocking", "true");
envConfig.setConfigParam("je.env.runCheckpointer", "false");
- //Loop through local indexes and see if any are substring.
- boolean hasSubIndex = false;
- int indexCount = cfg.listLocalDBIndexes().length;
-subIndex:
- for (String idx : cfg.listLocalDBIndexes()) {
- final LocalDBIndexCfg indexCfg = cfg.getLocalDBIndex(idx);
- Set<org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType>
- indexType = indexCfg.getIndexType();
- if(indexType.contains(org.opends.server.admin.std.
- meta.LocalDBIndexCfgDefn.IndexType.SUBSTRING)) {
- hasSubIndex = true;
- break;
- }
- Set<String> matchingRules =
- indexCfg.getIndexExtensibleMatchingRule();
- for(String ruleName: matchingRules)
- {
- ExtensibleMatchingRule rule =
- DirectoryServer.getExtensibleMatchingRule(ruleName);
- if(rule == null)
- {
- continue;
- }
- for(ExtensibleIndexer indexer: rule.getIndexers(null))
- {
- String indexID = indexer.getExtensibleIndexID();
- if(indexID.equals(EXTENSIBLE_INDEXER_ID_SUBSTRING))
- {
- //The ExtensibelMatchingRule is of substring type.
- hasSubIndex = true;
- break subIndex;
- }
- }
- }
- }
-
Importer importer = new Importer(importConfig, cfg);
importer.init(envConfig);
rootContainer = initializeRootContainer(envConfig);
-
return importer.processImport(rootContainer);
}
catch (ExecutionException execEx)
@@ -1214,6 +1174,16 @@
throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
je.getMessageObject());
}
+ catch (DatabaseException ex)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, ex);
+ }
+ Message message = ERR_DATABASE_ERROR.get(ex.getMessage());
+ throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
+ message);
+ }
catch (InitializationException ie)
{
if (debugEnabled())
diff --git a/opends/src/server/org/opends/server/backends/jeb/Index.java b/opends/src/server/org/opends/server/backends/jeb/Index.java
index 98c93a6..3e39920 100644
--- a/opends/src/server/org/opends/server/backends/jeb/Index.java
+++ b/opends/src/server/org/opends/server/backends/jeb/Index.java
@@ -310,13 +310,39 @@
private void
+ deleteKey(DatabaseEntry key, ImportIDSet importIdSet,
+ DatabaseEntry data) throws DatabaseException {
+
+
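+ //Each entry ID is an 8-byte long, so the stored byte length / 8 sizes
+ //the working set. Remove the given IDs from the stored set, then
+ //either delete the key outright or write the remainder back.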
+ OperationStatus status = read(null, key, data, LockMode.RMW);
+ if(status == OperationStatus.SUCCESS) {
+ ImportIDSet newImportIDSet = new ImportIDSet(data.getData().length/8,
+ indexEntryLimit, maintainCount);
+ newImportIDSet.remove(data.getData(), importIdSet);
+ if(newImportIDSet.isDefined() && (newImportIDSet.size() == 0))
+ {
+ delete(null, key);
+ }
+ else
+ {
+ data.setData(newImportIDSet.toDatabase());
+ put(null, key, data);
+ }
+ } else {
+ //Should never happen -- the keys should always be there.
+ throw new DatabaseException();
+ }
+ }
+
+
+ private void
insertKey(DatabaseEntry key, ImportIDSet importIdSet,
DatabaseEntry data) throws DatabaseException {
OperationStatus status = read(null, key, data, LockMode.RMW);
if(status == OperationStatus.SUCCESS) {
- ImportIDSet newImportIDSet = new ImportIDSet();
- if (newImportIDSet.merge(data.getData(), importIdSet, indexEntryLimit,
- maintainCount))
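+ //Size the merged set from the stored byte count (8 bytes per ID).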
+ ImportIDSet newImportIDSet = new ImportIDSet(data.getData().length/8,
+ indexEntryLimit, maintainCount);
+ if (newImportIDSet.merge(data.getData(), importIdSet))
{
entryLimitExceededCount++;
}
@@ -357,6 +383,28 @@
/**
+ * Delete the specified import ID set from the import ID set associated with
+ * the key.
+ *
+ * @param key The key to delete the set from.
+ * @param importIdSet The import ID set to delete.
+ * @param data A database entry to use for data.
+ *
+ * @throws DatabaseException If a database error occurs.
+ */
+ public void
+ delete(DatabaseEntry key, ImportIDSet importIdSet,
+ DatabaseEntry data) throws DatabaseException {
+ Cursor cursor = curLocal.get();
+ if(cursor == null) {
+ cursor = openCursor(null, null);
+ curLocal.set(cursor);
+ }
+ deleteKey(key, importIdSet, data);
+ }
+
+
+ /**
* Add the specified import ID set to the provided keys in the keyset.
*
* @param importIDSet A import ID set to use.
@@ -766,7 +814,7 @@
* @param entryID The entry ID to delete.
* @throws DatabaseException If a database error occurs.
*/
- public synchronized
+ public
void delete(Transaction txn, Set<byte[]> keySet, EntryID entryID)
throws DatabaseException {
setTrusted(txn, false);
diff --git a/opends/src/server/org/opends/server/backends/jeb/SubstringIndexer.java b/opends/src/server/org/opends/server/backends/jeb/SubstringIndexer.java
index d587a05..b032de3 100644
--- a/opends/src/server/org/opends/server/backends/jeb/SubstringIndexer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/SubstringIndexer.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2008 Sun Microsystems, Inc.
+ * Copyright 2006-2009 Sun Microsystems, Inc.
*/
package org.opends.server.backends.jeb;
@@ -298,4 +298,14 @@
}
}
}
+
+ /**
+ * Return the substring length for an indexer.
+ *
+ * @return The substring length configured for a substring indexer.
+ */
+ public int getSubStringLen()
+ {
+ return substrLength;
+ }
}
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
index 909b3b4..9c3607a 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/ImportIDSet.java
@@ -56,35 +56,25 @@
//Key related to an ID set.
private byte[] key;
+ private int limit;
+ private boolean doCount;
/**
- * Create an empty import set.
- */
- public ImportIDSet() { }
-
-
- /**
- * Create an import ID set of the specified size plus an extra 128 slots.
+ * Create an import ID set of the specified size, with the given index
+ * entry limit and maintain-count setting, plus an extra 128 slots.
*
* @param size The size of the underlying array, plus some extra space.
+ * @param limit The index entry limit.
+ * @param doCount The index maintain count boolean.
*/
- public ImportIDSet(int size)
+ public ImportIDSet(int size, int limit, boolean doCount)
{
this.array = new long[size + 128];
+ this.limit = limit;
+ this.doCount = doCount;
}
- /**
- * Create an import set and add the specified entry ID to it.
- *
- * @param id The entry ID.
- */
- public ImportIDSet(EntryID id)
- {
- this.array = new long[1];
- this.array[0] = id.longValue();
- count=1;
- }
/**
* Return if an import ID set is defined or not.
@@ -121,16 +111,13 @@
* if the newly merged set is defined or not.
*
* @param importIDSet The import ID set to merge with.
- * @param limit The index limit to use in the undefined calculation.
- * @param maintainCount <CODE>True</CODE> if a count of the IDs should be kept
- * after going undefined.
*/
public void
- merge(ImportIDSet importIDSet, int limit, boolean maintainCount)
+ merge(ImportIDSet importIDSet)
{
if(!isDefined() && !importIDSet.isDefined()) //both undefined
{
- if(maintainCount)
+ if(doCount)
{
undefinedSize += importIDSet.getUndefinedSize();
}
@@ -138,7 +125,7 @@
}
else if(!isDefined()) //this undefined
{
- if(maintainCount)
+ if(doCount)
{
undefinedSize += importIDSet.size();
}
@@ -147,7 +134,7 @@
else if(!importIDSet.isDefined()) //other undefined
{
isDefined = false;
- if(maintainCount)
+ if(doCount)
{
undefinedSize = size() + importIDSet.getUndefinedSize();
} else {
@@ -159,7 +146,7 @@
else if ((count + importIDSet.size()) > limit) //add together => undefined
{
isDefined = false;
- if(maintainCount) {
+ if(doCount) {
undefinedSize = size() + importIDSet.size();
} else {
undefinedSize = Long.MAX_VALUE;
@@ -176,25 +163,19 @@
* Add the specified entry id to an import ID set.
*
* @param entryID The entry ID to add to an import ID set.
- * @param limit The index limit to use in the undefined calculation.
- * @param maintainCount <CODE>True</CODE> if a count of the IDs should be kept
- * after going undefined.
*/
- public void addEntryID(EntryID entryID, int limit, boolean maintainCount) {
- addEntryID(entryID.longValue(), limit, maintainCount);
+ public void addEntryID(EntryID entryID) {
+ addEntryID(entryID.longValue());
}
/**
* Add the specified long value to an import ID set.
*
* @param l The long value to add to an import ID set.
- * @param limit The index limit to use in the undefined calculation.
- * @param maintainCount <CODE>True</CODE> if a count of the IDs should be kept
- * after going undefined.
*/
- public void addEntryID(long l, int limit, boolean maintainCount) {
+ public void addEntryID(long l) {
if(!isDefined()) {
- if(maintainCount) {
+ if(doCount) {
undefinedSize++;
}
return;
@@ -202,7 +183,7 @@
if(isDefined() && ((count + 1) > limit)) {
isDefined = false;
array = null;
- if(maintainCount) {
+ if(doCount) {
undefinedSize = count + 1;
} else {
undefinedSize = Long.MAX_VALUE;
@@ -215,7 +196,7 @@
private boolean
- mergeCount(byte[] dBbytes, ImportIDSet importIdSet, int limit) {
+ mergeCount(byte[] dBbytes, ImportIDSet importIdSet) {
boolean incrLimitCount=false;
boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80);
@@ -248,6 +229,43 @@
return incrLimitCount;
}
+
+ /**
+ * Remove the specified import ID set from the byte array read from the DB.
+ *
+ * @param dBbytes The byte array read from JEB.
+ * @param importIdSet The import ID set to delete.
+ */
+ public void remove(byte[] dBbytes, ImportIDSet importIdSet)
+ {
+ boolean incrLimitCount=false;
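+ //The high bit of the first stored byte flags the DB set as undefined.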
+ boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80);
+ if(dbUndefined) {
+ isDefined=false;
+ importIdSet.setUndefined();
+ undefinedSize = Long.MAX_VALUE;
+ } else if(!importIdSet.isDefined()) {
+ isDefined=false;
+ incrLimitCount=true;
+ undefinedSize = Long.MAX_VALUE;
+ } else {
+ array = JebFormat.entryIDListFromDatabase(dBbytes);
+ if(array.length - importIdSet.size() > limit) {
+ isDefined=false;
+ incrLimitCount=true;
+ count = 0;
+ importIdSet.setUndefined();
+ undefinedSize = Long.MAX_VALUE;
+ } else {
+ count = array.length;
+ removeAll(importIdSet);
+ }
+ }
+ }
+
+
+
+
/**
* Merge the specified byte array read from a DB, with the specified import
* ID set. The specified limit and maintain count parameters define
@@ -255,18 +273,14 @@
*
* @param dBbytes The byte array of IDs read from a DB.
* @param importIdSet The import ID set to merge the byte array with.
- * @param limit The index limit to use in the undefined calculation.
- * @param maintainCount <CODE>True</CODE> if the import ID set should
- * maintain a count of import IDs.
* @return <CODE>True</CODE> if the import ID set started keeping a count as
* a result of the merge.
*/
- public boolean merge(byte[] dBbytes, ImportIDSet importIdSet,
- int limit, boolean maintainCount)
+ public boolean merge(byte[] dBbytes, ImportIDSet importIdSet)
{
boolean incrLimitCount=false;
- if(maintainCount) {
- incrLimitCount = mergeCount(dBbytes, importIdSet, limit);
+ if(doCount) {
+ incrLimitCount = mergeCount(dBbytes, importIdSet);
} else {
boolean dbUndefined = ((dBbytes[0] & 0x80) == 0x80);
if(dbUndefined) {
@@ -294,6 +308,25 @@
return incrLimitCount;
}
+
+ private void removeAll(ImportIDSet that) {
+
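+ //Copy across only the IDs not found in the other (sorted) set,
+ //using a binary search over its live elements.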
+ long[] newArray = new long[array.length];
+ int c = 0;
+ for(int i=0; i < count; i++)
+ {
+ int rc = binarySearch(that.array, that.count, array[i]);
+ if(rc < 0)
+ {
+ newArray[c++] = array[i];
+ }
+ }
+ array = newArray;
+ count = c;
+ }
+
+
+
private void addAll(ImportIDSet that) {
resize(this.count+that.count);
@@ -511,6 +544,6 @@
*/
public byte[] getKey()
{
- return this.key;
+ return key;
}
}
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
index 1c4040c..5cdaec2 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Importer.java
@@ -69,24 +69,29 @@
private final int MAX_DB_CACHE_SIZE = 128 * MB;
private final int MIN_DB_CACHE_SIZE = 16 * MB;
private final int MAX_DB_LOG_BUF_BYTES = 100 * MB;
- private final int MEM_PCT_PHASE_1 = 60;
+ private final int MEM_PCT_PHASE_1 = 45;
private final int MEM_PCT_PHASE_2 = 50;
private final String DIRECT_PROPERTY = "import.directphase2";
+ private static AttributeType dnType;
+ private static IndexBuffer.DNComparator dnComparator
+ = new IndexBuffer.DNComparator();
+ private static final IndexBuffer.IndexComparator indexComparator =
+ new IndexBuffer.IndexComparator();
private final AtomicInteger bufferCount = new AtomicInteger(0);
private final File tempDir;
private final int indexCount, threadCount;
- private final boolean dn2idPhase2;
+ private final boolean skipDNValidation;
private final LDIFImportConfig config;
+ private final LocalDBBackendCfg dbCfg;
private final ByteBuffer directBuffer;
-
private RootContainer rootContainer;
private LDIFReader reader;
- private int bufferSize;
+ private int bufferSize, indexBufferCount;
+ private int migratedCount;
private long dbCacheSize = 0, dbLogBufSize = 0;
-
//The executor service used for the sort tasks.
private ExecutorService sortService;
@@ -97,15 +102,15 @@
private final BlockingQueue<IndexBuffer> freeBufQue =
new LinkedBlockingQueue<IndexBuffer>();
- //Map of DB containers to que of index buffers. Used to allocate sorted
+ //Map of index keys to index buffers. Used to allocate sorted
//index buffers to an index writer thread.
private final
- Map<DatabaseContainer, BlockingQueue<IndexBuffer>> containerQueMap =
- new LinkedHashMap<DatabaseContainer, BlockingQueue<IndexBuffer>>();
+ Map<IndexKey, BlockingQueue<IndexBuffer>> indexKeyQueMap =
+ new ConcurrentHashMap<IndexKey, BlockingQueue<IndexBuffer>>();
//List of index managers. Used to start phase 2.
- private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap =
- new LinkedHashMap<DatabaseContainer, IndexManager>();
+ private final List<IndexManager> indexMgrList =
+ new LinkedList<IndexManager>();
//Futures used to indicate when the index file writers are done flushing
//their work queues and have exited. End of phase one.
@@ -115,25 +120,55 @@
//index file writer tasks when the LDIF file has been done.
private final List<IndexFileWriterTask> indexWriterList;
- //Map of DNs to Suffix objects. Placeholder for when multiple suffixes are
- //supported.
+
+ //Map of DNs to Suffix objects.
private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();
+
+ private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap =
+ new ConcurrentHashMap<Integer, DatabaseContainer>();
+
+ private final ConcurrentHashMap<Integer, EntryContainer> idECMap =
+ new ConcurrentHashMap<Integer, EntryContainer>();
+
+ private final Object synObj = new Object();
+
+ static
+ {
+ if ((dnType = DirectoryServer.getAttributeType("dn")) == null)
+ {
+ dnType = DirectoryServer.getDefaultAttributeType("dn");
+ }
+ }
+
/**
* Create a new import job with the specified ldif import config.
*
* @param config The LDIF import config.
- * @param cfg The local DB backend config.
+ * @param dbCfg The local DB backend config.
* @throws IOException If a problem occurs while opening the LDIF file for
* reading.
+ * @throws InitializationException If a problem occurs during initialization.
*/
- public Importer(LDIFImportConfig config,
- LocalDBBackendCfg cfg )
- throws IOException
+ public Importer(LDIFImportConfig config, LocalDBBackendCfg dbCfg )
+ throws IOException, InitializationException
{
this.config = config;
- threadCount = cfg.getImportThreadCount();
- indexCount = cfg.listLocalDBIndexes().length + 2;
+ this.dbCfg = dbCfg;
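+ //A thread count of -1 means unspecified: default to twice the
+ //number of available processors.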
+ if(config.getThreadCount() == -1)
+ {
+ threadCount = Runtime.getRuntime().availableProcessors() * 2;
+ }
+ else
+ {
+ threadCount = config.getThreadCount();
+ if(threadCount <= 0)
+ {
+ Message msg = ERR_IMPORT_LDIF_INVALID_THREAD_COUNT.get(threadCount);
+ throw new InitializationException(msg);
+ }
+ }
+ indexCount = dbCfg.listLocalDBIndexes().length + 2;
indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount);
indexWriterFutures = new CopyOnWriteArrayList<Future<?>>();
File parentDir;
@@ -145,7 +180,8 @@
{
parentDir = getFileForPath(config.getTmpDirectory());
}
- tempDir = new File(parentDir, cfg.getBackendId());
+
+ tempDir = new File(parentDir, dbCfg.getBackendId());
if(!tempDir.exists() && !tempDir.mkdirs())
{
Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(
@@ -159,7 +195,7 @@
f.delete();
}
}
- dn2idPhase2 = config.getDNCheckPhase2();
+ skipDNValidation = config.getSkipDNValidation();
String propString = System.getProperty(DIRECT_PROPERTY);
if(propString != null)
{
@@ -275,8 +311,8 @@
private void initIndexBuffers(int threadCount)
{
- int bufferCount = 2 * (indexCount * threadCount);
- for(int i = 0; i < bufferCount; i++)
+ indexBufferCount = 2 * (indexCount * threadCount);
+ for(int i = 0; i < indexBufferCount; i++)
{
IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
freeBufQue.add(b);
@@ -285,16 +321,130 @@
- private void initSuffixes()
- throws ConfigException, InitializationException
+ private void initSuffixes() throws DatabaseException, JebException,
+ ConfigException, InitializationException
{
- Iterator<EntryContainer> i = rootContainer.getEntryContainers().iterator();
- EntryContainer ec = i.next();
- Suffix suffix = Suffix.createSuffixContext(ec, config, rootContainer);
- dnSuffixMap.put(ec.getBaseDN(), suffix);
+ for(EntryContainer ec : rootContainer.getEntryContainers())
+ {
+ Suffix suffix = getSuffix(ec);
+ if(suffix != null)
+ {
+ dnSuffixMap.put(ec.getBaseDN(), suffix);
+ }
+ }
}
+ private Suffix getSuffix(EntryContainer entryContainer)
+ throws DatabaseException, JebException, ConfigException,
+ InitializationException {
+ DN baseDN = entryContainer.getBaseDN();
+ EntryContainer srcEntryContainer = null;
+ List<DN> includeBranches = new ArrayList<DN>();
+ List<DN> excludeBranches = new ArrayList<DN>();
+
+ if(!config.appendToExistingData() &&
+ !config.clearBackend())
+ {
+ for(DN dn : config.getExcludeBranches())
+ {
+ if(baseDN.equals(dn))
+ {
+ // This entire base DN was explicitly excluded. Skip.
+ return null;
+ }
+ if(baseDN.isAncestorOf(dn))
+ {
+ excludeBranches.add(dn);
+ }
+ }
+
+ if(!config.getIncludeBranches().isEmpty())
+ {
+ for(DN dn : config.getIncludeBranches())
+ {
+ if(baseDN.isAncestorOf(dn))
+ {
+ includeBranches.add(dn);
+ }
+ }
+
+ if(includeBranches.isEmpty())
+ {
+ // There are no branches in the explicitly defined include list under
+ // this base DN. Skip this base DN altogether.
+
+ return null;
+ }
+
+ // Remove any overlapping include branches.
+ Iterator<DN> includeBranchIterator = includeBranches.iterator();
+ while(includeBranchIterator.hasNext())
+ {
+ DN includeDN = includeBranchIterator.next();
+ boolean keep = true;
+ for(DN dn : includeBranches)
+ {
+ if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
+ {
+ keep = false;
+ break;
+ }
+ }
+ if(!keep)
+ {
+ includeBranchIterator.remove();
+ }
+ }
+
+ // Remove any exclude branches that are not under an include
+ // branch since they will be migrated as part of the existing entries
+ // outside of the include branches anyway.
+ Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
+ while(excludeBranchIterator.hasNext())
+ {
+ DN excludeDN = excludeBranchIterator.next();
+ boolean keep = false;
+ for(DN includeDN : includeBranches)
+ {
+ if(includeDN.isAncestorOf(excludeDN))
+ {
+ keep = true;
+ break;
+ }
+ }
+ if(!keep)
+ {
+ excludeBranchIterator.remove();
+ }
+ }
+
+ if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
+ includeBranches.get(0).equals(baseDN))
+ {
+ // This entire base DN is explicitly included in the import with
+ // no exclude branches that we need to migrate. Just clear the entry
+ // container.
+ entryContainer.lock();
+ entryContainer.clear();
+ entryContainer.unlock();
+ }
+ else
+ {
+ // Create a temp entry container
+ srcEntryContainer = entryContainer;
+ entryContainer =
+ rootContainer.openEntryContainer(baseDN,
+ baseDN.toNormalizedString() +
+ "_importTmp");
+ }
+ }
+ }
+ return Suffix.createSuffixContext(entryContainer, srcEntryContainer,
+ includeBranches, excludeBranches);
+ }
+
+
/**
* Import a ldif using the specified root container.
@@ -311,12 +461,14 @@
* @throws InterruptedException If the import failed due to an interrupted
* error.
* @throws ExecutionException If the import failed due to an execution error.
+ * @throws DatabaseException If the import failed due to a database error.
*/
public LDIFImportResult
processImport(RootContainer rootContainer) throws ConfigException,
- InitializationException, IOException, JebException,
+ InitializationException, IOException, JebException, DatabaseException,
InterruptedException, ExecutionException
{
+ try {
this.rootContainer = rootContainer;
this.reader = new LDIFReader(config, rootContainer, LDIF_READER_BUF_SIZE);
Message message =
@@ -331,6 +483,7 @@
processPhaseOne();
processPhaseTwo();
setIndexesTrusted();
+ switchContainers();
tempDir.delete();
long finishTime = System.currentTimeMillis();
long importTime = (finishTime - startTime);
@@ -339,13 +492,48 @@
rate = 1000f * reader.getEntriesRead() / importTime;
message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
reader.getEntriesRead(), reader.getEntriesIgnored(), reader
- .getEntriesRejected(), 0, importTime / 1000, rate);
+ .getEntriesRejected(), migratedCount, importTime / 1000, rate);
logError(message);
+ }
+ finally
+ {
+ reader.close();
+ }
return new LDIFImportResult(reader.getEntriesRead(), reader
.getEntriesRejected(), reader.getEntriesIgnored());
}
+ private void switchContainers() throws DatabaseException, JebException {
+
+ for(Suffix suffix : dnSuffixMap.values()) {
+ DN baseDN = suffix.getBaseDN();
+ EntryContainer srcEntryContainer =
+ suffix.getSrcEntryContainer();
+ if(srcEntryContainer != null) {
+ EntryContainer unregEC =
+ rootContainer.unregisterEntryContainer(baseDN);
+ //Make sure the unregistered EC for the base DN is the same as
+ //the one in the import context.
+ if(unregEC != srcEntryContainer) {
+ rootContainer.registerEntryContainer(baseDN, unregEC);
+ continue;
+ }
+ srcEntryContainer.lock();
+ srcEntryContainer.close();
+ srcEntryContainer.delete();
+ srcEntryContainer.unlock();
+ EntryContainer newEC = suffix.getEntryContainer();
+ newEC.lock();
+ newEC.setDatabasePrefix(baseDN.toNormalizedString());
+ newEC.unlock();
+ rootContainer.registerEntryContainer(baseDN, newEC);
+ }
+ }
+ }
+
+
+
private void setIndexesTrusted() throws JebException
{
try {
@@ -369,21 +557,48 @@
timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
indexProcessService = Executors.newFixedThreadPool(2 * indexCount);
sortService = Executors.newFixedThreadPool(threadCount);
-
- //Import tasks are collective tasks.
- List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
- for (int i = 0; i < threadCount; i++)
- {
- tasks.add(new ImportTask());
- }
ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- List<Future<Void>> results = execService.invokeAll(tasks);
- for (Future<Void> result : results)
- assert result.isDone();
+ List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
+
+ tasks.add(new MigrateExistingTask());
+ List<Future<Void>> results = execService.invokeAll(tasks);
+ for (Future<Void> result : results)
+ assert result.isDone();
+ tasks.clear();
+ results.clear();
+
+ if (config.appendToExistingData() &&
+ config.replaceExistingEntries())
+ {
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(new AppendReplaceTask());
+ }
+ }
+ else
+ {
+ for (int i = 0; i < threadCount; i++)
+ {
+ tasks.add(new ImportTask());
+ }
+ }
+ results = execService.invokeAll(tasks);
+ for (Future<Void> result : results)
+ assert result.isDone();
+
+
+ tasks.clear();
+ results.clear();
+ tasks.add(new MigrateExcludedTask());
+ results = execService.invokeAll(tasks);
+ for (Future<Void> result : results)
+ assert result.isDone();
+
+
stopIndexWriterTasks();
for (Future<?> result : indexWriterFutures)
{
- result.get();
+ result.get();
}
execService.shutdown();
freeBufQue.clear();
@@ -396,7 +611,7 @@
private void processPhaseTwo() throws InterruptedException
{
SecondPhaseProgressTask progress2Task =
- new SecondPhaseProgressTask(containerIndexMgrMap);
+ new SecondPhaseProgressTask(indexMgrList);
Timer timer2 = new Timer();
timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
processIndexFiles();
@@ -419,29 +634,21 @@
{
cacheSize = cacheSizeFromDirectMemory();
}
- for(Map.Entry<DatabaseContainer, IndexManager> e :
- containerIndexMgrMap.entrySet())
+ for(IndexManager idxMgr : indexMgrList)
{
- DatabaseContainer container = e.getKey();
- IndexManager indexMgr = e.getValue();
- boolean isDN2ID = false;
- if(container instanceof DN2ID)
- {
- isDN2ID = true;
- }
if(directBuffer != null)
{
- int cacheSizes = cacheSize * indexMgr.getBufferList().size();
+ int cacheSizes = cacheSize * idxMgr.getBufferList().size();
offSet += cacheSizes;
directBuffer.limit(offSet);
directBuffer.position(p);
ByteBuffer b = directBuffer.slice();
- tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, b, cacheSize));
+ tasks.add(new IndexWriteDBTask(idxMgr, b, cacheSize));
p += cacheSizes;
}
else
{
- tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, cacheSize));
+ tasks.add(new IndexWriteDBTask(idxMgr, null, cacheSize));
}
}
List<Future<Void>> results = indexProcessService.invokeAll(tasks);
@@ -497,24 +704,170 @@
/**
- * This task processes the LDIF file during phase 1.
+ * Task used to migrate excluded branches.
*/
- private final class ImportTask implements Callable<Void> {
- private final Map<Suffix, Map<DatabaseContainer, IndexBuffer>> suffixMap =
- new HashMap<Suffix, Map<DatabaseContainer, IndexBuffer>>();
+ private final class MigrateExcludedTask extends ImportTask
+ {
private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
- private final IndexBuffer.DNComparator dnComparator
- = new IndexBuffer.DNComparator();
- private final IndexBuffer.IndexComparator indexComparator =
- new IndexBuffer.IndexComparator();
+ private final
+ Map<IndexKey, IndexBuffer> indexBufferMap =
+ new HashMap<IndexKey, IndexBuffer>();
+
+ public Void call() throws Exception
+ {
+ for(Suffix suffix : dnSuffixMap.values()) {
+ EntryContainer srcEntryContainer = suffix.getSrcEntryContainer();
+ if(srcEntryContainer != null &&
+ !suffix.getExcludeBranches().isEmpty()) {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ OperationStatus status;
+ Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
+ "excluded", String.valueOf(suffix.getBaseDN()));
+ logError(message);
+ Cursor cursor =
+ srcEntryContainer.getDN2ID().openCursor(null,
+ CursorConfig.READ_COMMITTED);
+ Comparator<byte[]> dn2idComparator =
+ srcEntryContainer.getDN2ID().getComparator();
+ try {
+ for(DN excludedDN : suffix.getExcludeBranches()) {
+ byte[] bytes =
+ StaticUtils.getBytes(excludedDN.toNormalizedString());
+ key.setData(bytes);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+ if(status == OperationStatus.SUCCESS &&
+ Arrays.equals(key.getData(), bytes)) {
+ // This is the base entry for a branch that was excluded in the
+ // import so we must migrate all entries in this branch over to
+ // the new entry container.
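+ //DN keys compare in reverse order, so bumping the leading comma
+ //byte of ",<dn>" gives the first key past this branch.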
+ byte[] end =
+ StaticUtils.getBytes("," + excludedDN.toNormalizedString());
+ end[0] = (byte) (end[0] + 1);
+
+ while(status == OperationStatus.SUCCESS &&
+ dn2idComparator.compare(key.getData(), end) < 0 &&
+ !config.isCancelled()) {
+ EntryID id = new EntryID(data);
+ Entry entry = srcEntryContainer.getID2Entry().get(null,
+ id, LockMode.DEFAULT);
+ processEntry(entry, rootContainer.getNextEntryID(),
+ suffix);
+ migratedCount++;
+ status = cursor.getNext(key, data, lockMode);
+ }
+ }
+ }
+ }
+ finally
+ {
+ cursor.close();
+ flushIndexBuffers();
+ closeCursors();
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+
+ /**
+ * Task to migrate existing entries.
+ */
+ private final class MigrateExistingTask extends ImportTask
+ {
+
+ private final
+ Map<IndexKey, IndexBuffer> indexBufferMap =
+ new HashMap<IndexKey, IndexBuffer>();
+ private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
+
+ public Void call() throws Exception
+ {
+ for(Suffix suffix : dnSuffixMap.values()) {
+ EntryContainer srcEntryContainer = suffix.getSrcEntryContainer();
+ if(srcEntryContainer != null &&
+ !suffix.getIncludeBranches().isEmpty()) {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode lockMode = LockMode.DEFAULT;
+ OperationStatus status;
+ Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
+ "existing", String.valueOf(suffix.getBaseDN()));
+ logError(message);
+ Cursor cursor =
+ srcEntryContainer.getDN2ID().openCursor(null,
+ null);
+ try {
+ status = cursor.getFirst(key, data, lockMode);
+ while(status == OperationStatus.SUCCESS &&
+ !config.isCancelled()) {
+ DN dn = DN.decode(ByteString.wrap(key.getData()));
+ if(!suffix.getIncludeBranches().contains(dn)) {
+ EntryID id = new EntryID(data);
+ Entry entry =
+ srcEntryContainer.getID2Entry().get(null,
+ id, LockMode.DEFAULT);
+ processEntry(entry, rootContainer.getNextEntryID(),suffix);
+ migratedCount++;
+ status = cursor.getNext(key, data, lockMode);
+ } else {
+ // This is the base entry for a branch that will be included
+ // in the import so we don't want to copy the branch to the
+ // new entry container.
+
+ /**
+ * Advance the cursor to the next entry at the same level in the
+ * DIT, skipping all the entries in this branch. Set the next
+ * starting value to a value of equal length but slightly greater
+ * than the previous DN. Since keys are compared in reverse order
+ * we must set the first byte (the comma). No possibility of
+ * overflow here.
+ */
+ byte[] begin =
+ StaticUtils.getBytes("," + dn.toNormalizedString());
+ begin[0] = (byte) (begin[0] + 1);
+ key.setData(begin);
+ status = cursor.getSearchKeyRange(key, data, lockMode);
+ }
+ }
+ } finally {
+ cursor.close();
+ flushIndexBuffers();
+ closeCursors();
+ }
+ }
+ }
+ return null;
+ }
+
+ }
+
+ /**
+ * Task to handle append/replace combination.
+ */
+ private class AppendReplaceTask extends ImportTask
+ {
+
+ private final
+ Map<IndexKey, IndexBuffer> indexBufferMap =
+ new HashMap<IndexKey, IndexBuffer>();
+ private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
+ private final Set<byte[]> deleteKeySet = new HashSet<byte[]>();
+ private final EntryInformation entryInfo = new EntryInformation();
+ private Entry oldEntry;
+ private EntryID entryID;
/**
* {@inheritDoc}
*/
public Void call() throws Exception
{
- Suffix suffix = null;
while (true)
{
if (config.isCancelled())
@@ -523,36 +876,49 @@
freeBufQue.add(idxBuffer);
return null;
}
- Entry entry = reader.readEntry(dnSuffixMap);
-
+ oldEntry = null;
+ Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
if (entry == null)
{
break;
}
- DN entryDN = entry.getDN();
- EntryID entryID = (EntryID) entry.getAttachment();
- //Temporary until multiple suffixes supported.
- if(suffix == null)
- {
- suffix = getMatchSuffix(entryDN, dnSuffixMap);
- }
- if(!suffixMap.containsKey(suffix))
- {
- suffixMap.put(suffix, new HashMap<DatabaseContainer, IndexBuffer>());
- }
- if(!dn2idPhase2)
+ entryID = entryInfo.getEntryID();
+ Suffix suffix = entryInfo.getSuffix();
+ processEntry(entry, suffix);
+ }
+ flushIndexBuffers();
+ closeCursors();
+ return null;
+ }
+
+
+ void processEntry(Entry entry, Suffix suffix)
+ throws DatabaseException, ConfigException, DirectoryException,
+ JebException
+
+ {
+ DN entryDN = entry.getDN();
+ DN2ID dn2id = suffix.getDN2ID();
+ EntryID oldID = dn2id.get(null, entryDN, LockMode.DEFAULT);
+ if(oldID != null)
+ {
+ oldEntry = suffix.getID2Entry().get(null, oldID, LockMode.DEFAULT);
+ }
+ if(oldEntry == null)
+ {
+ if(!skipDNValidation)
{
if(!processParent(entryDN, entryID, entry, suffix))
{
suffix.removePending(entryDN);
- continue;
+ return;
}
if(!suffix.getDN2ID().insert(null, entryDN, entryID))
{
suffix.removePending(entryDN);
- Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
- reader.rejectEntry(entry, msg);
- continue;
+ Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+ reader.rejectEntry(entry, msg);
+ return;
}
suffix.removePending(entryDN);
processID2SC(entryID, entry, suffix);
@@ -562,20 +928,198 @@
processDN2ID(suffix, entryDN, entryID);
suffix.removePending(entryDN);
}
- suffix.getID2Entry().put(null, entryID, entry);
+ }
+ else
+ {
+ suffix.removePending(entryDN);
+ entryID = oldID;
+ }
+ suffix.getID2Entry().put(null, entryID, entry);
+ if(oldEntry == null)
+ {
processIndexes(suffix, entry, entryID);
}
- flushIndexBuffers();
- if(!dn2idPhase2)
+ else
{
- suffix.getEntryContainer().getID2Children().closeCursor();
- suffix.getEntryContainer().getID2Subtree().closeCursor();
+ processAllIndexes(suffix, entry, entryID);
}
+ }
+
+ void
+ processAllIndexes(Suffix ctx, Entry entry, EntryID entryID) throws
+ DatabaseException, DirectoryException, JebException, ConfigException
+ {
+ Transaction txn = null;
+ Map<AttributeType, AttributeIndex> attrMap = ctx.getAttrIndexMap();
+ for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
+ attrMap.entrySet()) {
+ AttributeType attrType = mapEntry.getKey();
+ AttributeIndex attributeIndex = mapEntry.getValue();
+ Index index;
+ if((index=attributeIndex.getEqualityIndex()) != null) {
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType,IndexType.EQUALITY));
+ }
+ if((index=attributeIndex.getPresenceIndex()) != null) {
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType, IndexType.PRESENCE));
+ }
+ if((index=attributeIndex.getSubstringIndex()) != null) {
+ int subLen = ((SubstringIndexer)index.indexer).getSubStringLen();
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType, IndexType.SUBSTRING, subLen));
+ }
+ if((index=attributeIndex.getOrderingIndex()) != null) {
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType, IndexType.ORDERING));
+ }
+ if((index=attributeIndex.getApproximateIndex()) != null) {
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType,IndexType.APPROXIMATE));
+ }
+ for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) {
+ vlvIdx.addEntry(txn, entryID, entry);
+ }
+ Map<String,Collection<Index>> extensibleMap =
+ attributeIndex.getExtensibleIndexes();
+ if(!extensibleMap.isEmpty()) {
+ Collection<Index> subIndexes =
+ attributeIndex.getExtensibleIndexes().get(
+ EXTENSIBLE_INDEXER_ID_SUBSTRING);
+ if(subIndexes != null) {
+ for(Index subIndex: subIndexes) {
+ indexAttr(subIndex, entry, entryID,
+ new IndexKey(attrType, IndexType.EX_SUBSTRING));
+ }
+ }
+ Collection<Index> sharedIndexes =
+ attributeIndex.getExtensibleIndexes().get(
+ EXTENSIBLE_INDEXER_ID_SHARED);
+ if(sharedIndexes !=null) {
+ for(Index sharedIndex:sharedIndexes) {
+ indexAttr(sharedIndex, entry, entryID,
+ new IndexKey(attrType, IndexType.EX_SHARED));
+ }
+ }
+ }
+ }
+ }
+
+
+
+ void indexAttr(Index index, Entry entry, EntryID entryID,
+ IndexKey indexKey) throws DatabaseException,
+ ConfigException
+ {
+
+ if(oldEntry != null)
+ {
+ deleteKeySet.clear();
+ index.indexer.indexEntry(oldEntry, deleteKeySet);
+ for(byte[] delKey : deleteKeySet)
+ {
+ processKey(index, delKey, entryID, indexComparator, indexKey, false);
+ }
+ }
+ insertKeySet.clear();
+ index.indexer.indexEntry(entry, insertKeySet);
+ for(byte[] key : insertKeySet)
+ {
+ processKey(index, key, entryID, indexComparator, indexKey, true);
+ }
+ }
+ }
+
+
+
+ /**
+ * This task processes the LDIF file during phase 1.
+ */
+ private class ImportTask implements Callable<Void>
+ {
+
+ private final
+ Map<IndexKey, IndexBuffer> indexBufferMap =
+ new HashMap<IndexKey, IndexBuffer>();
+ private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
+ private final EntryInformation entryInfo = new EntryInformation();
+
+ /**
+ * {@inheritDoc}
+ */
+ public Void call() throws Exception
+ {
+ while (true)
+ {
+ if (config.isCancelled())
+ {
+ IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
+ freeBufQue.add(idxBuffer);
+ return null;
+ }
+ Entry entry = reader.readEntry(dnSuffixMap, entryInfo);
+
+ if (entry == null)
+ {
+ break;
+ }
+ EntryID entryID = entryInfo.getEntryID();
+ Suffix suffix = entryInfo.getSuffix();
+ processEntry(entry, entryID, suffix);
+ }
+ flushIndexBuffers();
+ closeCursors();
return null;
}
- private boolean processParent(DN entryDN, EntryID entryID, Entry entry,
+ void closeCursors() throws DatabaseException
+ {
+ if(!skipDNValidation)
+ {
+ for(Suffix suffix : dnSuffixMap.values())
+ {
+ suffix.getEntryContainer().getID2Children().closeCursor();
+ suffix.getEntryContainer().getID2Subtree().closeCursor();
+ }
+ }
+ }
+
+
+ void processEntry(Entry entry, EntryID entryID, Suffix suffix)
+ throws DatabaseException, ConfigException, DirectoryException,
+ JebException
+
+ {
+ DN entryDN = entry.getDN();
+ if(!skipDNValidation)
+ {
+ if(!processParent(entryDN, entryID, entry, suffix))
+ {
+ suffix.removePending(entryDN);
+ return;
+ }
+ if(!suffix.getDN2ID().insert(null, entryDN, entryID))
+ {
+ suffix.removePending(entryDN);
+ Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
+ reader.rejectEntry(entry, msg);
+ return;
+ }
+ suffix.removePending(entryDN);
+ processID2SC(entryID, entry, suffix);
+ }
+ else
+ {
+ processDN2ID(suffix, entryDN, entryID);
+ suffix.removePending(entryDN);
+ }
+ suffix.getID2Entry().put(null, entryID, entry);
+ processIndexes(suffix, entry, entryID);
+ return;
+ }
+
+ boolean processParent(DN entryDN, EntryID entryID, Entry entry,
Suffix suffix) throws DatabaseException
{
EntryID parentID = null;
@@ -630,7 +1174,7 @@
return true;
}
- private void processID2SC(EntryID entryID, Entry entry, Suffix suffix)
+ void processID2SC(EntryID entryID, Entry entry, Suffix suffix)
throws DatabaseException
{
Set<byte[]> childKeySet = new HashSet<byte[]>();
@@ -642,20 +1186,20 @@
DatabaseEntry dbKey = new DatabaseEntry();
DatabaseEntry dbVal = new DatabaseEntry();
- ImportIDSet idSet = new ImportIDSet();
- idSet.addEntryID(entryID, id2children.getIndexEntryLimit(),
- id2children.getMaintainCount());
+ ImportIDSet idSet = new ImportIDSet(1, id2children.getIndexEntryLimit(),
+ id2children.getMaintainCount());
+ idSet.addEntryID(entryID);
id2children.insert(idSet, childKeySet, dbKey, dbVal);
DatabaseEntry dbSubKey = new DatabaseEntry();
DatabaseEntry dbSubVal = new DatabaseEntry();
- ImportIDSet idSubSet = new ImportIDSet();
- idSubSet.addEntryID(entryID, id2subtree.getIndexEntryLimit(),
- id2subtree.getMaintainCount());
+ ImportIDSet idSubSet = new ImportIDSet(1, id2subtree.getIndexEntryLimit(),
+ id2subtree.getMaintainCount());
+ idSubSet.addEntryID(entryID);
id2subtree.insert(idSubSet, subtreeKeySet, dbSubKey, dbSubVal);
}
- private EntryID getAncestorID(DN2ID dn2id, DN dn)
+ EntryID getAncestorID(DN2ID dn2id, DN dn)
throws DatabaseException
{
int i=0;
@@ -678,7 +1222,7 @@
- private void
+ void
processIndexes(Suffix ctx, Entry entry, EntryID entryID) throws
DatabaseException, DirectoryException, JebException, ConfigException
{
@@ -691,19 +1235,25 @@
AttributeIndex attributeIndex = mapEntry.getValue();
Index index;
if((index=attributeIndex.getEqualityIndex()) != null) {
- indexAttr(ctx, index, entry, entryID);
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType,IndexType.EQUALITY));
}
if((index=attributeIndex.getPresenceIndex()) != null) {
- indexAttr(ctx, index, entry, entryID);
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType, IndexType.PRESENCE));
}
if((index=attributeIndex.getSubstringIndex()) != null) {
- indexAttr(ctx, index, entry, entryID);
+ int subLen = ((SubstringIndexer)index.indexer).getSubStringLen();
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType, IndexType.SUBSTRING, subLen));
}
if((index=attributeIndex.getOrderingIndex()) != null) {
- indexAttr(ctx, index, entry, entryID);
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType, IndexType.ORDERING));
}
if((index=attributeIndex.getApproximateIndex()) != null) {
- indexAttr(ctx, index, entry, entryID);
+ indexAttr(index, entry, entryID,
+ new IndexKey(attrType,IndexType.APPROXIMATE));
}
for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) {
vlvIdx.addEntry(txn, entryID, entry);
@@ -716,7 +1266,8 @@
EXTENSIBLE_INDEXER_ID_SUBSTRING);
if(subIndexes != null) {
for(Index subIndex: subIndexes) {
- indexAttr(ctx, subIndex, entry, entryID);
+ indexAttr(subIndex, entry, entryID,
+ new IndexKey(attrType, IndexType.EX_SUBSTRING));
}
}
Collection<Index> sharedIndexes =
@@ -724,7 +1275,8 @@
EXTENSIBLE_INDEXER_ID_SHARED);
if(sharedIndexes !=null) {
for(Index sharedIndex:sharedIndexes) {
- indexAttr(ctx, sharedIndex, entry, entryID);
+ indexAttr(sharedIndex, entry, entryID,
+ new IndexKey(attrType, IndexType.EX_SHARED));
}
}
}
@@ -734,31 +1286,29 @@
- private void indexAttr(Suffix ctx, Index index, Entry entry,
- EntryID entryID)
- throws DatabaseException, ConfigException
+ void indexAttr(Index index, Entry entry, EntryID entryID,
+ IndexKey indexKey) throws DatabaseException,
+ ConfigException
{
insertKeySet.clear();
index.indexer.indexEntry(entry, insertKeySet);
for(byte[] key : insertKeySet)
{
- processKey(ctx, index, key, entryID, indexComparator, null);
+ processKey(index, key, entryID, indexComparator, indexKey, true);
}
}
- private void flushIndexBuffers() throws InterruptedException,
+ void flushIndexBuffers() throws InterruptedException,
ExecutionException
{
- Iterator<Suffix> i = dnSuffixMap.values().iterator();
- Suffix suffix = i.next();
- for(Map<DatabaseContainer, IndexBuffer> map : suffixMap.values())
- {
- for(Map.Entry<DatabaseContainer, IndexBuffer> e : map.entrySet())
+ Set<Map.Entry<IndexKey, IndexBuffer>> set = indexBufferMap.entrySet();
+ for(Map.Entry<IndexKey, IndexBuffer> e : set)
{
- DatabaseContainer container = e.getKey();
+ IndexKey indexKey = e.getKey();
IndexBuffer indexBuffer = e.getValue();
- if(container instanceof DN2ID)
+ IndexType indexType = indexKey.getIndexType();
+ if(indexType.equals(IndexType.DN))
{
indexBuffer.setComparator(dnComparator);
}
@@ -766,46 +1316,46 @@
{
indexBuffer.setComparator(indexComparator);
}
- indexBuffer.setContainer(container);
- indexBuffer.setEntryContainer(suffix.getEntryContainer());
+ indexBuffer.setIndexKey(indexKey);
Future<Void> future = sortService.submit(new SortTask(indexBuffer));
future.get();
}
- }
}
- private void
- processKey(Suffix ctx, DatabaseContainer container, byte[] key,
- EntryID entryID,IndexBuffer.ComparatorBuffer<byte[]> comparator,
- EntryContainer entryContainer) throws ConfigException
+ int
+ processKey(DatabaseContainer container, byte[] key, EntryID entryID,
+ IndexBuffer.ComparatorBuffer<byte[]> comparator, IndexKey indexKey,
+ boolean insert)
+ throws ConfigException
{
IndexBuffer indexBuffer;
- Map<DatabaseContainer, IndexBuffer> conMap = suffixMap.get(ctx);
- if(!conMap.containsKey(container))
+ if(!indexBufferMap.containsKey(indexKey))
{
indexBuffer = getNewIndexBuffer();
- conMap.put(container, indexBuffer);
+ indexBufferMap.put(indexKey, indexBuffer);
}
else
{
- indexBuffer = conMap.get(container);
+ indexBuffer = indexBufferMap.get(indexKey);
}
if(!indexBuffer.isSpaceAvailable(key))
{
- indexBuffer.setContainer(container);
indexBuffer.setComparator(comparator);
- indexBuffer.setEntryContainer(entryContainer);
+ indexBuffer.setIndexKey(indexKey);
sortService.submit(new SortTask(indexBuffer));
indexBuffer = getNewIndexBuffer();
- conMap.remove(container);
- conMap.put(container, indexBuffer);
+ indexBufferMap.remove(indexKey);
+ indexBufferMap.put(indexKey, indexBuffer);
}
- indexBuffer.add(key, entryID);
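+ //Key the container by its identity hash so index buffers can carry
+ //a compact int ID instead of a container reference.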
+ int id = System.identityHashCode(container);
+ idContainerMap.putIfAbsent(id, container);
+ indexBuffer.add(key, entryID, id, insert);
+ return id;
}
- private IndexBuffer getNewIndexBuffer() throws ConfigException
+ IndexBuffer getNewIndexBuffer() throws ConfigException
{
IndexBuffer indexBuffer = freeBufQue.poll();
if(indexBuffer.isPoison())
@@ -818,98 +1368,47 @@
}
- private void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
+ void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
throws ConfigException
{
DatabaseContainer dn2id = suffix.getDN2ID();
byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString());
- processKey(suffix, dn2id, dnBytes, entryID, dnComparator,
- suffix.getEntryContainer());
-
+ int id = processKey(dn2id, dnBytes, entryID, dnComparator,
+ new IndexKey(dnType, IndexType.DN), true);
+ idECMap.putIfAbsent(id, suffix.getEntryContainer());
}
}
+
/**
* The task reads the temporary index files and writes their results to the
* index database.
*/
- private final class IndexWriteDBTask implements Callable<Void> {
-
+ private final class IndexWriteDBTask implements Callable<Void>
+ {
private final IndexManager indexMgr;
- private final boolean isDN2ID;
private final DatabaseEntry dbKey, dbValue;
- private final DN2ID dn2id;
- private final Index index;
-
- private final EntryContainer entryContainer;
- private final int id2ChildLimit;
- private final boolean id2ChildMCount;
-
- private TreeMap<DN,EntryID> parentIDMap = new TreeMap<DN,EntryID>();
- private DN parentDN, lastDN;
- private EntryID parentID, lastID;
- private final Map<byte[], ImportIDSet> id2childTree;
- private final Map<byte[], ImportIDSet> id2subtreeTree;
private final int cacheSize;
private ByteBuffer directBuffer = null;
+ private final Map<Integer, DNState> dnStateMap =
+ new HashMap<Integer, DNState>();
+ private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>();
- public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID,
- ByteBuffer b, int cacheSize)
- {
- this(indexMgr, isDN2ID, cacheSize);
- directBuffer = b;
- }
-
- public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID,
- int cacheSize)
+ public IndexWriteDBTask(IndexManager indexMgr, ByteBuffer b, int cacheSize)
{
this.indexMgr = indexMgr;
- this.entryContainer = indexMgr.entryContainer;
- this.isDN2ID = isDN2ID;
+ directBuffer = b;
this.dbKey = new DatabaseEntry();
this.dbValue = new DatabaseEntry();
this.cacheSize = cacheSize;
- if(isDN2ID)
- {
- this.dn2id = indexMgr.dn2id;
- this.index = null;
- id2ChildLimit = entryContainer.getID2Children().getIndexEntryLimit();
- id2ChildMCount = entryContainer.getID2Subtree().getMaintainCount();
- Comparator<byte[]> id2ChildComparator =
- entryContainer.getID2Children().getComparator();
- Comparator<byte[]> id2SubtreeComparator =
- entryContainer.getID2Subtree().getComparator();
- id2childTree =
- new TreeMap<byte[], ImportIDSet>(id2ChildComparator);
- id2subtreeTree =
- new TreeMap<byte[], ImportIDSet>(id2SubtreeComparator);
- }
- else
- {
- this.dn2id = null;
- this.index = indexMgr.getIndex();
- id2subtreeTree = null;
- id2childTree = null;
- id2ChildLimit = 0;
- id2ChildMCount = false;
- }
}
-
- public Void call() throws Exception
+ private SortedSet<Buffer> initBuffers() throws IOException
{
-
- Comparator<byte[]> comparator = indexMgr.getComparator();
- int limit = indexMgr.getLimit();
- boolean maintainCount = indexMgr.getMaintainCount();
- byte[] cKey = null;
- ImportIDSet cIDSet = null;
- indexMgr.init();
- List<Buffer> bufferList = indexMgr.getBufferList();
- SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
int p = 0;
int offSet = cacheSize;
- for(Buffer b : bufferList)
+ SortedSet<Buffer> bufferSet = new TreeSet<Buffer>();
+ for(Buffer b : indexMgr.getBufferList())
{
if(directBuffer != null)
{
@@ -926,34 +1425,48 @@
}
bufferSet.add(b);
}
+ return bufferSet;
+ }
+
+ public Void call() throws Exception
+ {
+ byte[] cKey = null;
+ ImportIDSet cInsertIDSet = null, cDeleteIDSet = null;
+ Integer cIndexID = null;
+
+ indexMgr.init();
+ SortedSet<Buffer> bufferSet = initBuffers();
while(!bufferSet.isEmpty())
{
Buffer b;
b = bufferSet.first();
- if(b == null) {
- System.out.println("null b");
- }
bufferSet.remove(b);
- byte[] key = b.getKey();
- ImportIDSet idSet = b.getIDSet();
if(cKey == null)
{
- cKey = key;
- cIDSet = idSet;
+ cIndexID = b.getIndexID();
+ cKey = b.getKey();
+ cInsertIDSet = b.getInsertIDSet();
+ cDeleteIDSet = b.getDeleteIDSet();
+ cInsertIDSet.setKey(cKey);
+ cDeleteIDSet.setKey(cKey);
}
else
{
- if(comparator.compare(key, cKey) != 0)
+ if(b.compare(cKey, cIndexID) != 0)
{
- addToDB(cKey, cIDSet);
+ addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
indexMgr.incrKeyCount();
- cKey = key;
- cIDSet = idSet;
+ cIndexID = b.getIndexID();
+ cKey = b.getKey();
+ cInsertIDSet = b.getInsertIDSet();
+ cDeleteIDSet = b.getDeleteIDSet();
+ cInsertIDSet.setKey(cKey);
+ cDeleteIDSet.setKey(cKey);
}
else
{
- cIDSet.setKey(cKey);
- cIDSet.merge(idSet, limit, maintainCount);
+ cInsertIDSet.merge(b.getInsertIDSet());
+ cDeleteIDSet.merge(b.getDeleteIDSet());
}
}
if(b.hasMoreData())
@@ -964,7 +1477,7 @@
}
if(cKey != null)
{
- addToDB(cKey, cIDSet);
+ addToDB(cInsertIDSet, cDeleteIDSet, cIndexID);
}
cleanUP();
return null;
@@ -974,197 +1487,264 @@
private void cleanUP() throws DatabaseException, DirectoryException,
IOException
{
- if(!isDN2ID) {
- index.closeCursor();
- Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(index.getName());
+ if(indexMgr.isDN2ID() && skipDNValidation)
+ {
+ for(DNState dnState : dnStateMap.values())
+ {
+ dnState.flush();
+ }
+ Message msg = NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount());
logError(msg);
-
}
else
{
- if(dn2idPhase2)
+ for(Index index : indexMap.values())
{
- flushSubTreeChildIndexes();
+ index.closeCursor();
}
+ Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getName());
+ logError(msg);
}
indexMgr.setDone();
indexMgr.close();
indexMgr.deleteIndexFile();
}
-
- private void flushSubTreeChildIndexes()
- throws DatabaseException, DirectoryException
- {
- Index id2child = entryContainer.getID2Children();
- Set<Map.Entry<byte[], ImportIDSet>> id2childSet =
- id2childTree.entrySet();
- for(Map.Entry<byte[], ImportIDSet> e : id2childSet)
- {
- byte[] key = e.getKey();
- ImportIDSet idSet = e.getValue();
- dbKey.setData(key);
- id2child.insert(dbKey, idSet, dbValue);
- }
- id2child.closeCursor();
- Index id2subtree = entryContainer.getID2Subtree();
- Set<Map.Entry<byte[], ImportIDSet>> subtreeSet =
- id2subtreeTree.entrySet();
- for(Map.Entry<byte[], ImportIDSet> e : subtreeSet)
- {
- byte[] key = e.getKey();
- ImportIDSet idSet = e.getValue();
- dbKey.setData(key);
- id2subtree.insert(dbKey, idSet, dbValue);
- }
- id2subtree.closeCursor();
- Message msg =
- NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getTotDNCount());
- logError(msg);
- }
-
-
- private void addToDB(byte[] key, ImportIDSet record)
+ private void addToDB(ImportIDSet insRec, ImportIDSet delRec, int indexID)
throws InterruptedException, DatabaseException, DirectoryException
{
- record.setKey(key);
- if(!this.isDN2ID)
+ if(!indexMgr.isDN2ID())
{
- addIndex(record);
- }
- else
- {
- if(dn2idPhase2)
+ Index index;
+ if((delRec.size() > 0) || (!delRec.isDefined()))
{
- addDN2ID(record);
+ dbKey.setData(delRec.getKey());
+ index = (Index)idContainerMap.get(indexID);
+ index.delete(dbKey, delRec, dbValue);
+ if(!indexMap.containsKey(indexID))
+ {
+ indexMap.put(indexID, index);
+ }
}
+
+
+ if((insRec.size() > 0) || (!insRec.isDefined()))
+ {
+ dbKey.setData(insRec.getKey());
+ index = (Index)idContainerMap.get(indexID);
+ index.insert(dbKey, insRec, dbValue);
+ if(!indexMap.containsKey(indexID))
+ {
+ indexMap.put(indexID, index);
+ }
+ }
+ }
+ else if(skipDNValidation)
+ {
+ addDN2ID(insRec, indexID);
}
}
-
- private void id2Subtree(EntryContainer ec, EntryID childID,
- int limit, boolean mCount) throws DatabaseException
+ private void addDN2ID(ImportIDSet record, Integer indexID)
+ throws DatabaseException, DirectoryException
{
- ImportIDSet idSet;
- if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
+ DNState dnState;
+ if(!dnStateMap.containsKey(indexID))
{
- idSet = new ImportIDSet();
- id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
+ dnState = new DNState(idECMap.get(indexID));
+ dnStateMap.put(indexID, dnState);
}
else
{
- idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
+ dnState = dnStateMap.get(indexID);
}
- idSet.addEntryID(childID, limit, mCount);
- for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
- dn = ec.getParentWithinBase(dn))
+
+ if(!dnState.checkParent(record))
{
- EntryID nodeID = parentIDMap.get(dn);
- if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
+ return;
+ }
+ dnState.writeToDB();
+ }
+
+
+ /**
+   * This class is used by an index DB merge thread performing DN processing
+   * to keep track of the state of individual DN2ID index processing.
+ */
+ class DNState
+ {
+ //DN related stuff per suffix
+ private final DatabaseEntry dbKey1, dbValue1;
+ private final TreeMap<DN, EntryID> parentIDMap =
+ new TreeMap<DN, EntryID>();
+ private DN parentDN, lastDN;
+ private EntryID parentID, lastID, entryID;
+ private final EntryContainer entryContainer;
+ private final Map<byte[], ImportIDSet> id2childTree;
+ private final Map<byte[], ImportIDSet> id2subtreeTree;
+ private final Index childIndex, subIndex;
+ private final DN2ID dn2id;
+
+ DNState(EntryContainer entryContainer)
+ {
+ this.entryContainer = entryContainer;
+ dn2id = entryContainer.getDN2ID();
+ childIndex = entryContainer.getID2Children();
+ subIndex = entryContainer.getID2Subtree();
+ Comparator<byte[]> childComparator = childIndex.getComparator();
+ Comparator<byte[]> subComparator = subIndex.getComparator();
+ id2childTree = new TreeMap<byte[], ImportIDSet>(childComparator);
+ id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator);
+ this.dbKey1 = new DatabaseEntry();
+ this.dbValue1 = new DatabaseEntry();
+ }
+
+
+ private boolean checkParent(ImportIDSet record) throws DirectoryException
+ {
+ dbKey1.setData(record.getKey());
+ byte[] v = record.toDatabase();
+ long v1 = JebFormat.entryIDFromDatabase(v);
+ dbValue1.setData(v);
+ DN dn = DN.decode(ByteString.wrap(dbKey1.getData()));
+
+
+ entryID = new EntryID(v1);
+ if(parentIDMap.isEmpty())
{
- idSet = new ImportIDSet();
- id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
+ parentIDMap.put(dn, entryID);
+ return true;
+ }
+ else if(lastDN != null && lastDN.isAncestorOf(dn))
+ {
+ parentIDMap.put(lastDN, lastID);
+ parentDN = lastDN;
+ parentID = lastID;
+ lastDN = dn;
+ lastID = entryID;
+ return true;
+ }
+ else if(parentIDMap.lastKey().isAncestorOf(dn))
+ {
+ parentDN = parentIDMap.lastKey();
+ parentID = parentIDMap.get(parentDN);
+ lastDN = dn;
+ lastID = entryID;
+ return true;
}
else
{
- idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
+ DN pDN = entryContainer.getParentWithinBase(dn);
+ if(parentIDMap.containsKey(pDN)) {
+ DN lastKey = parentIDMap.lastKey();
+ Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey);
+          // Clear the view directly; removing entries from the backing map
+          // while iterating its entry set would throw
+          // ConcurrentModificationException.
+          subMap.clear();
+ parentDN = pDN;
+ parentID = parentIDMap.get(pDN);
+ lastDN = dn;
+ lastID = entryID;
+ }
+ else
+ {
+ Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString());
+ Entry e = new Entry(dn, null, null, null);
+ reader.rejectEntry(e, msg);
+ return false;
+ }
}
- idSet.addEntryID(childID, limit, mCount);
+ return true;
}
- }
- private void id2child(EntryID childID, int limit, boolean mCount)
+
+ private void id2child(EntryID childID)
{
ImportIDSet idSet;
if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData()))
{
- idSet = new ImportIDSet();
+ idSet = new ImportIDSet(1,childIndex.getIndexEntryLimit(),
+ childIndex.getMaintainCount());
id2childTree.put(parentID.getDatabaseEntry().getData(), idSet);
}
else
{
idSet = id2childTree.get(parentID.getDatabaseEntry().getData());
}
- idSet.addEntryID(childID, limit, mCount);
+ idSet.addEntryID(childID);
}
- private boolean checkParent(DN dn, EntryID id, EntryContainer ec)
- {
- if(parentIDMap.isEmpty())
+
+ private void id2Subtree(EntryID childID) throws DatabaseException
{
- parentIDMap.put(dn, id);
- return true;
- }
- else if(lastDN != null && lastDN.isAncestorOf(dn))
- {
- parentIDMap.put(lastDN, lastID);
- parentDN = lastDN;
- parentID = lastID;
- lastDN = dn;
- lastID = id;
- return true;
- }
- else if(parentIDMap.lastKey().isAncestorOf(dn))
- {
- parentDN = parentIDMap.lastKey();
- parentID = parentIDMap.get(parentDN);
- lastDN = dn;
- lastID = id;
- return true;
- }
- else
- {
- DN pDN = ec.getParentWithinBase(dn);
- if(parentIDMap.containsKey(pDN)) {
- DN lastKey = parentIDMap.lastKey();
- Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey);
- for(Map.Entry<DN, EntryID> e : subMap.entrySet())
- {
- subMap.remove(e.getKey());
- }
- parentDN = pDN;
- parentID = parentIDMap.get(pDN);
- lastDN = dn;
- lastID = id;
+ ImportIDSet idSet;
+ if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData()))
+ {
+ idSet = new ImportIDSet(1, subIndex.getIndexEntryLimit(),
+ subIndex.getMaintainCount());
+ id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet);
}
else
{
- Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString());
- Entry e = new Entry(dn, null, null, null);
- reader.rejectEntry(e, msg);
- return false;
+ idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData());
+ }
+ idSet.addEntryID(childID);
+ for (DN dn = entryContainer.getParentWithinBase(parentDN); dn != null;
+ dn = entryContainer.getParentWithinBase(dn))
+ {
+ EntryID nodeID = parentIDMap.get(dn);
+ if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData()))
+ {
+ idSet = new ImportIDSet(1, subIndex.getIndexEntryLimit(),
+ subIndex.getMaintainCount());
+ id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet);
+ }
+ else
+ {
+ idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData());
+ }
+ idSet.addEntryID(childID);
}
}
- return true;
- }
- private void addDN2ID(ImportIDSet record)
- throws DatabaseException, DirectoryException
- {
- DatabaseEntry idVal = new DatabaseEntry();
- dbKey.setData(record.getKey());
- idVal.setData(record.toDatabase());
- DN dn = DN.decode(ByteString.wrap(dbKey.getData()));
- EntryID entryID = new EntryID(idVal);
- if(!checkParent(dn, entryID, entryContainer))
- {
- return;
- }
- dn2id.putRaw(null, dbKey, idVal);
+
+ public void writeToDB() throws DatabaseException
+ {
+ dn2id.putRaw(null, dbKey1, dbValue1);
indexMgr.addTotDNCount(1);
if(parentDN != null)
{
- id2child(entryID, id2ChildLimit, id2ChildMCount);
- id2Subtree(entryContainer,
- entryID, id2ChildLimit, id2ChildMCount);
+ id2child(entryID);
+ id2Subtree(entryID);
}
- }
+ }
- private void addIndex(ImportIDSet record) throws DatabaseException
- {
- dbKey.setData(record.getKey());
- index.insert(dbKey, record, dbValue);
+ public void flush() throws DatabaseException, DirectoryException
+ {
+ Set<Map.Entry<byte[], ImportIDSet>> id2childSet =
+ id2childTree.entrySet();
+ for(Map.Entry<byte[], ImportIDSet> e : id2childSet)
+ {
+ byte[] key = e.getKey();
+ ImportIDSet idSet = e.getValue();
+ dbKey1.setData(key);
+ childIndex.insert(dbKey1, idSet, dbValue1);
+ }
+ childIndex.closeCursor();
+ //Do subtree.
+ Set<Map.Entry<byte[], ImportIDSet>> subtreeSet =
+ id2subtreeTree.entrySet();
+ for(Map.Entry<byte[], ImportIDSet> e : subtreeSet)
+ {
+ byte[] key = e.getKey();
+ ImportIDSet idSet = e.getValue();
+ dbKey1.setData(key);
+ subIndex.insert(dbKey1, idSet, dbValue1);
+ }
+ subIndex.closeCursor();
+ }
}
}
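
Note on the loop above: IndexWriteDBTask.call() now groups the sorted
scratch records by the compound key (key bytes, index id) rather than by key
alone, so a single writer can serve several physical indexes. A minimal,
self-contained sketch of that grouping pattern, using strings in place of the
patch's Buffer type (all names below are illustrative, not part of the patch):

    import java.util.*;

    public class MergeGroupSketch
    {
      public static void main(String[] args)
      {
        // (key, indexID) pairs in the order a sorted scratch file yields them.
        String[][] records = {
          {"cn=a", "1"}, {"cn=a", "1"}, {"cn=a", "2"}, {"cn=b", "1"}};
        String curKey = null, curID = null;
        int merged = 0;
        for (String[] r : records)
        {
          if (curKey == null || !r[0].equals(curKey) || !r[1].equals(curID))
          {
            if (curKey != null)
            {
              // A change in key or index id flushes the group, as addToDB()
              // does in the patch.
              System.out.println("flush " + curKey + "/" + curID +
                                 " ids=" + merged);
            }
            curKey = r[0];
            curID = r[1];
            merged = 1;
          }
          else
          {
            merged++;  // same key and index id: merge the ID sets
          }
        }
        if (curKey != null)
        {
          System.out.println("flush " + curKey + "/" + curID +
                             " ids=" + merged);
        }
      }
    }
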
@@ -1177,7 +1757,9 @@
{
private final IndexManager indexMgr;
private final BlockingQueue<IndexBuffer> que;
- private final ByteArrayOutputStream byteStream =
+    private final ByteArrayOutputStream insertByteStream =
+ new ByteArrayOutputStream(2 * bufferSize);
+ private final ByteArrayOutputStream deleteByteStream =
new ByteArrayOutputStream(2 * bufferSize);
private final DataOutputStream dataStream;
private long bufCount = 0;
@@ -1210,6 +1792,7 @@
{
long beginOffset = offset;
long bufLen;
+ /*
if(!que.isEmpty())
{
que.drainTo(l, DRAIN_TO);
@@ -1221,13 +1804,10 @@
}
freeBufQue.addAll(l);
l.clear();
- if(poisonSeen)
- {
- break;
- }
}
else
{
+ */
if(indexBuffer.isPoison())
{
break;
@@ -1235,11 +1815,15 @@
bufLen = writeIndexBuffer(indexBuffer);
indexBuffer.reset();
freeBufQue.add(indexBuffer);
- }
+ // }
offset += bufLen;
indexMgr.addBuffer(new Buffer(beginOffset, offset, bufCount));
bufCount++;
bufferCount.incrementAndGet();
+ if(poisonSeen)
+ {
+ break;
+ }
}
}
dataStream.close();
@@ -1248,7 +1832,7 @@
catch (IOException e) {
Message msg =
ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
- e.getMessage());
+ e.getMessage());
logError(msg);
}
}
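
The run() loop above shuts down only after it sees a poison buffer, the usual
poison-pill idiom for draining a producer/consumer queue. A self-contained
sketch of the idiom, assuming nothing beyond java.util.concurrent (the names
are illustrative, not from the patch):

    import java.util.concurrent.*;

    public class PoisonPillSketch
    {
      public static void main(String[] args) throws Exception
      {
        final String POISON = new String("");  // sentinel instance
        final BlockingQueue<String> que = new ArrayBlockingQueue<String>(8);
        Thread writer = new Thread(new Runnable()
        {
          public void run()
          {
            try
            {
              for (;;)
              {
                String buf = que.take();
                if (buf == POISON)  // plays the role of isPoison()
                {
                  break;
                }
                System.out.println("write " + buf);
              }
            }
            catch (InterruptedException ignored) {}
          }
        });
        writer.start();
        que.put("buffer-1");
        que.put("buffer-2");
        que.put(POISON);  // producers are done; the writer exits cleanly
        writer.join();
      }
    }
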
@@ -1259,40 +1843,44 @@
int numKeys = indexBuffer.getNumberKeys();
indexBuffer.setPos(-1);
long bufLen = 0;
- byteStream.reset();
+    insertByteStream.reset();
+ deleteByteStream.reset();
for(int i = 0; i < numKeys; i++)
{
if(indexBuffer.getPos() == -1)
{
indexBuffer.setPos(i);
- byteStream.write(indexBuffer.getID(i));
+ if(indexBuffer.isInsert(i))
+ {
+          insertByteStream.write(indexBuffer.getIDBytes(i));
+ }
+ else
+ {
+ deleteByteStream.write(indexBuffer.getIDBytes(i));
+ }
continue;
}
-
if(!indexBuffer.compare(i))
{
- int recLen = indexBuffer.getKeySize();
- recLen += byteStream.size();
- recLen += 8;
- bufLen += recLen;
- indexBuffer.writeKey(dataStream);
- dataStream.writeInt(byteStream.size());
- byteStream.writeTo(dataStream);
+        bufLen += indexBuffer.writeRecord(insertByteStream, deleteByteStream,
+ dataStream);
indexBuffer.setPos(i);
- byteStream.reset();
+        insertByteStream.reset();
+ deleteByteStream.reset();
}
- byteStream.write(indexBuffer.getID(i));
+ if(indexBuffer.isInsert(i))
+ {
+        insertByteStream.write(indexBuffer.getIDBytes(i));
+ }
+ else
+ {
+ deleteByteStream.write(indexBuffer.getIDBytes(i));
+ }
}
-
if(indexBuffer.getPos() != -1)
{
- int recLen = indexBuffer.getKeySize();
- recLen += byteStream.size();
- recLen += 8;
- bufLen += recLen;
- indexBuffer.writeKey(dataStream);
- dataStream.writeInt(byteStream.size());
- byteStream.writeTo(dataStream);
+ bufLen += indexBuffer.writeRecord(insetByteStream, deleteByteStream,
+ dataStream);
}
return bufLen;
}
@@ -1303,7 +1891,8 @@
{
long id = 0;
long bufLen = 0;
- byteStream.reset();
+    insertByteStream.reset();
+ deleteByteStream.reset();
for(IndexBuffer b : buffers)
{
if(b.isPoison())
@@ -1318,35 +1907,53 @@
}
}
byte[] saveKey = null;
+ int saveIndexID = 0;
while(!indexSortedSet.isEmpty())
{
IndexBuffer b = indexSortedSet.first();
indexSortedSet.remove(b);
- byte[] key = b.getKeyBytes(b.getPos());
if(saveKey == null)
{
- saveKey = key;
- byteStream.write(b.getID(b.getPos()));
- }
- else
- {
- if(!b.compare(saveKey))
+ saveKey = b.getKeyBytes();
+ saveIndexID = b.getIndexID();
+ if(b.isInsert(b.getPos()))
{
- int recLen = saveKey.length;
- recLen += byteStream.size();
- recLen += 8;
- bufLen += recLen;
- dataStream.writeInt(saveKey.length);
- dataStream.write(saveKey);
- dataStream.writeInt(byteStream.size());
- byteStream.writeTo(dataStream);
- byteStream.reset();
- saveKey = key;
- byteStream.write(b.getID(b.getPos()));
+          insertByteStream.write(b.getIDBytes(b.getPos()));
}
else
{
- byteStream.write(b.getID(b.getPos()));
+ deleteByteStream.write(b.getIDBytes(b.getPos()));
+ }
+ }
+ else
+ {
+ if(!b.compare(saveKey, saveIndexID))
+ {
+ bufLen += IndexBuffer.writeRecord(saveKey, saveIndexID,
+                insertByteStream, deleteByteStream, dataStream);
+          insertByteStream.reset();
+ deleteByteStream.reset();
+ saveKey = b.getKeyBytes();
+ saveIndexID = b.getIndexID();
+ if(b.isInsert(b.getPos()))
+ {
+            insertByteStream.write(b.getIDBytes(b.getPos()));
+ }
+ else
+ {
+ deleteByteStream.write(b.getIDBytes(b.getPos()));
+ }
+ }
+ else
+ {
+ if(b.isInsert(b.getPos()))
+ {
+            insertByteStream.write(b.getIDBytes(b.getPos()));
+ }
+ else
+ {
+ deleteByteStream.write(b.getIDBytes(b.getPos()));
+ }
}
}
if(b.hasMoreData())
@@ -1357,14 +1964,8 @@
}
if(saveKey != null)
{
- int recLen = saveKey.length;
- recLen += byteStream.size();
- recLen += 8;
- bufLen += recLen;
- dataStream.writeInt(saveKey.length);
- dataStream.write(saveKey);
- dataStream.writeInt(byteStream.size());
- byteStream.writeTo(dataStream);
+ bufLen += IndexBuffer.writeRecord(saveKey, saveIndexID,
+          insertByteStream, deleteByteStream, dataStream);
}
return bufLen;
}
@@ -1396,51 +1997,52 @@
{
return null;
}
+ /*
+ if(!indexBuffer.getIndexKey().getName().equals("mail.SUBSTRING"))
+ {
+ freeBufQue.add(indexBuffer);
+ return null;
+ }
+ */
indexBuffer.sort();
- if(containerQueMap.containsKey(indexBuffer.getContainer())) {
+ if(indexKeyQueMap.containsKey(indexBuffer.getIndexKey())) {
BlockingQueue<IndexBuffer> q =
- containerQueMap.get(indexBuffer.getContainer());
+ indexKeyQueMap.get(indexBuffer.getIndexKey());
q.add(indexBuffer);
}
else
{
- DatabaseContainer container = indexBuffer.getContainer();
- EntryContainer entryContainer = indexBuffer.getEntryContainer();
- createIndexWriterTask(container, entryContainer);
- BlockingQueue<IndexBuffer> q = containerQueMap.get(container);
+ createIndexWriterTask(indexBuffer.getIndexKey());
+ BlockingQueue<IndexBuffer> q =
+ indexKeyQueMap.get(indexBuffer.getIndexKey());
q.add(indexBuffer);
}
return null;
}
- private void createIndexWriterTask(DatabaseContainer container,
- EntryContainer entryContainer)
- throws FileNotFoundException
+ private void createIndexWriterTask(IndexKey indexKey)
+ throws FileNotFoundException
{
- synchronized(container) {
- if(containerQueMap.containsKey(container))
+ boolean dn2id = false;
+ synchronized(synObj)
+ {
+ if(indexKeyQueMap.containsKey(indexKey))
{
return;
}
- IndexManager indexMgr;
- if(container instanceof Index)
+ if(indexKey.getIndexType().equals(IndexType.DN))
{
- Index index = (Index) container;
- indexMgr = new IndexManager(index);
+ dn2id = true;
}
- else
- {
- DN2ID dn2id = (DN2ID) container;
- indexMgr = new IndexManager(dn2id, entryContainer);
- }
- containerIndexMgrMap.put(container, indexMgr);
+ IndexManager indexMgr = new IndexManager(indexKey.getName(), dn2id);
+ indexMgrList.add(indexMgr);
BlockingQueue<IndexBuffer> newQue =
- new ArrayBlockingQueue<IndexBuffer>(threadCount + 5);
+ new ArrayBlockingQueue<IndexBuffer>(indexBufferCount);
IndexFileWriterTask indexWriter =
new IndexFileWriterTask(newQue, indexMgr);
indexWriterList.add(indexWriter);
indexWriterFutures.add(indexProcessService.submit(indexWriter));
- containerQueMap.put(container, newQue);
+ indexKeyQueMap.put(indexKey, newQue);
}
}
}
@@ -1455,9 +2057,12 @@
private final long begin, end, id;
private long offset;
private ByteBuffer cache;
- private int keyLen, idLen;
+ private int keyLen, idLen, limit;
private byte[] key;
- private ImportIDSet idSet;
+ private ImportIDSet insertIDSet, deleteIDSet;
+ private Integer indexID = null;
+ private boolean doCount;
+ private Comparator<byte[]> comparator;
public Buffer(long begin, long end, long id)
@@ -1483,7 +2088,6 @@
}
loadCache();
cache.flip();
- getNextRecord();
}
@@ -1530,9 +2134,14 @@
return key;
}
- public ImportIDSet getIDSet()
+ public ImportIDSet getInsertIDSet()
{
- return idSet;
+ return insertIDSet;
+ }
+
+ public ImportIDSet getDeleteIDSet()
+ {
+ return deleteIDSet;
}
public long getBufID()
@@ -1540,10 +2149,44 @@
return id;
}
+ public Integer getIndexID()
+ {
+ if(indexID == null)
+ {
+ try {
+ getNextRecord();
+ } catch(IOException ex) {
+        // TODO: route this through the server's error logging framework.
+        System.out.println("Error getting next record: " + ex.getMessage());
+ }
+ }
+ return indexID;
+ }
+
public void getNextRecord() throws IOException
{
+ getNextIndexID();
+ getContainerParams();
getNextKey();
- getNextIDSet();
+ getNextIDSet(true); //get insert ids
+ getNextIDSet(false); //get delete ids
+ }
+
+ private void getContainerParams()
+ {
+ limit = 1;
+ doCount = false;
+ if(!indexMgr.isDN2ID())
+ {
+ Index index = (Index) idContainerMap.get(indexID);
+ limit = index.getIndexEntryLimit();
+ doCount = index.getMaintainCount();
+ comparator = index.getComparator();
+ }
+ else
+ {
+ DN2ID dn2id = (DN2ID) idContainerMap.get(indexID);
+ comparator = dn2id.getComparator();
+ }
}
private int getInt() throws IOException
@@ -1564,23 +2207,43 @@
cache.get(b);
}
+ private void getNextIndexID() throws IOException, BufferUnderflowException
+ {
+ indexID = new Integer(getInt());
+ }
+
private void getNextKey() throws IOException, BufferUnderflowException
{
keyLen = getInt();
key = new byte[keyLen];
- getBytes(key);
+ getBytes(key);
}
-
- private void getNextIDSet() throws IOException, BufferUnderflowException
+ private void getNextIDSet(boolean insert)
+ throws IOException, BufferUnderflowException
{
idLen = getInt();
int idCount = idLen/8;
- idSet = new ImportIDSet(idCount);
+
+ if(insert)
+ {
+ insertIDSet = new ImportIDSet(idCount, limit, doCount);
+ }
+ else
+ {
+ deleteIDSet = new ImportIDSet(idCount, limit, doCount);
+ }
for(int i = 0; i < idCount; i++)
{
long l = getLong();
- idSet.addEntryID(l, indexMgr.getLimit(), indexMgr.getMaintainCount());
+ if(insert)
+ {
+ insertIDSet.addEntryID(l);
+ }
+ else
+ {
+ deleteIDSet.addEntryID(l);
+ }
}
}
@@ -1601,39 +2264,69 @@
}
}
- public int compareTo(Buffer o) {
- if(key == null) {
- if(id == o.getBufID())
- {
- return 0;
- }
- else
- {
- return id > o.getBufID() ? 1 : -1;
- }
+
+ private int compare(byte[] cKey, Integer cIndexID)
+ {
+
+ int rc;
+ if(key == null)
+ {
+ getIndexID();
}
+ if(comparator.compare(key, cKey) != 0) {
+ rc = 1;
+ }
+ else
+ {
+ rc = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1;
+ }
+ return rc;
+ }
+
+
+
+ public int compareTo(Buffer o) {
+ //used in remove.
if(this.equals(o))
{
return 0;
}
- int rc = indexMgr.getComparator().compare(key, o.getKey());
+ if(key == null) {
+ getIndexID();
+ }
+ if(o.getKey() == null)
+ {
+ o.getIndexID();
+ }
+ int rc = comparator.compare(key, o.getKey());
if(rc == 0)
{
- if(idSet.isDefined())
+ if(indexID.intValue() == o.getIndexID().intValue())
{
- return -1;
+ if(insertIDSet.isDefined())
+ {
+ rc = -1;
+ }
+ else if(o.getInsertIDSet().isDefined())
+ {
+ rc = 1;
+ }
+ else if(insertIDSet.size() == o.getInsertIDSet().size())
+ {
+ rc = id > o.getBufID() ? 1 : -1;
+ }
+ else
+ {
+ rc = insertIDSet.size() - o.getInsertIDSet().size();
+ }
}
- else if(o.getIDSet().isDefined())
+ else if(indexID.intValue() > o.getIndexID().intValue())
{
- return 1;
- }
- else if(idSet.size() == o.getIDSet().size())
- {
- rc = id > o.getBufID() ? 1 : -1;
+ rc = 1;
}
else
{
- rc = idSet.size() - o.getIDSet().size();
+ rc = -1;
}
}
return rc;
@@ -1646,46 +2339,21 @@
*/
private final class IndexManager
{
- private final Index index;
- private final DN2ID dn2id;
- private final EntryContainer entryContainer;
private final File file;
-
-
private RandomAccessFile raf = null;
private final List<Buffer> bufferList = new LinkedList<Buffer>();
- private final int limit;
private long fileLength, bytesRead = 0;
- private final boolean maintainCount;
- private final Comparator<byte[]> comparator;
private boolean done = false;
private long totalDNS;
private AtomicInteger keyCount = new AtomicInteger(0);
private final String name;
+ private final boolean dn2id;
- public IndexManager(Index index)
+ public IndexManager(String name, boolean dn2id)
{
- this.index = index;
- dn2id = null;
- file = new File(tempDir, index.getName());
- name = index.getName();
- limit = index.getIndexEntryLimit();
- maintainCount = index.getMaintainCount();
- comparator = index.getComparator();
- entryContainer = null;
- }
-
-
- public IndexManager(DN2ID dn2id, EntryContainer entryContainer)
- {
- index = null;
+ file = new File(tempDir, name);
+ this.name = name;
this.dn2id = dn2id;
- file = new File(tempDir, dn2id.getName());
- limit = 1;
- maintainCount = false;
- comparator = dn2id.getComparator();
- this.entryContainer = entryContainer;
- name = dn2id.getName();
}
public void init() throws FileNotFoundException
@@ -1723,26 +2391,6 @@
raf.close();
}
- public int getLimit()
- {
- return limit;
- }
-
- public boolean getMaintainCount()
- {
- return maintainCount;
- }
-
- public Comparator<byte[]> getComparator()
- {
- return comparator;
- }
-
- public Index getIndex()
- {
- return index;
- }
-
public void setFileLength()
{
this.fileLength = file.length();
@@ -1764,11 +2412,15 @@
}
- public long getTotDNCount()
+ public long getDNCount()
{
return totalDNS;
}
+ public boolean isDN2ID()
+ {
+ return dn2id;
+ }
public void printStats(long deltaTime)
{
@@ -1785,6 +2437,11 @@
{
keyCount.incrementAndGet();
}
+
+ public String getName()
+ {
+ return name;
+ }
}
/**
@@ -1982,19 +2639,17 @@
// Suspend output.
private boolean pause = false;
- private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap;
+ private final List<IndexManager> indexMgrList;
/**
* Create a new import progress task.
- * @param containerIndexMgrMap Map of database container objects to
- * index manager objects.
+ * @param indexMgrList List of index managers.
*/
- public SecondPhaseProgressTask(Map<DatabaseContainer,
- IndexManager> containerIndexMgrMap)
+  public SecondPhaseProgressTask(List<IndexManager> indexMgrList)
{
previousTime = System.currentTimeMillis();
- this.containerIndexMgrMap = containerIndexMgrMap;
+ this.indexMgrList = indexMgrList;
try
{
prevEnvStats =
@@ -2087,12 +2742,237 @@
previousCount = latestCount;
previousTime = latestTime;
- for(Map.Entry<DatabaseContainer, IndexManager> e :
- containerIndexMgrMap.entrySet())
+ for(IndexManager indexMgr : indexMgrList)
{
- IndexManager indexMgr = e.getValue();
indexMgr.printStats(deltaTime);
}
}
}
+
+
+ /**
+   * A class to hold information about an entry as determined by the LDIF
+   * reader.
+   */
+ public class EntryInformation
+ {
+ private EntryID entryID;
+ private Suffix suffix;
+
+
+ /**
+ * Return the suffix associated with the entry.
+ *
+    * @return The entry's suffix instance.
+ */
+ public Suffix getSuffix()
+ {
+ return suffix;
+ }
+
+ /**
+ * Set the suffix instance associated with the entry.
+ *
+ * @param suffix The suffix associated with the entry.
+ */
+ public void setSuffix(Suffix suffix)
+ {
+ this.suffix = suffix;
+ }
+
+ /**
+ * Set the entry's ID.
+ *
+     * @param entryID The entry ID to set.
+ */
+ public void setEntryID(EntryID entryID)
+ {
+ this.entryID = entryID;
+ }
+
+ /**
+ * Return the entry ID associated with the entry.
+ *
+ * @return The entry ID associated with the entry.
+ */
+ public EntryID getEntryID()
+ {
+ return entryID;
+ }
+ }
+
+ /**
+   * This enumeration defines the individual index types available.
+ *
+ */
+ public enum IndexType {
+ /**
+ * The DN index type.
+ **/
+ DN,
+
+ /**
+ * The equality index type.
+ **/
+ EQUALITY,
+
+ /**
+ * The presence index type.
+ **/
+ PRESENCE,
+
+ /**
+ * The substring index type.
+ **/
+ SUBSTRING,
+
+ /**
+ * The ordering index type.
+ **/
+ ORDERING,
+
+ /**
+ * The approximate index type.
+ **/
+ APPROXIMATE,
+
+ /**
+ * The extensible substring index type.
+ **/
+ EX_SUBSTRING,
+
+ /**
+ * The extensible shared index type.
+ **/
+ EX_SHARED;
+ }
+
+
+ /**
+   * This class is used as an index key for several hash maps that need to
+   * process multiple suffix index elements into a single queue or map based
+   * on both attribute type and index type (i.e., cn.equality, sn.equality,
+   * ...).
+ *
+ * It tries to perform some optimization if the index is a substring index.
+ */
+ public class IndexKey {
+
+ private final AttributeType type;
+ private final IndexType indexType;
+ private byte[] keyBytes = null;
+
+ /**
+     * Create an index key instance using the specified attribute type, index
+     * type and substring length. Used only for substring indexes.
+ *
+ * @param type The attribute type.
+ * @param indexType The index type.
+ * @param subLen The substring length.
+ */
+ IndexKey(AttributeType type, IndexType indexType, int subLen)
+ {
+ this(type, indexType);
+ keyBytes = new byte[subLen];
+ }
+
+ /**
+     * Create an index key instance using the specified attribute type and
+     * index type.
+ *
+ * @param type The attribute type.
+ * @param indexType The index type.
+ */
+ IndexKey(AttributeType type, IndexType indexType)
+ {
+ this.type = type;
+ this.indexType = indexType;
+ }
+
+ /**
+ * An equals method that uses both the attribute type and the index type.
+ *
+ * @param obj the object to compare.
+ * @return <CODE>true</CODE> if the objects are equal.
+ */
+    public boolean equals(Object obj)
+    {
+      // Guard against foreign types to avoid a ClassCastException.
+      if(!(obj instanceof IndexKey))
+      {
+        return false;
+      }
+      IndexKey oKey = (IndexKey) obj;
+      return type.equals(oKey.getType()) &&
+             indexType.equals(oKey.getIndexType());
+    }
+
+ /**
+     * A hashCode method that sums the hash codes of the attribute type and
+     * index type and returns that value.
+ *
+ * @return The combined hash values.
+ */
+ public int hashCode()
+ {
+ return type.hashCode() + indexType.hashCode();
+ }
+
+ /**
+ * Return the attribute type.
+ *
+ * @return The attribute type.
+ */
+ public AttributeType getType()
+ {
+ return type;
+ }
+
+ /**
+     * Return the index type.
+     *
+     * @return The index type.
+ */
+ public IndexType getIndexType()
+ {
+ return indexType;
+ }
+
+ /**
+ * Return the index key name, which is the attribute type primary name,
+ * a period, and the index type name. Used for building file names and
+ * output.
+ *
+ * @return The index key name.
+ */
+ public String getName()
+ {
+ return type.getPrimaryName() + "." +
+ StaticUtils.toLowerCase(indexType.name());
+ }
+
+ /**
+     * Returns the preallocated byte array if the index key belongs to a
+     * substring index and the requested size equals the preallocated
+     * substring length. This is a performance hack for substring indexes
+     * only.
+ *
+ * @param size The size of byte array desired.
+ * @return Either a preallocated byte array, or a freshly created one using
+ * the size parameter.
+ */
+ public byte[] getKeyBytes(int size)
+ {
+ if(keyBytes != null)
+ {
+ if(size == keyBytes.length)
+ {
+ return this.keyBytes;
+ }
+ else
+ {
+ return new byte[size];
+ }
+ }
+ else
+ {
+ return new byte[size];
+ }
+ }
+ }
}
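
As the IndexKey comment above explains, the importer's queue and manager maps
are keyed on (attribute type, index type) pairs, which is why equals() and
hashCode() must cover both fields. A self-contained sketch of that lookup
behavior with a simplified stand-in key (illustrative only; the real class
wraps AttributeType and IndexType):

    import java.util.*;

    public class IndexKeySketch
    {
      // Simplified stand-in for Importer.IndexKey: equals()/hashCode() span
      // both fields, so (cn, equality) and (cn, substring) map to distinct
      // queues.
      static final class Key
      {
        final String attr, type;
        Key(String attr, String type) { this.attr = attr; this.type = type; }
        public boolean equals(Object o)
        {
          if (!(o instanceof Key)) return false;
          Key k = (Key) o;
          return attr.equals(k.attr) && type.equals(k.type);
        }
        public int hashCode() { return attr.hashCode() + type.hashCode(); }
        public String toString() { return attr + "." + type; }
      }

      public static void main(String[] args)
      {
        Map<Key, List<String>> queMap = new HashMap<Key, List<String>>();
        queMap.put(new Key("cn", "equality"), new ArrayList<String>());
        queMap.put(new Key("cn", "substring"), new ArrayList<String>());
        // A fresh but equal key finds the existing queue:
        queMap.get(new Key("cn", "equality")).add("index-buffer");
        System.out.println(queMap);
      }
    }
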
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
index 991fc84..14c7ea0 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/IndexBuffer.java
@@ -30,6 +30,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.ByteArrayOutputStream;
import org.opends.server.backends.jeb.*;
@@ -40,13 +41,25 @@
*/
public class IndexBuffer implements Comparable<IndexBuffer> {
- /**
- * Enumeration used when sorting a buffer.
- */
+ /**
+ * Enumeration used when sorting a buffer.
+ */
private enum CompareOp {
LT, GT, LE, GE, EQ
}
+ private static final int REC_OVERHEAD = 20;
+
+ /**
+ * Insert constant -- used when the key should be inserted in a DB.
+ */
+ public static final int INSERT = 0x0000;
+
+ /**
+ * Delete constant -- used when the key should be deleted from a DB.
+ */
+ public static final int DELETE = 0x0001;
+
//The size of a buffer.
private final int size;
@@ -61,9 +74,11 @@
private final byte[] intBytes = new byte[4];
private final byte[] idBytes = new byte[8];
- //keyPtr - offSet where next key is written
- //recPtr - offSet where next value record is written
- //bytesLeft - amount of bytes left in the buffer
+ /*
+ keyPtr - offSet where next key is written
+ recPtr - offSet where next value record is written
+ bytesLeft - amount of bytes left in the buffer
+ */
private int keyPtr=0, recPtr=0, bytesLeft = 0;
//keys - number of keys in the buffer
@@ -74,6 +89,7 @@
private ComparatorBuffer<byte[]> comparator;
private DatabaseContainer container;
private EntryContainer entryContainer;
+ private Importer.IndexKey indexKey;
private IndexBuffer(int size) {
@@ -106,38 +122,7 @@
container = null;
entryContainer = null;
comparator = null;
- }
-
- /**
- * Compare current IndexBuffer to the one in the specified argument. The key
- * at the value of pos in both buffers are used in the comparision.
- *
- * @param b The IndexBuffer to compare to.
- * @return 0 if the buffers are equal, -1 if the current buffer is less
- * than the specified buffer, or 1 if it is greater.
- */
- public int compareTo(IndexBuffer b) {
- byte[] key2 = b.getKeyBytes(b.getPos());
- int xKeyOffset = pos * 4;
- int xOffset = getValue(xKeyOffset);
- int xLen = getValue(xOffset);
- xOffset += 4;
- int rc = comparator.compare(buffer, xOffset, xLen, key2);
- if(rc == 0)
- {
- if(this.id == b.getBufID())
- {
- rc = 0;
- }
- else if(this.id < b.getBufID()) {
- rc = -1;
- }
- else
- {
- rc = 1;
- }
- }
- return rc;
+ indexKey = null;
}
/**
@@ -200,8 +185,7 @@
* buffer.
*/
public boolean isSpaceAvailable(byte[] keyBytes) {
- int recLen = 4 + keyBytes.length + 8;
- return (recLen + 4) < bytesLeft;
+ return (keyBytes.length + REC_OVERHEAD + 4) < bytesLeft;
}
/**
@@ -267,16 +251,29 @@
*
* @param keyBytes The key byte array.
* @param IDEntry The EntryID.
+   * @param indexID The index ID the record belongs to.
+   * @param insert <CODE>True</CODE> if the key is an insert, false otherwise.
*/
- public void add(byte[] keyBytes, EntryID IDEntry) {
+ public void add(byte[] keyBytes, EntryID IDEntry, int indexID,
+ boolean insert) {
byte[] idBytes = JebFormat.entryIDToDatabase(IDEntry.longValue());
- int recLen = 4 + keyBytes.length + 8;
- recPtr -= recLen;
- System.arraycopy(getBytes(recPtr), 0, buffer, keyPtr, 4);
+ recPtr -= keyBytes.length + REC_OVERHEAD;
+ System.arraycopy(getIntBytes(recPtr), 0, buffer, keyPtr, 4);
keyPtr += 4;
- System.arraycopy(getBytes(keyBytes.length), 0, buffer, recPtr, 4);
- System.arraycopy(keyBytes, 0, buffer, (recPtr+4), keyBytes.length);
- System.arraycopy(idBytes, 0, buffer, (recPtr + 4 + keyBytes.length), 8);
+ System.arraycopy(getIntBytes(indexID), 0, buffer, recPtr, 4);
+ System.arraycopy(getIntBytes(keyBytes.length), 0, buffer, (recPtr + 4), 4);
+ System.arraycopy(keyBytes, 0, buffer, (recPtr + 8), keyBytes.length);
+ if(insert)
+ {
+ System.arraycopy(getIntBytes(INSERT), 0, buffer,
+ (recPtr + 8 + keyBytes.length), 4);
+ }
+ else
+ {
+ System.arraycopy(getIntBytes(DELETE), 0, buffer,
+ (recPtr + 8 + keyBytes.length), 4);
+ }
+ System.arraycopy(idBytes, 0, buffer, (recPtr + 12 + keyBytes.length), 8);
bytesLeft = recPtr - keyPtr;
keys++;
}
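
Tracing the layout written by add() above: each record stores, from the back
of the buffer, the indexID (4 bytes), the key length (4), the key bytes, an
insert/delete flag (4) and the entry id (8), plus a 4-byte offset slot at the
front of the buffer — which is where REC_OVERHEAD = 20 and the extra 4 bytes
in isSpaceAvailable() come from. A small arithmetic check, assuming the layout
as read from the patch:

    public class RecordSizeSketch
    {
      static final int REC_OVERHEAD = 20;  // indexID + keyLen + flag + entryID

      public static void main(String[] args)
      {
        int keyLen = 11;                     // e.g. an 11-byte normalized key
        int record = keyLen + REC_OVERHEAD;  // grows down from recPtr
        int total = record + 4;              // plus the offset slot at keyPtr
        System.out.println("record=" + record + " total=" + total);
        // isSpaceAvailable() therefore requires keyLen + REC_OVERHEAD + 4
        // bytes between keyPtr and recPtr before accepting the key.
      }
    }
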
@@ -289,24 +286,225 @@
* @param index The index value to retrieve.
* @return The byte array at the index value.
*/
- public byte[] getID(int index)
+ public byte[] getIDBytes(int index)
{
- int offset = index * 4;
- int recOffset = getValue(offset);
- int dnLen = getValue(recOffset);
- System.arraycopy(buffer, recOffset + 4 + dnLen, idBytes, 0, 8);
+ int recOffset = getIntValue(index * 4);
+ int keyLen = getIntValue(recOffset + 4);
+ System.arraycopy(buffer, recOffset + 12 + keyLen, idBytes, 0, 8);
return idBytes;
}
+
/**
- * Compare the byte array at the current pos with the specified one.
+   * Return whether the record specified by the index is an insert.
+ * @param index The index of the record.
+ *
+ * @return <CODE>True</CODE> if the record is an insert, false otherwise.
+ */
+ public boolean isInsert(int index)
+ {
+ boolean ret = true;
+ int recOffset = getIntValue(index * 4);
+ int keyLen = getIntValue(recOffset + 4);
+ if(getIntValue(recOffset + 8 + keyLen) == DELETE)
+ {
+ ret = false;
+ }
+
+ return ret;
+ }
+
+
+ /**
+ * Return the size of the key part of the record.
+ *
+ * @return The size of the key part of the record.
+ */
+ public int getKeySize()
+ {
+ int recOffset = getIntValue(pos * 4);
+ return getIntValue(recOffset + 4);
+ }
+
+
+ private int getIndexID(int x)
+ {
+ return getIntValue(getIntValue(x * 4));
+ }
+
+
+ /**
+   * Return the index id associated with the record at the current position.
+ *
+ * @return The index id.
+ */
+ public int getIndexID()
+ {
+ return getIntValue(getIntValue(pos * 4));
+ }
+
+ /**
+ * Write a record to the specified data output stream using the specified
+ * parameters.
+ *
+ * @param key The key byte array.
+ * @param indexID The index ID.
+ * @param insertByteStream The byte stream containing insert ids.
+ * @param deleteByteStream The byte stream containing delete ids.
+ * @param dataStream The data output stream to write to.
+ * @return The record size written.
+ * @throws IOException If an I/O error occurs writing the record.
+ */
+ public static int writeRecord(byte[] key, int indexID,
+ ByteArrayOutputStream insertByteStream,
+ ByteArrayOutputStream deleteByteStream,
+ DataOutputStream dataStream) throws IOException
+ {
+ dataStream.writeInt(indexID);
+ dataStream.writeInt(key.length);
+ dataStream.write(key);
+ dataStream.writeInt(insertByteStream.size());
+ if(insertByteStream.size() > 0)
+ {
+ insertByteStream.writeTo(dataStream);
+ }
+ dataStream.writeInt(deleteByteStream.size());
+ if(deleteByteStream.size() > 0)
+ {
+ deleteByteStream.writeTo(dataStream);
+ }
+ return (key.length + insertByteStream.size() +
+ deleteByteStream.size() + (REC_OVERHEAD - 4));
+ }
+
+ /**
+   * Write a record to the specified output stream using the record pointed
+   * to by the current position and the specified byte streams of ids.
+ *
+   * @param insertByteStream The byte stream containing the insert ids.
+   * @param deleteByteStream The byte stream containing the delete ids.
+ * @param dataStream The data output stream to write to.
+ * @return The record size written.
+ *
+ * @throws IOException If an I/O error occurs writing the record.
+ */
+ public int writeRecord(ByteArrayOutputStream insertByteStream,
+ ByteArrayOutputStream deleteByteStream,
+ DataOutputStream dataStream) throws IOException
+ {
+ int recOffset = getIntValue(pos * 4);
+ int indexID = getIntValue(recOffset);
+ int keyLen = getIntValue(recOffset + 4);
+ dataStream.writeInt(indexID);
+ dataStream.writeInt(keyLen);
+ dataStream.write(buffer, recOffset + 8, keyLen);
+ dataStream.writeInt(insertByteStream.size());
+ if(insertByteStream.size() > 0)
+ {
+ insertByteStream.writeTo(dataStream);
+ }
+ dataStream.writeInt(deleteByteStream.size());
+ if(deleteByteStream.size() > 0)
+ {
+ deleteByteStream.writeTo(dataStream);
+ }
+ return (getKeySize() + insertByteStream.size() +
+ deleteByteStream.size() + (REC_OVERHEAD - 4));
+ }
+
+ /**
+   * Return the key value part of the record at the current position.
+   *
+   * @return A byte array containing the key value.
+ */
+ public byte[] getKeyBytes()
+ {
+ int recOffset = getIntValue(pos * 4);
+ int keyLen = getIntValue(recOffset + 4);
+ byte[] keyBytes = new byte[keyLen];
+ System.arraycopy(buffer, recOffset + 8, keyBytes, 0, keyLen);
+ return keyBytes;
+ }
+
+ /**
+   * Return the key value of the record at the current position as a string.
+   *
+   * @return A string representing the key value.
+ */
+ public String getKey()
+ {
+ int recOffset = getIntValue(pos * 4);
+ int keyLen = getIntValue(recOffset + 4);
+ byte[] keyBytes = new byte[keyLen];
+ System.arraycopy(buffer, recOffset + 8, keyBytes, 0, keyLen);
+ return new String(keyBytes);
+ }
+
+
+ /**
+   * Return the key value part of the record specified by the index.
+   *
+   * @param x The index of the record.
+   * @return A byte array containing the key value.
+ */
+ private byte[] getKeyBytes(int x)
+ {
+ int recOffset = getIntValue(x * 4);
+ int keyLen = getIntValue(recOffset + 4);
+ byte[] keyBytes = indexKey.getKeyBytes(keyLen);
+ System.arraycopy(buffer, recOffset + 8, keyBytes, 0, keyLen);
+ return keyBytes;
+ }
+
+ private boolean is(int x, int y, CompareOp op)
+ {
+ int xRecOffset = getIntValue(x * 4);
+ int xIndexID = getIntValue(xRecOffset);
+ int xKeyLen = getIntValue(xRecOffset + 4);
+ int xKey = xRecOffset + 8;
+ int yRecOffset = getIntValue(y * 4);
+ int yIndexID = getIntValue(yRecOffset);
+ int yKeyLen = getIntValue(yRecOffset + 4);
+ int yKey = yRecOffset + 8;
+ return eval(comparator.compare(buffer, xKey, xKeyLen, xIndexID,
+ yKey, yKeyLen, yIndexID), op);
+ }
+
+
+ private boolean is(int x, byte[] m, CompareOp op, int mIndexID)
+ {
+ int xRecOffset = getIntValue(x * 4);
+ int xIndexID = getIntValue(xRecOffset);
+ int xKeyLen = getIntValue(xRecOffset + 4);
+ int xKey = xRecOffset + 8;
+ return eval(comparator.compare(buffer, xKey, xKeyLen, xIndexID, m,
+ mIndexID), op);
+ }
+
+
+ /**
+   * Compare the byte array at the current position with the specified one,
+   * using the specified index id.
*
* @param b The byte array to compare.
+   * @param bIndexID The index id to compare.
* @return <CODE>True</CODE> if the byte arrays are equal.
*/
- public boolean compare(byte[] b)
+ public boolean compare(byte[] b, int bIndexID)
{
- return is(pos, b, CompareOp.EQ);
+ boolean ret = false;
+ int xRecOffset = getIntValue(pos * 4);
+ int xIndexID = getIntValue(xRecOffset);
+ int xKeyLen = getIntValue(xRecOffset + 4);
+ int rc = comparator.compare(buffer, xRecOffset + 8, xKeyLen, b);
+ if(rc == 0)
+ {
+ if(xIndexID == bIndexID)
+ {
+ ret = true;
+ }
+ }
+ return ret;
}
/**
@@ -322,6 +520,53 @@
}
/**
+   * Compare the current IndexBuffer to the specified one. The keys at the
+   * current position of both buffers are used in the comparison.
+ *
+ * @param b The IndexBuffer to compare to.
+ * @return 0 if the buffers are equal, -1 if the current buffer is less
+ * than the specified buffer, or 1 if it is greater.
+ */
+ public int compareTo(IndexBuffer b) {
+ byte[] key2 = b.getKeyBytes(b.getPos());
+ int xRecOffset = getIntValue(pos * 4);
+ int xIndexID = getIntValue(xRecOffset);
+ int xLen = getIntValue(xRecOffset + 4);
+ int rc = comparator.compare(buffer, xRecOffset + 8, xLen, key2);
+ if(rc == 0)
+ {
+ int bIndexID = b.getIndexID();
+ if(xIndexID == bIndexID)
+ {
+ long bBufID = b.getBufID();
+ //Used in Remove.
+ if(this.id == bBufID)
+ {
+ rc = 0;
+ }
+ else if(this.id < bBufID)
+ {
+ rc = -1;
+ }
+ else
+ {
+ rc = 1;
+ }
+ }
+ else if(xIndexID < bIndexID)
+ {
+ rc = -1;
+ }
+ else
+ {
+ rc = 1;
+ }
+ }
+ return rc;
+ }
+
+
+ /**
* Return the number of keys in an index buffer.
*
* @return The number of keys currently in an index buffer.
@@ -331,49 +576,6 @@
return keys;
}
- /**
- * Write a key to an output stream.
- *
- * @param out The stream to write the key to.
- *
- * @throws IOException If there was an error writing the key.
- */
- public void writeKey(DataOutputStream out) throws IOException
- {
- int offSet = pos * 4;
- int recOffset = getValue(offSet);
- int len = getValue(recOffset);
- out.writeInt(len);
- out.write(buffer, recOffset + 4, len);
- }
-
- /**
- * Return the size of the key part of the record.
- *
- * @return The size of the key part of the record.
- */
- public int getKeySize()
- {
- int offSet = pos * 4;
- int recOffset = getValue(offSet);
- return getValue(recOffset);
- }
-
- /**
- * Return the key value part of a record specifed by the index.
- *
- * @param index The index to return the key value of.
- * @return byte array containing the key value.
- */
- public byte[] getKeyBytes(int index)
- {
- int offSet = index * 4;
- int recOffset = getValue(offSet);
- int dnLen = getValue(recOffset);
- byte[] b = new byte[dnLen];
- System.arraycopy(buffer, recOffset + 4, b, 0, dnLen);
- return b;
- }
/**
* Return if the buffer has more data. Used when iterating over the
@@ -395,7 +597,7 @@
pos++;
}
- private byte[] getBytes(int val)
+ private byte[] getIntBytes(int val)
{
for (int i = 3; i >= 0; i--) {
intBytes[i] = (byte) (val & 0xff);
@@ -404,7 +606,7 @@
return intBytes;
}
- private int getValue(int pos)
+ private int getIntValue(int pos)
{
int answer = 0;
for (int i = 0; i < 4; i++) {
@@ -416,29 +618,6 @@
}
- private boolean is(int x, int y, CompareOp op)
- {
- int xKeyOffset = x * 4;
- int xOffset = getValue(xKeyOffset);
- int xLen = getValue(xOffset);
- xOffset += 4;
- int yKeyOffset = y * 4;
- int yOffset = getValue(yKeyOffset);
- int yLen = getValue(yOffset);
- yOffset += 4;
- return eval(comparator.compare(buffer, xOffset, xLen, yOffset, yLen), op);
- }
-
-
- private boolean is(int x, byte[] m, CompareOp op)
- {
- int xKeyOffset = x * 4;
- int xOffset = getValue(xKeyOffset);
- int xLen = getValue(xOffset);
- xOffset += 4;
- return eval(comparator.compare(buffer, xOffset, xLen, m), op);
- }
-
private int med3(int a, int b, int c)
{
@@ -470,19 +649,21 @@
m = med3(l, m, n);
}
- byte[] mKey = this.getKeyBytes(m);
+ byte[] mKey = getKeyBytes(m);
+ int mIndexID = getIndexID(m);
+
int a = off, b = a, c = off + len - 1, d = c;
while(true)
{
- while (b <= c && is(b, mKey, CompareOp.LE))
+ while (b <= c && is(b, mKey, CompareOp.LE, mIndexID))
{
- if (is(b, mKey, CompareOp.EQ))
+ if (is(b, mKey, CompareOp.EQ, mIndexID))
swap(a++, b);
b++;
}
- while (c >= b && is(c, mKey, CompareOp.GE))
+ while (c >= b && is(c, mKey, CompareOp.GE, mIndexID))
{
- if (is(c, mKey, CompareOp.EQ))
+ if (is(c, mKey, CompareOp.EQ, mIndexID))
swap(c, d--);
c--;
}
@@ -510,9 +691,9 @@
{
int aOffset = a * 4;
int bOffset = b * 4;
- int bVal = getValue(bOffset);
+ int bVal = getIntValue(bOffset);
System.arraycopy(buffer, aOffset, buffer, bOffset, 4);
- System.arraycopy(getBytes(bVal), 0, buffer, aOffset, 4);
+ System.arraycopy(getIntBytes(bVal), 0, buffer, aOffset, 4);
}
private void vecswap(int a, int b, int n)
@@ -558,24 +739,43 @@
* @param o The object.
* @param offset The first offset.
* @param len The first length.
+ * @param indexID The first index id.
* @param offset1 The second offset.
* @param len1 The second length.
+ * @param indexID1 The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second.
*/
- int compare(T o, int offset, int len, int offset1, int len1);
+ int compare(T o, int offset, int len, int indexID, int offset1,
+ int len1, int indexID1);
/**
* Compare an offset in an object with the specified object.
*
* @param o The first object.
* @param offset The first offset.
* @param len The first length.
- * @param o2 The second object.
+ * @param indexID The first index id.
+ * @param o1 The second object.
+ * @param indexID1 The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
* object.
*/
- int compare(T o, int offset, int len, T o2);
+ int compare(T o, int offset, int len, int indexID, T o1, int indexID1);
+
+ /**
+ * Compare an offset in an object with the specified object.
+ *
+ * @param o The first object.
+ * @param offset The first offset.
+ * @param len The first length.
+ * @param o1 The second object.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * object.
+ */
+ int compare(T o, int offset, int len, T o1);
+
}
/**
@@ -591,12 +791,15 @@
* @param b The byte array.
* @param offset The first offset.
* @param len The first length.
+ * @param indexID The first index id.
* @param offset1 The second offset.
* @param len1 The second length.
+ * @param indexID1 The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second.
*/
- public int compare(byte[] b, int offset, int len, int offset1, int len1)
+ public int compare(byte[] b, int offset, int len, int indexID,
+ int offset1, int len1, int indexID1)
{
for (int ai = len - 1, bi = len1 - 1;
ai >= 0 && bi >= 0; ai--, bi--) {
@@ -611,7 +814,18 @@
}
if(len == len1)
{
- return 0;
+ if(indexID == indexID1)
+ {
+ return 0;
+ }
+ else if(indexID > indexID1)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
}
if(len > len1)
{
@@ -630,12 +844,15 @@
* @param b The byte array.
* @param offset The first offset.
* @param len The first length.
+ * @param indexID The first index id.
* @param m The second byte array to compare to.
+ * @param mIndexID The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
* byte array.
*/
- public int compare(byte[] b, int offset, int len, byte[]m)
+ public int compare(byte[] b, int offset, int len, int indexID,
+ byte[]m, int mIndexID)
{
int len1 = m.length;
for (int ai = len - 1, bi = len1 - 1;
@@ -651,7 +868,18 @@
}
if(len == len1)
{
- return 0;
+ if(indexID == mIndexID)
+ {
+ return 0;
+ }
+ else if(indexID > mIndexID)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
}
if(len > len1)
{
@@ -662,7 +890,47 @@
return -1;
}
}
- }
+
+ /**
+   * Compare an offset in a byte array with the specified byte array,
+   * using the DN comparison algorithm.
+ *
+ * @param b The byte array.
+ * @param offset The first offset.
+ * @param len The first length.
+ * @param m The second byte array to compare to.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the
+ * second byte array.
+ */
+ public int compare(byte[] b, int offset, int len, byte[]m)
+ {
+ int len1 = m.length;
+ for (int ai = len - 1, bi = len1 - 1;
+ ai >= 0 && bi >= 0; ai--, bi--) {
+ if (b[offset + ai] > m[bi])
+ {
+ return 1;
+ }
+ else if (b[offset + ai] < m[bi])
+ {
+ return -1;
+ }
+ }
+ if(len == len1)
+ {
+ return 0;
+ }
+ if(len > len1)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ }
/**
* Implementation of ComparatorBuffer interface. Used to compare keys when
@@ -678,12 +946,15 @@
* @param b The byte array.
* @param offset The first offset.
* @param len The first length.
+ * @param indexID The first index id.
* @param offset1 The second offset.
* @param len1 The second length.
+ * @param indexID1 The second index id.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second.
*/
- public int compare(byte[] b, int offset, int len, int offset1, int len1)
+ public int compare(byte[] b, int offset, int len, int indexID,
+ int offset1, int len1, int indexID1)
{
for(int i = 0; i < len && i < len1; i++)
{
@@ -698,7 +969,18 @@
}
if(len == len1)
{
- return 0;
+ if(indexID == indexID1)
+ {
+ return 0;
+ }
+ else if(indexID > indexID1)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
}
if (len > len1)
{
@@ -717,6 +999,60 @@
* @param b The byte array.
* @param offset The first offset.
* @param len The first length.
+ * @param indexID The first index id.
+ * @param m The second byte array to compare to.
+ * @param mIndexID The second index id.
+ * @return a negative integer, zero, or a positive integer as the first
+ * offset value is less than, equal to, or greater than the second
+ * byte array.
+ */
+ public int compare(byte[] b, int offset, int len, int indexID,
+ byte[] m, int mIndexID)
+ {
+ int len1 = m.length;
+ for(int i = 0; i < len && i < len1; i++)
+ {
+ if(b[offset + i] > m[i])
+ {
+ return 1;
+ }
+ else if (b[offset + i] < m[i])
+ {
+ return -1;
+ }
+ }
+ if(len == len1)
+ {
+ if(indexID == mIndexID)
+ {
+ return 0;
+ }
+ else if(indexID > mIndexID)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ if (len > len1)
+ {
+ return 1;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ /**
+   * Compare an offset in a byte array with the specified byte array,
+   * using the default byte-by-byte comparison.
+ *
+ * @param b The byte array.
+ * @param offset The first offset.
+ * @param len The first length.
* @param m The second byte array to compare to.
* @return a negative integer, zero, or a positive integer as the first
* offset value is less than, equal to, or greater than the second
@@ -750,4 +1086,23 @@
}
}
}
+
+ /**
+ * Set the index key associated with an index buffer.
+ *
+ * @param indexKey The index key.
+ */
+ public void setIndexKey(Importer.IndexKey indexKey)
+ {
+ this.indexKey = indexKey;
+ }
+
+ /**
+ * Return the index key of an index buffer.
+ * @return The index buffer's index key.
+ */
+ public Importer.IndexKey getIndexKey()
+ {
+ return indexKey;
+ }
}
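
Both writeRecord() variants above serialize a merged record as indexID (int),
key length (int), key bytes, insert-id byte count (int), insert ids,
delete-id byte count (int), delete ids — the order in which
Buffer.getNextRecord() reads them back during phase two. A self-contained
round trip of that format, assuming the layout as read from the patch (entry
ids are 8-byte longs; names are illustrative):

    import java.io.*;

    public class ScratchRecordSketch
    {
      public static void main(String[] args) throws IOException
      {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        byte[] key = "cn=test".getBytes("UTF-8");
        out.writeInt(42);              // indexID
        out.writeInt(key.length);      // key length
        out.write(key);                // key bytes
        out.writeInt(8);               // insert ids: one 8-byte long
        out.writeLong(1001L);
        out.writeInt(0);               // no delete ids
        out.close();

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        int indexID = in.readInt();
        byte[] k = new byte[in.readInt()];
        in.readFully(k);
        long[] ins = new long[in.readInt() / 8];
        for (int i = 0; i < ins.length; i++) ins[i] = in.readLong();
        long[] del = new long[in.readInt() / 8];
        for (int i = 0; i < del.length; i++) del[i] = in.readLong();
        System.out.println(indexID + " " + new String(k, "UTF-8") +
                           " ins=" + ins.length + " del=" + del.length);
      }
    }
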
diff --git a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
index ba1bab2..12764f4 100644
--- a/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
+++ b/opends/src/server/org/opends/server/backends/jeb/importLDIF/Suffix.java
@@ -33,11 +33,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.types.*;
-
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
@@ -47,14 +45,11 @@
*/
public class Suffix
{
- private final RootContainer rootContainer;
- private final LDIFImportConfig config;
- private final List<DN> includeBranches = new ArrayList<DN>();
- private final List<DN> excludeBranches = new ArrayList<DN>();
+ private final List<DN> includeBranches;
+ private final List<DN> excludeBranches;
private final DN baseDN;
- private EntryContainer srcEntryContainer = null;
+ private final EntryContainer srcEntryContainer;
private EntryContainer entryContainer;
- private boolean exclude = false;
private final Object synchObject = new Object();
private static final int PARENT_ID_MAP_SIZE = 4096;
@@ -66,34 +61,52 @@
private DN parentDN;
private ArrayList<EntryID> IDs;
- private Suffix(EntryContainer entryContainer, LDIFImportConfig config,
- RootContainer rootContainer) throws InitializationException,
- ConfigException
+  private Suffix(EntryContainer entryContainer,
+                 EntryContainer srcEntryContainer,
+                 List<DN> includeBranches, List<DN> excludeBranches)
+      throws InitializationException, ConfigException
{
- this.rootContainer = rootContainer;
this.entryContainer = entryContainer;
- this.config = config;
+ this.srcEntryContainer = srcEntryContainer;
this.baseDN = entryContainer.getBaseDN();
- init();
+ if (includeBranches == null)
+ {
+ this.includeBranches = new ArrayList<DN>(0);
+ }
+ else
+ {
+ this.includeBranches = includeBranches;
+ }
+ if (excludeBranches == null)
+ {
+ this.excludeBranches = new ArrayList<DN>(0);
+ }
+ else
+ {
+ this.excludeBranches = excludeBranches;
+ }
}
/**
* Creates a suffix instance using the specified parameters.
*
* @param entryContainer The entry container pertaining to the suffix.
- * @param config The import config instance.
- * @param rootContainer The root container.
+ * @param srcEntryContainer The original entry container.
+ * @param includeBranches The include branches.
+ * @param excludeBranches The exclude branches.
*
* @return A suffix instance.
* @throws InitializationException If the suffix cannot be initialized.
* @throws ConfigException If an error occured reading the configuration.
*/
public static Suffix
- createSuffixContext(EntryContainer entryContainer, LDIFImportConfig config,
- RootContainer rootContainer) throws InitializationException,
- ConfigException
+ createSuffixContext(EntryContainer entryContainer,
+ EntryContainer srcEntryContainer,
+ List<DN> includeBranches, List<DN> excludeBranches)
+ throws InitializationException, ConfigException
{
- return new Suffix(entryContainer, config, rootContainer);
+ return new Suffix(entryContainer, srcEntryContainer,
+ includeBranches, excludeBranches);
}
/**
@@ -142,80 +155,6 @@
}
- private void init() throws InitializationException, ConfigException
- {
- if(!config.appendToExistingData() && !config.clearBackend()) {
- for(DN dn : config.getExcludeBranches()) {
- if(baseDN.equals(dn))
- exclude = true;
- if(baseDN.isAncestorOf(dn))
- excludeBranches.add(dn);
- }
-
- if(!config.getIncludeBranches().isEmpty()) {
- for(DN dn : config.getIncludeBranches()) {
- if(baseDN.isAncestorOf(dn))
- includeBranches.add(dn);
- }
- if(includeBranches.isEmpty())
- this.exclude = true;
-
- // Remove any overlapping include branches.
- Iterator<DN> includeBranchIterator = includeBranches.iterator();
- while(includeBranchIterator.hasNext()) {
- DN includeDN = includeBranchIterator.next();
- boolean keep = true;
- for(DN dn : includeBranches) {
- if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN)) {
- keep = false;
- break;
- }
- }
- if(!keep)
- includeBranchIterator.remove();
- }
-
- // Remove any exclude branches that are not are not under a include
- // branch since they will be migrated as part of the existing entries
- // outside of the include branches anyways.
- Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
- while(excludeBranchIterator.hasNext()) {
- DN excludeDN = excludeBranchIterator.next();
- boolean keep = false;
- for(DN includeDN : includeBranches) {
- if(includeDN.isAncestorOf(excludeDN)) {
- keep = true;
- break;
- }
- }
- if(!keep)
- excludeBranchIterator.remove();
- }
-
- try {
- if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
- includeBranches.get(0).equals(baseDN)) {
- // This entire base DN is explicitly included in the import with
- // no exclude branches that we need to migrate. Just clear the entry
- // container.
- entryContainer.lock();
- entryContainer.clear();
- entryContainer.unlock();
- } else {
- // Create a temporary entry container
- srcEntryContainer = entryContainer;
- String tmpName = baseDN.toNormalizedString() +"_importTmp";
- entryContainer = rootContainer.openEntryContainer(baseDN, tmpName);
- }
- } catch (DatabaseException e) {
- // Message msg = ERR_CONFIG_IMPORT_SUFFIX_ERROR.get(e.getMessage());
- // throw new InitializationException(msg);
- }
- }
- }
- }
-
-
/**
* Return the Attribute Type - Index map used to map an attribute type to an
* index instance.
@@ -394,4 +333,44 @@
{
this.IDs = IDs;
}
+
+ /**
+   * Return the source entry container.
+   *
+   * @return The source entry container.
+ */
+ public EntryContainer getSrcEntryContainer()
+ {
+ return this.srcEntryContainer;
+ }
+
+ /**
+ * Return the include branches.
+ *
+ * @return The include branches.
+ */
+ public List<DN> getIncludeBranches()
+ {
+ return this.includeBranches;
+ }
+
+ /**
+ * Return the exclude branches.
+ *
+ * @return The exclude branches.
+ */
+ public List<DN> getExcludeBranches()
+ {
+ return this.excludeBranches;
+ }
+
+ /**
+ * Return the base DN.
+ *
+ * @return The base DN.
+ */
+ public DN getBaseDN()
+ {
+ return this.baseDN;
+ }
}
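(These accessors let per-entry processing decide whether a DN belongs to the
suffix. A hedged sketch; isDNIncluded is a hypothetical helper, and the
explicit equals() checks assume isAncestorOf() may not cover equality:)

    // Hypothetical check combining the new getters.
    static boolean isDNIncluded(Suffix suffix, DN entryDN)
    {
      for (DN excludeDN : suffix.getExcludeBranches())
      {
        if (excludeDN.equals(entryDN) || excludeDN.isAncestorOf(entryDN))
          return false;
      }
      List<DN> includes = suffix.getIncludeBranches();
      if (includes.isEmpty())
      {
        DN baseDN = suffix.getBaseDN();
        return baseDN.equals(entryDN) || baseDN.isAncestorOf(entryDN);
      }
      for (DN includeDN : includes)
      {
        if (includeDN.equals(entryDN) || includeDN.isAncestorOf(entryDN))
          return true;
      }
      return false;
    }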
diff --git a/opends/src/server/org/opends/server/config/ConfigConstants.java b/opends/src/server/org/opends/server/config/ConfigConstants.java
index cfe7455..4975cca 100644
--- a/opends/src/server/org/opends/server/config/ConfigConstants.java
+++ b/opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -3834,6 +3834,14 @@
public static final String ATTR_IMPORT_RANDOM_SEED =
NAME_PREFIX_TASK + "import-random-seed";
+
+ /**
+ * The name of the attribute in an import task definition that specifies the
+ * thread count to be used during the import.
+ */
+ public static final String ATTR_IMPORT_THREAD_COUNT =
+ NAME_PREFIX_TASK + "import-thread-count";
+
/**
* The name of the attribute in an import task definition that specifies
* whether the import process should append to the existing database rather
@@ -3984,8 +3992,8 @@
* that minimal DN validation should be done during phase 2.
*/
- public static final String ATTR_IMPORT_DN_CHECK_PHASE2 =
- NAME_PREFIX_TASK + "import-dn-check-phase2";
+ public static final String ATTR_IMPORT_SKIP_DN_VALIDATION =
+ NAME_PREFIX_TASK + "import-skip-dn-validation";
/**
* The name of the objectclass that will be used for a Directory Server
diff --git a/opends/src/server/org/opends/server/protocols/asn1/ASN1OutputStreamWriter.java b/opends/src/server/org/opends/server/protocols/asn1/ASN1OutputStreamWriter.java
index b85166e..69ee24b 100644
--- a/opends/src/server/org/opends/server/protocols/asn1/ASN1OutputStreamWriter.java
+++ b/opends/src/server/org/opends/server/protocols/asn1/ASN1OutputStreamWriter.java
@@ -479,13 +479,14 @@
{
out = streamStack.get(stackDepth);
}
-
+/*
if(debugEnabled())
{
TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
String.format("WRITE ASN.1 START SEQUENCE(type=0x%x)",
type));
}
+ */
return this;
}
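(Commenting the tracer out removes the per-sequence formatting check from a
hot write path. An alternative that keeps the code compiling is a static
final guard, which the JIT eliminates when false; DEBUG_ASN1 below is a
hypothetical constant, not part of this patch:)

    // Hypothetical compile-time switch; the branch is dropped when false.
    private static final boolean DEBUG_ASN1 = false;

    if (DEBUG_ASN1 && debugEnabled())
    {
      TRACER.debugProtocolElement(DebugLogLevel.VERBOSE,
          String.format("WRITE ASN.1 START SEQUENCE(type=0x%x)", type));
    }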
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 49f6f59..441af91 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4129,7 +4129,7 @@
includeBranches.add(this.baseDn);
importConfig.setIncludeBranches(includeBranches);
importConfig.setAppendToExistingData(false);
- importConfig.setDNCheckPhase2(true);
+ importConfig.setSkipDNValidation(true);
// Allow fractional replication ldif import plugin to be called
importConfig.setInvokeImportPlugins(true);
// Reset the follow import flag and message before starting the import
diff --git a/opends/src/server/org/opends/server/tasks/ImportTask.java b/opends/src/server/org/opends/server/tasks/ImportTask.java
index 56103ac..d185d19 100644
--- a/opends/src/server/org/opends/server/tasks/ImportTask.java
+++ b/opends/src/server/org/opends/server/tasks/ImportTask.java
@@ -170,8 +170,9 @@
private boolean replaceExisting = false;
private boolean skipSchemaValidation = false;
private boolean clearBackend = false;
- private boolean dnCheckPhase2 = false;
+ private boolean skipDNValidation = false;
private String tmpDirectory = null;
+ private int threadCount = 0;
private String backendID = null;
private String rejectFile = null;
private String skipFile = null;
@@ -243,6 +244,7 @@
AttributeType typeIsEncrypted;
AttributeType typeClearBackend;
AttributeType typeRandomSeed;
+ AttributeType typeThreadCount;
AttributeType typeTmpDirectory;
AttributeType typeDNCheckPhase2;
@@ -284,10 +286,12 @@
getAttributeType(ATTR_IMPORT_CLEAR_BACKEND, true);
typeRandomSeed =
getAttributeType(ATTR_IMPORT_RANDOM_SEED, true);
+ typeThreadCount =
+ getAttributeType(ATTR_IMPORT_THREAD_COUNT, true);
typeTmpDirectory =
getAttributeType(ATTR_IMPORT_TMP_DIRECTORY, true);
typeDNCheckPhase2 =
- getAttributeType(ATTR_IMPORT_DN_CHECK_PHASE2, true);
+ getAttributeType(ATTR_IMPORT_SKIP_DN_VALIDATION, true);
List<Attribute> attrList;
@@ -332,7 +336,7 @@
append = TaskUtils.getBoolean(attrList, false);
attrList = taskEntry.getAttribute(typeDNCheckPhase2);
- dnCheckPhase2 = TaskUtils.getBoolean(attrList, true);
+ skipDNValidation = TaskUtils.getBoolean(attrList, true);
attrList = taskEntry.getAttribute(typeTmpDirectory);
tmpDirectory = TaskUtils.getSingleValueString(attrList);
@@ -385,6 +389,9 @@
attrList = taskEntry.getAttribute(typeRandomSeed);
randomSeed = TaskUtils.getSingleValueInteger(attrList, 0);
+ attrList = taskEntry.getAttribute(typeThreadCount);
+ threadCount = TaskUtils.getSingleValueInteger(attrList, 0);
+
// Make sure that either the "includeBranchStrings" argument or the
// "backendID" argument was provided.
if(includeBranchStrings.isEmpty() && backendID == null)
@@ -891,8 +898,9 @@
importConfig.setIncludeBranches(includeBranches);
importConfig.setIncludeFilters(includeFilters);
importConfig.setValidateSchema(!skipSchemaValidation);
- importConfig.setDNCheckPhase2(dnCheckPhase2);
+ importConfig.setSkipDNValidation(skipDNValidation);
importConfig.setTmpDirectory(tmpDirectory);
+ importConfig.setThreadCount(threadCount);
// FIXME -- Should this be conditional?
importConfig.setInvokeImportPlugins(true);
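(For a scheduled task, the new thread count travels as a task-entry attribute.
A hedged sketch of a client adding it, mirroring the CLI code later in this
patch; threads is a hypothetical local variable:)

    // Hypothetical client-side attribute for an import task entry.
    ArrayList<ByteString> values = new ArrayList<ByteString>(1);
    values.add(ByteString.valueOf(String.valueOf(threads)));
    attributes.add(new LDAPAttribute(ATTR_IMPORT_THREAD_COUNT, values));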
diff --git a/opends/src/server/org/opends/server/tools/ImportLDIF.java b/opends/src/server/org/opends/server/tools/ImportLDIF.java
index 890a80d..e45b1c1 100644
--- a/opends/src/server/org/opends/server/tools/ImportLDIF.java
+++ b/opends/src/server/org/opends/server/tools/ImportLDIF.java
@@ -145,15 +145,14 @@
}
// Define the command-line arguments that may be used with this program.
- //Append and replace removed for new import.
-// private BooleanArgument append = null;
+ private BooleanArgument append = null;
private BooleanArgument countRejects = null;
private BooleanArgument displayUsage = null;
private BooleanArgument isCompressed = null;
private BooleanArgument isEncrypted = null;
private BooleanArgument overwrite = null;
private BooleanArgument quietMode = null;
-// private BooleanArgument replaceExisting = null;
+ private BooleanArgument replaceExisting = null;
private BooleanArgument skipSchemaValidation = null;
private BooleanArgument clearBackend = null;
private IntegerArgument randomSeed = null;
@@ -170,7 +169,8 @@
private StringArgument rejectFile = null;
private StringArgument skipFile = null;
private StringArgument templateFile = null;
- private BooleanArgument dnCheckPhase2 = null;
+ private BooleanArgument skipDNValidation = null;
+ private IntegerArgument threadCount = null;
private StringArgument tmpDirectory = null;
private int process(String[] args, boolean initializeServer,
@@ -242,8 +242,6 @@
INFO_LDIFIMPORT_DESCRIPTION_TEMPLATE_FILE.get());
argParser.addArgument(templateFile);
- /*
- Append and replace removed for new import.
append =
new BooleanArgument("append", 'a', "append",
@@ -256,7 +254,7 @@
"replaceexisting", 'r', "replaceExisting",
INFO_LDIFIMPORT_DESCRIPTION_REPLACE_EXISTING.get());
argParser.addArgument(replaceExisting);
- */
+
backendID =
new StringArgument("backendid", 'n', "backendID", false, false, true,
@@ -357,12 +355,18 @@
argParser.addArgument(skipSchemaValidation);
- dnCheckPhase2 =
- new BooleanArgument("dnPhase2", null, "dnCheckPhase2",
- INFO_LDIFIMPORT_DESCRIPTION_DN_CHECK_PHASE_2.get());
- argParser.addArgument(dnCheckPhase2);
+ skipDNValidation =
+ new BooleanArgument("skipDNValidation", null, "skipDNValidation",
+ INFO_LDIFIMPORT_DESCRIPTION_DN_VALIDATION.get());
+ argParser.addArgument(skipDNValidation);
+ threadCount = new IntegerArgument("threadCount", null, "threadCount",
+ false, true,
+ INFO_LDIFIMPORT_THREAD_COUNT_PLACEHOLDER.get(),
+ INFO_LDIFIMPORT_DESCRIPTION_THREAD_COUNT.get());
+ argParser.addArgument(threadCount);
+
tmpDirectory =
new StringArgument("tmpdirectory", null, "tmpdirectory", false,
false, true, INFO_LDIFIMPORT_TEMP_DIR_PLACEHOLDER.get(),
@@ -546,8 +550,6 @@
//
// Optional attributes
//
- /*
- Append and replace removed for new import.
if (append.getValue() != null &&
!append.getValue().equals(append.getDefaultValue())) {
@@ -563,7 +565,7 @@
values.add(ByteString.valueOf(replaceExisting.getValue()));
attributes.add(new LDAPAttribute(ATTR_IMPORT_REPLACE_EXISTING, values));
}
- */
+
if (backendID.getValue() != null &&
!backendID.getValue().equals(
backendID.getDefaultValue())) {
@@ -668,16 +670,26 @@
}
- if (dnCheckPhase2.getValue() != null &&
- !dnCheckPhase2.getValue().equals(
- dnCheckPhase2.getDefaultValue())) {
+ if (skipDNValidation.getValue() != null &&
+ !skipDNValidation.getValue().equals(
+ skipDNValidation.getDefaultValue())) {
values = new ArrayList<ByteString>(1);
- values.add(ByteString.valueOf(dnCheckPhase2.getValue()));
+ values.add(ByteString.valueOf(skipDNValidation.getValue()));
attributes.add(
- new LDAPAttribute(ATTR_IMPORT_DN_CHECK_PHASE2, values));
+ new LDAPAttribute(ATTR_IMPORT_SKIP_DN_VALIDATION, values));
}
+ if (threadCount.getValue() != null &&
+ !threadCount.getValue().equals(
+ threadCount.getDefaultValue())) {
+ values = new ArrayList<ByteString>(1);
+ values.add(ByteString.valueOf(threadCount.getValue()));
+ attributes.add(new LDAPAttribute(ATTR_IMPORT_THREAD_COUNT, values));
+ }
+
if (isCompressed.getValue() != null &&
!isCompressed.getValue().equals(
isCompressed.getDefaultValue())) {
@@ -1199,6 +1211,7 @@
// baseDNs for the backend being imported.
if(backendID.isPresent() && !includeBranchStrings.isPresent() &&
+ !append.isPresent() &&
defaultIncludeBranches.size() > 1 &&
!clearBackend.isPresent())
{
@@ -1322,11 +1335,26 @@
importConfig = new LDIFImportConfig(tf);
}
+ int tc = -1;
+ if(threadCount.isPresent())
+ {
+ try
+ {
+ tc = threadCount.getIntValue();
+ }
+ catch(Exception e)
+ {
+ Message msg = ERR_LDIFIMPORT_CANNOT_PARSE_THREAD_COUNT.get(
+ threadCount.getValue(), e.getMessage());
+ logError(msg);
+ return 1;
+ }
+ }
// Create the LDIF import configuration to use when reading the LDIF.
- // importConfig.setAppendToExistingData(append.isPresent());
- // importConfig.setReplaceExistingEntries(replaceExisting.isPresent());
+ importConfig.setAppendToExistingData(append.isPresent());
+ importConfig.setReplaceExistingEntries(replaceExisting.isPresent());
importConfig.setCompressed(isCompressed.isPresent());
importConfig.setClearBackend(clearBackend.isPresent());
importConfig.setEncrypted(isEncrypted.isPresent());
@@ -1337,8 +1365,9 @@
importConfig.setIncludeBranches(includeBranches);
importConfig.setIncludeFilters(includeFilters);
importConfig.setValidateSchema(!skipSchemaValidation.isPresent());
- importConfig.setDNCheckPhase2(dnCheckPhase2.isPresent());
+ importConfig.setSkipDNValidation(skipDNValidation.isPresent());
importConfig.setTmpDirectory(tmpDirectory.getValue());
+ importConfig.setThreadCount(tc);
importConfig.setBufferSize(LDIF_BUFFER_SIZE);
importConfig.setExcludeAllUserAttributes(
@@ -1460,6 +1489,7 @@
Message message =
ERR_LDIFIMPORT_ERROR_DURING_IMPORT.get(getExceptionMessage(e));
logError(message);
retCode = 1;
}
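(When --threadCount is not given, tc stays -1 and LDIFImportConfig keeps its
-1 default, so the importer chooses its own value. A sketch of how a consumer
might resolve it; the processor-based fallback is an assumption, not taken
from this patch:)

    // Hypothetical resolution of the configured import thread count.
    int configured = importConfig.getThreadCount();
    int threads = (configured > 0)
        ? configured
        : Runtime.getRuntime().availableProcessors(); // assumed fallback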
diff --git a/opends/src/server/org/opends/server/types/LDIFImportConfig.java b/opends/src/server/org/opends/server/types/LDIFImportConfig.java
index e4f6c8c..c22e40a 100644
--- a/opends/src/server/org/opends/server/types/LDIFImportConfig.java
+++ b/opends/src/server/org/opends/server/types/LDIFImportConfig.java
@@ -165,7 +165,8 @@
private boolean excludeAllOpAttrs;
private String tmpDirectory;
- private boolean dnCheckPhase2 = false;
+ private boolean skipDNValidation = false;
+ private int threadCount = -1;
/**
@@ -647,9 +648,9 @@
return;
}
-
skipWriter =
new BufferedWriter(new OutputStreamWriter(outputStream));
}
/**
@@ -1411,9 +1412,9 @@
*
- * @param v The value to set the dn check in phase two boolean to.
+ * @param v The value to set the skip DN validation boolean to.
*/
- public void setDNCheckPhase2(boolean v)
+ public void setSkipDNValidation(boolean v)
{
- dnCheckPhase2 = v;
+ skipDNValidation = v;
}
/**
@@ -1421,9 +1422,30 @@
*
- * @return Return the dn check in phase two boolean value.
+ * @return The skip DN validation boolean value.
*/
- public boolean getDNCheckPhase2()
+ public boolean getSkipDNValidation()
{
- return dnCheckPhase2;
+ return skipDNValidation;
+ }
+
+
+ /**
+ * Set the thread count.
+ *
+ * @param c The thread count value.
+ */
+ public void setThreadCount(int c)
+ {
+ this.threadCount = c;
+ }
+
+ /**
+ * Return the specified thread count.
+ *
+ * @return The thread count.
+ */
+ public int getThreadCount()
+ {
+ return this.threadCount;
}
}
diff --git a/opends/src/server/org/opends/server/util/LDIFReader.java b/opends/src/server/org/opends/server/util/LDIFReader.java
index 72264e5..dd92fca 100644
--- a/opends/src/server/org/opends/server/util/LDIFReader.java
+++ b/opends/src/server/org/opends/server/util/LDIFReader.java
@@ -120,9 +120,6 @@
private RootContainer rootContainer;
- //Temporary until multiple suffixes are supported.
- private volatile Suffix suffix = null;
-
/**
* Creates a new LDIF reader that will read information from the specified
@@ -203,36 +200,39 @@
/**
- * Reads the next entry from the LDIF source. This method will need
- * to be changed when multiple suffixes is supported.
+ * Reads the next entry from the LDIF source.
*
* @return The next entry read from the LDIF source, or <CODE>null</CODE> if
* the end of the LDIF data is reached.
*
- * @param map A
+ * @param map A map of suffix instances keyed by DN.
+ *
+ * @param entryInfo An object to hold information about the entry ID and
+ * which suffix was selected.
*
* @throws IOException If an I/O problem occurs while reading from the file.
*
* @throws LDIFException If the information read cannot be parsed as an LDIF
* entry.
*/
- public final Entry readEntry(Map<DN, Suffix> map)
+ public final Entry readEntry(Map<DN, Suffix> map,
+ Importer.EntryInformation entryInfo)
throws IOException, LDIFException
{
- return readEntry(importConfig.validateSchema(), map);
+ return readEntry(importConfig.validateSchema(), map, entryInfo);
}
- private final Entry readEntry(boolean checkSchema, Map<DN, Suffix> map)
+ private final Entry readEntry(boolean checkSchema, Map<DN, Suffix> map,
+ Importer.EntryInformation entryInfo)
throws IOException, LDIFException
{
-
while (true)
{
LinkedList<StringBuilder> lines;
DN entryDN;
- EntryID entryID;
+ EntryID entryID = null;
synchronized (this)
{
// Read the set of lines that make up the next entry.
@@ -271,12 +271,7 @@
}
entryID = rootContainer.getNextEntryID();
}
- //Temporary until multiple suffixes are supported.
- //getMatchSuffix calls the expensive DN getParentDNInSuffix
- if(suffix == null)
- {
- suffix= Importer.getMatchSuffix(entryDN, map);
- }
+ Suffix suffix = Importer.getMatchSuffix(entryDN, map);
if(suffix == null)
{
if (debugEnabled())
@@ -398,8 +393,8 @@
throw new LDIFException(message, lastEntryLineNumber, true);
}
}
-
- entry.setAttachment(entryID);
+ entryInfo.setEntryID(entryID);
+ entryInfo.setSuffix(suffix);
// The entry should be included in the import, so return it.
return entry;
}
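(readEntry() now resolves the suffix per entry and hands back the entry ID
and suffix through the entryInfo out-parameter instead of an entry
attachment. A minimal consuming loop, assuming EntryInformation exposes a
no-arg constructor plus getEntryID()/getSuffix():)

    // Hypothetical phase-one read loop against the new contract.
    Importer.EntryInformation entryInfo = new Importer.EntryInformation();
    Entry entry;
    while ((entry = reader.readEntry(suffixMap, entryInfo)) != null)
    {
      EntryID entryID = entryInfo.getEntryID();
      Suffix suffix = entryInfo.getSuffix();
      // Hand (entry, entryID, suffix) off to an import worker here.
    }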
diff --git a/opends/tests/unit-tests-testng/resource/config-changes.ldif b/opends/tests/unit-tests-testng/resource/config-changes.ldif
index 78b44c6..71a8818 100644
--- a/opends/tests/unit-tests-testng/resource/config-changes.ldif
+++ b/opends/tests/unit-tests-testng/resource/config-changes.ldif
@@ -426,7 +426,6 @@
ds-cfg-db-directory-permissions: 700
ds-cfg-index-entry-limit: 1
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-db-cache-percent: 2
@@ -657,7 +656,6 @@
ds-cfg-db-directory-permissions: 700
ds-cfg-index-entry-limit: 10
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-db-cache-percent: 2
@@ -828,7 +826,6 @@
ds-cfg-db-directory-permissions: 700
ds-cfg-index-entry-limit: 10
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-db-cache-percent: 2
@@ -994,7 +991,6 @@
ds-cfg-db-directory-permissions: 700
ds-cfg-index-entry-limit: 10
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-db-cache-percent: 2
@@ -1194,7 +1190,6 @@
ds-cfg-db-directory-permissions: 700
ds-cfg-index-entry-limit: 13
ds-cfg-preload-time-limit: 0 seconds
-ds-cfg-import-thread-count: 8
ds-cfg-entries-compressed: false
ds-cfg-db-cache-percent: 2
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/jeb/TestImportJob.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/jeb/TestImportJob.java
index fb74a0e..0f2494f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/jeb/TestImportJob.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/jeb/TestImportJob.java
@@ -282,7 +282,7 @@
TestCaseUtils.deleteDirectory(tempDir);
}
- @Test(enabled=false)
+ @Test(enabled=true)
public void testImportAll() throws Exception
{
TestCaseUtils.clearJEBackend(false, beID, null);
@@ -365,8 +365,7 @@
}
}
- //@Test(dependsOnMethods = "testImportAll")
- @Test(enabled=false)
+ @Test(dependsOnMethods = "testImportAll")
public void testImportPartial() throws Exception
{
ArrayList<String> fileList = new ArrayList<String>();
@@ -454,8 +453,7 @@
}
}
- //@Test(dependsOnMethods = "testImportPartial")
- @Test(enabled=false)
+ @Test(dependsOnMethods = "testImportPartial")
public void testImportReplaceExisting() throws Exception
{
ByteArrayOutputStream rejectedEntries = new ByteArrayOutputStream();
@@ -508,8 +506,7 @@
}
}
- //@Test(dependsOnMethods = "testImportReplaceExisting")
- @Test(enabled=false)
+ @Test(dependsOnMethods = "testImportReplaceExisting")
public void testImportNoParent() throws Exception
{
ByteArrayOutputStream rejectedEntries = new ByteArrayOutputStream();
@@ -532,8 +529,7 @@
assertTrue(rejectedEntries.toString().contains("uid=user.446,dc=importtest1,dc=com"));
}
- //@Test(dependsOnMethods = "testImportReplaceExisting")
- @Test(enabled=false)
+ @Test(dependsOnMethods = "testImportReplaceExisting")
public void testImportAppend() throws Exception
{
LDIFImportConfig importConfig = new LDIFImportConfig(homeDirName + File.separator + "top.ldif");
@@ -603,8 +599,7 @@
}
}
- //@Test(dependsOnMethods = "testImportPartial")
- @Test(enabled=false)
+ @Test(dependsOnMethods = "testImportPartial")
public void testImportNotReplaceExisting() throws Exception
{
ByteArrayOutputStream rejectedEntries = new ByteArrayOutputStream();
@@ -628,8 +623,7 @@
assertTrue(rejectedEntries.toString().contains("uid=user.446,dc=importtest1,dc=com"));
}
- //@Test(dependsOnMethods = "testImportPartial")
- @Test(enabled=false)
+ @Test(dependsOnMethods = "testImportPartial")
public void testImportSkip() throws Exception
{
ArrayList<DN> excludeBranches = new ArrayList<DN>();
@@ -655,7 +649,7 @@
assertTrue(skippedEntries.toString().contains("dc=skipped,dc=importtest1,dc=com"));
assertTrue(skippedEntries.toString().contains("uid=user.446,dc=skipped,dc=importtest1,dc=com"));
}
-
+
/**
* Builds an entry suitable for using in the verify job to gather statistics about
* the verify.
--
Gitblit v1.10.0