From acac2a13ebe79697f86272cacd61541955c9d343 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 16 Jun 2016 09:14:13 +0000
Subject: [PATCH] OPENDJ-3123: Reduce memory pressure by limiting the number of thread.
---
opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java | 7
opendj-server-legacy/src/messages/org/opends/messages/backend.properties | 6
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 357 ++++++++++++-------------------------------
opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java | 19 --
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java | 40 ++---
opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java | 25 ---
opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java | 4
opendj-server-legacy/resource/schema/02-config.ldif | 7
opendj-server-legacy/src/messages/org/opends/messages/tool.properties | 11 -
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java | 1
10 files changed, 124 insertions(+), 353 deletions(-)
diff --git a/opendj-server-legacy/resource/schema/02-config.ldif b/opendj-server-legacy/resource/schema/02-config.ldif
index 2bd227f..c248cad 100644
--- a/opendj-server-legacy/resource/schema/02-config.ldif
+++ b/opendj-server-legacy/resource/schema/02-config.ldif
@@ -3837,12 +3837,6 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
SINGLE-VALUE
X-ORIGIN 'OpenDJ Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.158
- NAME 'ds-task-import-offheap-size'
- EQUALITY integerMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
- SINGLE-VALUE
- X-ORIGIN 'OpenDJ Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.160
NAME 'ds-cfg-base-path'
EQUALITY caseIgnoreMatch
@@ -4632,7 +4626,6 @@
ds-task-import-is-encrypted $
ds-task-import-backend-id $
ds-task-import-thread-count $
- ds-task-import-offheap-size $
ds-task-import-clear-backend )
X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.64
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 798ccf4..22a85f6 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -27,7 +27,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -94,7 +93,7 @@
import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter;
import org.opends.server.backends.pluggable.DN2ID.TreeVisitor;
import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -117,8 +116,7 @@
import com.forgerock.opendj.util.OperatingSystem;
import com.forgerock.opendj.util.PackedLong;
-// @Checkstyle:ignore
-import sun.misc.Unsafe;
+import net.jcip.annotations.NotThreadSafe;
/**
* Imports LDIF data contained in files into the database. Because of the B-Tree structure used in backend, import is
@@ -170,41 +168,22 @@
public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
{
final int threadCount =
- importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors()
- : importConfig.getThreadCount();
+ importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount();
final int indexCount = getIndexCount();
- final int nbBuffer = threadCount * indexCount * 2;
- final int bufferSize;
- if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
- {
- final long offHeapSize = importConfig.getOffHeapSize();
- bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
- if (bufferSize < MIN_BUFFER_SIZE)
- {
- // Not enough memory.
- throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
- }
- logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
- }
- else
- {
- bufferSize = computeBufferSize(nbBuffer);
- logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
- }
+ final int nbRequiredBuffers = threadCount * indexCount * 2;
logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
final long startTime = System.currentTimeMillis();
final OnDiskMergeImporter importer;
- final ExecutorService sorter = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- newThreadFactory(null, SORTER_THREAD_NAME, true));
+ final ExecutorService sorter =
+ Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
final LDIFReaderSource source =
new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount);
final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory());
try (final Importer dbStorage = rootContainer.getStorage().startImport();
- final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
+ final BufferPool bufferPool = newBufferPool(nbRequiredBuffers))
{
final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers();
final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation()
@@ -241,6 +220,12 @@
.getEntriesIgnored());
}
+ private static int getDefaultNumberOfThread()
+ {
+ final int nbProcessors = Runtime.getRuntime().availableProcessors();
+ return Math.max(2, DirectoryServer.isRunning() ? nbProcessors / 2 : nbProcessors);
+ }
+
private int getIndexCount() throws ConfigException
{
int indexCount = 2; // dn2id, dn2uri
@@ -307,29 +292,15 @@
return;
}
rootContainer.getStorage().close();
- final int threadCount = Runtime.getRuntime().availableProcessors();
+ final int threadCount = getDefaultNumberOfThread();
final int nbBuffer = 2 * indexesToRebuild.size() * threadCount;
- final int bufferSize;
- if (BufferPool.SUPPORTS_OFF_HEAP)
- {
- bufferSize = MAX_BUFFER_SIZE;
- logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
- DB_CACHE_SIZE, (((long) bufferSize) * nbBuffer) / MB, nbBuffer, bufferSize / KB);
- }
- else
- {
- bufferSize = computeBufferSize(nbBuffer);
- logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
- }
-
- final ExecutorService sorter = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- newThreadFactory(null, SORTER_THREAD_NAME, true));
+ final ExecutorService sorter =
+ Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true));
final OnDiskMergeImporter importer;
final File tempDir = prepareTempDir(backendCfg, tmpDirectory);
try (final Importer dbStorage = rootContainer.getStorage().startImport();
- final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize))
+ final BufferPool bufferPool = newBufferPool(nbBuffer))
{
final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(
rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild);
@@ -349,6 +320,44 @@
logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate);
}
+ public BufferPool newBufferPool(int nbBuffers) throws InitializationException
+ {
+ // Try off-heap direct buffer
+ int bufferSize = MAX_BUFFER_SIZE;
+ do
+ {
+ try
+ {
+ final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true);
+ logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO,
+ DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB);
+ return pool;
+ }
+ catch (OutOfMemoryError e)
+ {
+ bufferSize /= 2;
+ }
+ }
+ while (bufferSize > MIN_BUFFER_SIZE);
+
+ // Off-line mode or direct memory allocation failed.
+ final long availableMemory = calculateAvailableHeapMemoryForBuffers();
+ logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers);
+ long minimumRequiredMemory = nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
+ if (availableMemory < minimumRequiredMemory)
+ {
+ // Not enough memory.
+ throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
+ }
+ bufferSize = (int) (availableMemory / nbBuffers);
+ if (DirectoryServer.isRunning() && bufferSize > MAX_BUFFER_SIZE)
+ {
+ bufferSize = MAX_BUFFER_SIZE;
+ }
+ logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
+ return new BufferPool(nbBuffers, bufferSize, false);
+ }
+
private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig,
long totalEntries) throws InitializationException
{
@@ -489,20 +498,6 @@
return tempDir;
}
- private int computeBufferSize(int nbBuffer) throws InitializationException
- {
- final long availableMemory = calculateAvailableHeapMemoryForBuffers();
- logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
-
- final long minimumRequiredMemory = nbBuffer * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY;
- if (availableMemory < minimumRequiredMemory)
- {
- // Not enough memory.
- throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory));
- }
- return Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE);
- }
-
/**
* Calculates the amount of available memory which can be used by this import, taking into account whether
* the import is running offline or online as a task.
@@ -519,6 +514,11 @@
else
{
// Offline import/rebuild.
+ // call twice gc to ensure finalizers are called
+ // and young to old gen references are properly gc'd
+ Runtime.getRuntime().gc();
+ Runtime.getRuntime().gc();
+
totalAvailableMemory = Platform.getUsableMemoryForCaching();
}
@@ -823,7 +823,7 @@
}
/** Max size of phase one buffer. */
- private static final int MAX_BUFFER_SIZE = 2 * MB;
+ private static final int MAX_BUFFER_SIZE = 4 * MB;
/** Min size of phase one buffer. */
private static final int MIN_BUFFER_SIZE = 4 * KB;
/** DB cache size to use during import. */
@@ -1621,7 +1621,7 @@
{
private final String metricName;
private final BufferPool bufferPool;
- private final Buffer buffer;
+ private final MemoryBuffer buffer;
private long totalBytes;
private int indexPos;
private int dataPos;
@@ -2727,67 +2727,26 @@
}
}
- /** Buffer used by {@link InMemorySortedChunk} to store and sort data. */
- interface Buffer extends Closeable
- {
- void writeInt(int position, int value);
-
- int readInt(int position);
-
- ByteString readByteString(int position, int length);
-
- void writeByteSequence(int position, ByteSequence data);
-
- int length();
-
- int compare(int offsetA, int lengthA, int offsetB, int lengthB);
- }
-
/**
* Pre-allocate and maintain a fixed number of re-usable {@code Buffer}s. This allow to keep controls of heap memory
* consumption and prevents the significant object allocation cost occurring for huge objects.
*/
static final class BufferPool implements Closeable
{
- private static final Object UNSAFE_OBJECT;
- static final boolean SUPPORTS_OFF_HEAP;
- static
- {
- Object unsafeObject = null;
- try
- {
- final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
- final Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe");
- theUnsafeField.setAccessible(true);
- unsafeObject = theUnsafeField.get(null);
- }
- catch (Throwable e)
- {
- // Unsupported.
- }
- UNSAFE_OBJECT = unsafeObject;
- SUPPORTS_OFF_HEAP = UNSAFE_OBJECT != null;
- }
+ private final BlockingQueue<MemoryBuffer> pool;
- private final BlockingQueue<Buffer> pool;
- private final int bufferSize;
-
- BufferPool(int nbBuffer, int bufferSize)
+ BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect)
{
this.pool = new ArrayBlockingQueue<>(nbBuffer);
- this.bufferSize = bufferSize;
for (int i = 0; i < nbBuffer; i++)
{
- pool.offer(SUPPORTS_OFF_HEAP ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize));
+ pool.offer(new MemoryBuffer(allocateDirect
+ ? ByteBuffer.allocateDirect(bufferSize)
+ : ByteBuffer.allocate(bufferSize)));
}
}
- public int getBufferSize()
- {
- return bufferSize;
- }
-
- private Buffer get()
+ private MemoryBuffer get()
{
try
{
@@ -2799,7 +2758,7 @@
}
}
- private void release(Buffer buffer)
+ private void release(MemoryBuffer buffer)
{
try
{
@@ -2811,183 +2770,71 @@
}
}
- public void setSize(int size)
- {
- while (pool.size() > size)
- {
- get();
- }
- }
-
@Override
public void close()
{
- Buffer buffer;
- while ((buffer = pool.poll()) != null)
- {
- closeSilently(buffer);
- }
+ pool.clear();
}
- /** Off-heap buffer using Unsafe memory access. */
- @SuppressWarnings("restriction")
- static final class OffHeapBuffer implements Buffer
- {
- private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
- private static final long BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
- private final long address;
- private final int size;
- private int position;
- private final OutputStream asOutputStream = new OutputStream()
- {
- @Override
- public void write(int value) throws IOException
- {
- UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
- position += b.length;
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
- position += b.length;
- }
- };
- private boolean closed;
-
- OffHeapBuffer(int size)
- {
- this.size = size;
- this.address = UNSAFE.allocateMemory(size);
- }
-
- @Override
- public void writeInt(final int position, final int value)
- {
- UNSAFE.putInt(address + position, value);
- }
-
- @Override
- public int readInt(final int position)
- {
- return UNSAFE.getInt(address + position);
- }
-
- @Override
- public void writeByteSequence(final int position, ByteSequence data)
- {
- Reject.ifFalse(position + data.length() <= size);
- this.position = position;
- try
- {
- data.copyTo(asOutputStream);
- }
- catch(IOException e)
- {
- throw new StorageRuntimeException(e);
- }
- }
-
- @Override
- public int length()
- {
- return size;
- }
-
- @Override
- public ByteString readByteString(int position, int length)
- {
- Reject.ifFalse(position + length <= size);
-
- final byte[] data = new byte[length];
- UNSAFE.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length);
- return ByteString.wrap(data);
- }
-
- @Override
- public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
- {
- final int len = Math.min(lengthA, lengthB);
- for(int i = 0 ; i < len ; i++)
- {
- final int a = UNSAFE.getByte(address + offsetA + i) & 0xFF;
- final int b = UNSAFE.getByte(address + offsetB + i) & 0xFF;
- if ( a != b )
- {
- return a - b;
- }
- }
- return lengthA - lengthB;
- }
-
- @Override
- public void close()
- {
- if (!closed)
- {
- UNSAFE.freeMemory(address);
- }
- closed = true;
- }
- }
-
- /** Heap buffer using ByteBuffer. */
- static final class HeapBuffer implements Buffer
+ /** Buffer wrapping a ByteBuffer. */
+ @NotThreadSafe
+ static final class MemoryBuffer
{
private final ByteBuffer buffer;
- HeapBuffer(int size)
+ MemoryBuffer(final ByteBuffer byteBuffer)
{
- this.buffer = ByteBuffer.allocate(size);
+ this.buffer = byteBuffer;
}
- @Override
- public void writeInt(final int position, final int value)
+ void writeInt(final int position, final int value)
{
buffer.putInt(position, value);
}
- @Override
- public int readInt(final int position)
+ int readInt(final int position)
{
return buffer.getInt(position);
}
- @Override
- public void writeByteSequence(int position, ByteSequence data)
+ void writeByteSequence(int position, ByteSequence data)
{
buffer.position(position);
data.copyTo(buffer);
}
- @Override
- public int length()
+ int length()
{
return buffer.capacity();
}
- @Override
- public ByteString readByteString(int position, int length)
+ ByteString readByteString(int position, int length)
{
- return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
+ if (buffer.hasArray())
+ {
+ return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length);
+ }
+ final byte[] data = new byte[length];
+ buffer.position(position);
+ buffer.get(data);
+ return ByteString.wrap(data);
}
- @Override
- public int compare(int offsetA, int lengthA, int offsetB, int lengthB)
+ int compare(int offsetA, int lengthA, int offsetB, int lengthB)
{
- return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB));
- }
-
- @Override
- public void close()
- {
- // Nothing to do
+ int count = Math.min(lengthA, lengthB);
+ int i = offsetA;
+ int j = offsetB;
+ while (count-- != 0)
+ {
+ final int firstByte = 0xFF & buffer.get(i++);
+ final int secondByte = 0xFF & buffer.get(j++);
+ if (firstByte != secondByte)
+ {
+ return firstByte - secondByte;
+ }
+ }
+ return lengthA - lengthB;
}
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java b/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
index 736660f..03f0e73 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
@@ -3829,13 +3829,6 @@
NAME_PREFIX_TASK + "import-thread-count";
/**
- * The name of the attribute in an import task definition that specifies the
- * off-heap memory size used during the import.
- */
- public static final String ATTR_IMPORT_OFFHEAP_SIZE =
- NAME_PREFIX_TASK + "import-offheap-size";
-
- /**
* The name of the attribute in an import task definition that specifies
* whether the import process should append to the existing database rather
* than overwriting it.
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java b/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
index 501ebdd..fdfe6b9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tasks/ImportTask.java
@@ -93,7 +93,6 @@
private boolean skipDNValidation;
private String tmpDirectory;
private int threadCount;
- private int offHeapSize;
private String backendID;
private String rejectFile;
private String skipFile;
@@ -153,7 +152,6 @@
AttributeType typeClearBackend = getSchema().getAttributeType(ATTR_IMPORT_CLEAR_BACKEND);
AttributeType typeRandomSeed = getSchema().getAttributeType(ATTR_IMPORT_RANDOM_SEED);
AttributeType typeThreadCount = getSchema().getAttributeType(ATTR_IMPORT_THREAD_COUNT);
- AttributeType typeOffHeapSize = getSchema().getAttributeType(ATTR_IMPORT_OFFHEAP_SIZE);
AttributeType typeTmpDirectory = getSchema().getAttributeType(ATTR_IMPORT_TMP_DIRECTORY);
AttributeType typeDNCheckPhase2 = getSchema().getAttributeType(ATTR_IMPORT_SKIP_DN_VALIDATION);
@@ -209,7 +207,6 @@
clearBackend = asBoolean(taskEntry, typeClearBackend);
randomSeed = asInt(taskEntry, typeRandomSeed);
threadCount = asInt(taskEntry, typeThreadCount);
- offHeapSize = asInt(taskEntry, typeOffHeapSize);
// Make sure that either the "includeBranchStrings" argument or the
// "backendID" argument was provided.
@@ -587,7 +584,6 @@
importConfig.setSkipDNValidation(skipDNValidation);
importConfig.setTmpDirectory(tmpDirectory);
importConfig.setThreadCount(threadCount);
- importConfig.setOffHeapSize(offHeapSize);
// FIXME -- Should this be conditional?
importConfig.setInvokeImportPlugins(true);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java b/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
index 17a69f5..aec0b2c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tools/ImportLDIF.java
@@ -136,7 +136,6 @@
private StringArgument templateFile;
private BooleanArgument skipDNValidation;
private IntegerArgument threadCount;
- private IntegerArgument offHeapSize;
private StringArgument tmpDirectory;
private int process(String[] args, boolean initializeServer,
@@ -360,13 +359,6 @@
.defaultValue(0)
.valuePlaceholder(INFO_LDIFIMPORT_THREAD_COUNT_PLACEHOLDER.get())
.buildAndAddToParser(argParser);
- offHeapSize =
- IntegerArgument.builder("offHeapSize")
- .description(INFO_LDIFIMPORT_DESCRIPTION_OFFHEAP_SIZE.get())
- .lowerBound(0)
- .defaultValue(700)
- .valuePlaceholder(INFO_LDIFIMPORT_OFFHEAP_SIZE_PLACEHOLDER.get())
- .buildAndAddToParser(argParser);
tmpDirectory =
StringArgument.builder("tmpdirectory")
.description(INFO_LDIFIMPORT_DESCRIPTION_TEMP_DIRECTORY.get())
@@ -407,7 +399,6 @@
addAttribute(attributes, ATTR_IMPORT_TEMPLATE_FILE, templateFile.getValue());
addAttribute(attributes, ATTR_IMPORT_RANDOM_SEED, randomSeed.getValue());
addAttribute(attributes, ATTR_IMPORT_THREAD_COUNT, threadCount.getValue());
- addAttribute(attributes, ATTR_IMPORT_OFFHEAP_SIZE, offHeapSize.getValue());
// Optional attributes
addAttribute2(attributes, ATTR_IMPORT_BACKEND_ID, backendID);
@@ -794,16 +785,6 @@
return 1;
}
- try
- {
- importConfig.setOffHeapSize(offHeapSize.getIntValue());
- }
- catch (Exception e)
- {
- logger.error(ERR_LDIFIMPORT_CANNOT_PARSE_OFFHEAP_SIZE, offHeapSize.getValue(), e.getMessage());
- return 1;
- }
-
importConfig.setBufferSize(LDIF_BUFFER_SIZE);
importConfig.setExcludeAllUserAttributes(excludeAllUserAttributes);
importConfig.setExcludeAllOperationalAttributes(excludeAllOperationalAttributes);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java b/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
index 3605862..fd58233 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/types/LDIFImportConfig.java
@@ -125,10 +125,6 @@
private boolean skipDNValidation;
private int threadCount;
- /** Indicates the memory size, in megabytes, to use for off-heap buffers. */
- private int offHeapSize;
-
-
/**
* Creates a new LDIF import configuration that will read from the
* specified LDIF file.
@@ -1061,27 +1057,6 @@
return skipDNValidation;
}
-
- /**
- * Set the memory size available for off-heap buffers.
- *
- * @param sizeInMb The memory size available expressed in megabytes.
- */
- public void setOffHeapSize(int sizeInMb)
- {
- this.offHeapSize = sizeInMb;
- }
-
- /**
- * Get the memory size available for off-heap buffers.
- *
- * @return The memory size in megabytes.
- */
- public int getOffHeapSize()
- {
- return offHeapSize;
- }
-
/**
* Set the thread count.
*
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/backend.properties b/opendj-server-legacy/src/messages/org/opends/messages/backend.properties
index 6b354fb..a327de8 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/backend.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/backend.properties
@@ -1002,7 +1002,7 @@
NOTE_IMPORT_LDIF_INDEX_STARTED_523=Index %s phase two started processing \
%d buffers in %d batches
NOTE_IMPORT_LDIF_PHASE_TWO_REPORT_525=Index %s %d%% complete: \
- remaining = %d kb, rate = %d kb/s; batch %d/%d
+ remaining = %d KB, rate = %d KB/s; batch %d/%d
NOTE_IMPORT_LDIF_ROOTCONTAINER_CLOSE_526=Import LDIF environment close \
took %d seconds
NOTE_IMPORT_LDIF_TOT_MEM_BUF_528=The amount of free memory available to \
@@ -1078,8 +1078,8 @@
children for DN <%s> (got %d, expecting %d)
ERR_VERIFY_ID2COUNT_WRONG_ID_597=File id2ChildrenCount references non-existing EntryID <%d>.
NOTE_REBUILD_NOTHING_TO_REBUILD_598=Rebuilding index finished: no indexes to rebuild.
-NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO_599=Setting DB cache size to %d bytes. \
- Using %d Mb off-heap memory through %d phase one buffers of %d Kb.
+NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO_599=Setting DB cache size to %d MB. \
+ Using %d mb off-heap memory through %d phase one buffers of %d KB.
ERR_SCHEMA_PARSE_LINE_600=Ignoring schema definition '%s' because the following error occurred while \
it was being parsed: %s
ERR_SCHEMA_COULD_NOT_PARSE_DEFINITION_601=Schema definition could not be parsed as valid attribute value
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/tool.properties b/opendj-server-legacy/src/messages/org/opends/messages/tool.properties
index 35804bf..123bbff 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/tool.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/tool.properties
@@ -2453,14 +2453,9 @@
INFO_INDEX_NAME_PLACEHOLDER_1894={indexName}
INFO_DESCRIPTION_BACKEND_DEBUG_RAW_DB_NAME_1895=The raw database name
INFO_CHANGE_NUMBER_PLACEHOLDER_1896={change number}
-ERR_LDIFIMPORT_CANNOT_PARSE_OFFHEAP_SIZE_1897=The value %s for \
-offHeapSize cannot be parsed: %s
-INFO_LDIFIMPORT_DESCRIPTION_OFFHEAP_SIZE_1898=Size expressed in megabytes of the off-heap memory dedicated to the \
-phase one buffers.
-INFO_LDIFIMPORT_OFFHEAP_SIZE_PLACEHOLDER_1899={size in megabytes}
-ERR_CANNOT_INITIALIZE_BACKENDS_1900=An error occurred while initializing server backends: %s
-ERR_CANNOT_INITIALIZE_SERVER_PLUGINS_1901=An error occurred while initializing plugins: %s
-ERR_CANNOT_SUBSYSTEM_NOT_INITIALIZED_1902=Subsystem %s should be initialized first
+ERR_CANNOT_INITIALIZE_BACKENDS_1897=An error occurred while initializing server backends: %s
+ERR_CANNOT_INITIALIZE_SERVER_PLUGINS_1898=An error occurred while initializing plugins: %s
+ERR_CANNOT_SUBSYSTEM_NOT_INITIALIZED_1899=Subsystem %s should be initialized first
# Upgrade tasks
INFO_UPGRADE_TASK_6869_SUMMARY_10000=Fixing de-DE collation matching rule OID
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index 4cdfafd..65b6101 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -19,7 +19,7 @@
import static org.opends.server.backends.pluggable.EntryIDSet.*;
import java.io.File;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
@@ -42,10 +42,8 @@
import org.mockito.Mockito;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.Buffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.HeapBuffer;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.OffHeapBuffer;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.EntryIDSetsCollector;
@@ -69,35 +67,29 @@
public class OnDiskMergeImporterTest extends DirectoryServerTestCase
{
@Test
- public void testHeapBuffer() throws IOException
+ public void testHeapBuffer()
{
- try(Buffer buffer = new HeapBuffer(1024))
- {
- testBufferImplementation(buffer);
- }
+ testBufferImplementation(new MemoryBuffer(ByteBuffer.allocate(1024)));
}
@Test
- public void testOffHeapBuffer() throws IOException
+ public void testOffHeapBuffer()
{
- if (BufferPool.SUPPORTS_OFF_HEAP)
- {
- try (Buffer buffer = new OffHeapBuffer(1024))
- {
- testBufferImplementation(buffer);
- }
- }
+ testBufferImplementation(new MemoryBuffer(ByteBuffer.allocateDirect(1024)));
}
- private static void testBufferImplementation(Buffer buffer)
+ private static void testBufferImplementation(MemoryBuffer buffer)
{
- final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4 });
+ final ByteString binary = ByteString.valueOfBytes(new byte[] { 1, 2, 3, 4, 1 });
buffer.writeByteSequence(0, binary);
- buffer.writeInt(4, 1234);
+ buffer.writeInt(5, 1234);
- assertThat(buffer.readByteString(0, 4)).isEqualTo(binary);
- assertThat(buffer.readInt(4)).isEqualTo(1234);
+ assertThat(buffer.readByteString(0, 5)).isEqualTo(binary);
+ assertThat(buffer.readInt(5)).isEqualTo(1234);
+ assertThat(buffer.compare(0, 1, 2, 1)).isLessThan(0);
+ assertThat(buffer.compare(0, 1, 4, 1)).isEqualTo(0);
+ assertThat(buffer.compare(1, 1, 0, 1)).isGreaterThan(0);
}
@Test
@@ -239,7 +231,7 @@
@Test
public void testInMemorySortedChunkSortUnsignedOnFlip() throws Exception
{
- try(final BufferPool bufferPool = new BufferPool(1, 1024)) {
+ try(final BufferPool bufferPool = new BufferPool(1, 1024, false)) {
final Chunk chunk = new InMemorySortedChunk("test", bufferPool);
populate(chunk, content(new String[][] {
{ new String(new byte[] { (byte) 0xFF }), "value0xFF" },
@@ -313,7 +305,7 @@
final int NB_REGION = 10;
final ByteString KEY = ByteString.valueOfUtf8("key");
final File tempDir = TestCaseUtils.createTemporaryDirectory("testExternalSortChunk");
- try (final BufferPool bufferPool = new BufferPool(2, 4 + 4 + KEY.length() + 4 + 4))
+ try (final BufferPool bufferPool = new BufferPool(2, 4 + 4 + KEY.length() + 4 + 4, false))
{
// 4: record offset, 4: key length, 4: value length, 4: value
final ExternalSortChunk chunk =
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
index eabaa9a..cb63949 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
@@ -955,7 +955,6 @@
importConf.setIncludeBranches(Collections.singleton(testBaseDN));
importConf.setSkipDNValidation(true);
importConf.setThreadCount(0);
- importConf.setOffHeapSize(0); // Force heap buffer for automatic buffer scaling.
backend.importLDIF(importConf, DirectoryServer.getInstance().getServerContext());
}
assertEquals(rejectedEntries.size(), 0,
--
Gitblit v1.10.0