| | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.lang.reflect.Field; |
| | | import java.nio.ByteBuffer; |
| | | import java.nio.MappedByteBuffer; |
| | | import java.nio.channels.FileChannel; |
| | |
| | | import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter; |
| | | import org.opends.server.backends.pluggable.DN2ID.TreeVisitor; |
| | | import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool.MemoryBuffer; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadOperation; |
| | |
| | | import com.forgerock.opendj.util.OperatingSystem; |
| | | import com.forgerock.opendj.util.PackedLong; |
| | | |
| | | // @Checkstyle:ignore |
| | | import sun.misc.Unsafe; |
| | | import net.jcip.annotations.NotThreadSafe; |
| | | |
| | | /** |
| | | * Imports LDIF data contained in files into the database. Because of the B-Tree structure used in backend, import is |
| | |
| | | public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception |
| | | { |
| | | final int threadCount = |
| | | importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors() |
| | | : importConfig.getThreadCount(); |
| | | importConfig.getThreadCount() == 0 ? getDefaultNumberOfThread() : importConfig.getThreadCount(); |
| | | final int indexCount = getIndexCount(); |
| | | |
| | | final int nbBuffer = threadCount * indexCount * 2; |
| | | final int bufferSize; |
| | | if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0) |
| | | { |
| | | final long offHeapSize = importConfig.getOffHeapSize(); |
| | | bufferSize = (int) ((offHeapSize * MB) / nbBuffer); |
| | | if (bufferSize < MIN_BUFFER_SIZE) |
| | | { |
| | | // Not enough memory. |
| | | throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE)); |
| | | } |
| | | logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB); |
| | | } |
| | | else |
| | | { |
| | | bufferSize = computeBufferSize(nbBuffer); |
| | | logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize); |
| | | } |
| | | final int nbRequiredBuffers = threadCount * indexCount * 2; |
| | | logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION); |
| | | logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount); |
| | | |
| | | final long startTime = System.currentTimeMillis(); |
| | | final OnDiskMergeImporter importer; |
| | | final ExecutorService sorter = Executors.newFixedThreadPool( |
| | | Runtime.getRuntime().availableProcessors(), |
| | | newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | final LDIFReaderSource source = |
| | | new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount); |
| | | final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory()); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport(); |
| | | final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize)) |
| | | final BufferPool bufferPool = newBufferPool(nbRequiredBuffers)) |
| | | { |
| | | final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers(); |
| | | final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation() |
| | |
| | | .getEntriesIgnored()); |
| | | } |
| | | |
| | | /** |
| | | * Computes the default number of threads: all available processors when the server is not running, half of |
| | | * them when it is, and never fewer than two. |
| | | * |
| | | * @return the default thread count, always at least 2 |
| | | */ |
| | | private static int getDefaultNumberOfThread() |
| | | { |
| | | final int availableCpus = Runtime.getRuntime().availableProcessors(); |
| | | final int candidate = DirectoryServer.isRunning() ? availableCpus / 2 : availableCpus; |
| | | return Math.max(2, candidate); |
| | | } |
| | | |
| | | private int getIndexCount() throws ConfigException |
| | | { |
| | | int indexCount = 2; // dn2id, dn2uri |
| | |
| | | return; |
| | | } |
| | | rootContainer.getStorage().close(); |
| | | final int threadCount = Runtime.getRuntime().availableProcessors(); |
| | | final int threadCount = getDefaultNumberOfThread(); |
| | | final int nbBuffer = 2 * indexesToRebuild.size() * threadCount; |
| | | final int bufferSize; |
| | | if (BufferPool.SUPPORTS_OFF_HEAP) |
| | | { |
| | | bufferSize = MAX_BUFFER_SIZE; |
| | | logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, |
| | | DB_CACHE_SIZE, (((long) bufferSize) * nbBuffer) / MB, nbBuffer, bufferSize / KB); |
| | | } |
| | | else |
| | | { |
| | | bufferSize = computeBufferSize(nbBuffer); |
| | | logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize); |
| | | } |
| | | |
| | | final ExecutorService sorter = Executors.newFixedThreadPool( |
| | | Runtime.getRuntime().availableProcessors(), |
| | | newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | final ExecutorService sorter = |
| | | Executors.newFixedThreadPool(threadCount, newThreadFactory(null, SORTER_THREAD_NAME, true)); |
| | | |
| | | final OnDiskMergeImporter importer; |
| | | final File tempDir = prepareTempDir(backendCfg, tmpDirectory); |
| | | try (final Importer dbStorage = rootContainer.getStorage().startImport(); |
| | | final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize)) |
| | | final BufferPool bufferPool = newBufferPool(nbBuffer)) |
| | | { |
| | | final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy( |
| | | rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild); |
| | |
| | | logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate); |
| | | } |
| | | |
| | | /** |
| | | * Creates a pool of {@code nbBuffers} buffers, trying direct (off-heap) memory first. |
| | | * <p> |
| | | * Direct allocation starts at {@code MAX_BUFFER_SIZE} per buffer. Each time it fails with an |
| | | * {@link OutOfMemoryError} the size is halved and the allocation is retried, as long as the size stays above |
| | | * {@code MIN_BUFFER_SIZE}. When no direct pool could be allocated, heap buffers sized from the available heap |
| | | * memory are used instead. |
| | | * |
| | | * @param nbBuffers |
| | | * number of buffers the pool must hold |
| | | * @return the newly allocated pool |
| | | * @throws InitializationException |
| | | * if the available heap memory is below the minimum required for {@code nbBuffers} buffers |
| | | */ |
| | | public BufferPool newBufferPool(int nbBuffers) throws InitializationException |
| | | { |
| | | // Try off-heap direct buffer |
| | | int bufferSize = MAX_BUFFER_SIZE; |
| | | do |
| | | { |
| | | try |
| | | { |
| | | final BufferPool pool = new BufferPool(nbBuffers, bufferSize, true); |
| | | logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, |
| | | DB_CACHE_SIZE / MB, (((long) bufferSize) * nbBuffers) / MB, nbBuffers, bufferSize / KB); |
| | | return pool; |
| | | } |
| | | catch (OutOfMemoryError e) |
| | | { |
| | | // Not enough direct memory for this size: retry with buffers half as large. |
| | | bufferSize /= 2; |
| | | } |
| | | } |
| | | while (bufferSize > MIN_BUFFER_SIZE); |
| | | |
| | | // Direct memory allocation failed at every attempted size: fall back to heap buffers. |
| | | final long availableMemory = calculateAvailableHeapMemoryForBuffers(); |
| | | logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffers); |
| | | // Widen before multiplying so that a large buffer count cannot overflow int arithmetic. |
| | | final long minimumRequiredMemory = (long) nbBuffers * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY; |
| | | if (availableMemory < minimumRequiredMemory) |
| | | { |
| | | // Not enough memory. |
| | | throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory)); |
| | | } |
| | | // Clamp in the long domain before narrowing: casting an oversized quotient straight to int would wrap to a |
| | | // negative or arbitrarily small buffer size. When the server is running the size is capped to MAX_BUFFER_SIZE. |
| | | // NOTE(review): Integer.MAX_VALUE - 8 is assumed to be a safe upper bound for a heap array - confirm. |
| | | final long maxHeapBufferSize = DirectoryServer.isRunning() ? MAX_BUFFER_SIZE : Integer.MAX_VALUE - 8; |
| | | bufferSize = (int) Math.min(availableMemory / nbBuffers, maxHeapBufferSize); |
| | | logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize); |
| | | return new BufferPool(nbBuffers, bufferSize, false); |
| | | } |
| | | |
| | | private static final Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig, |
| | | long totalEntries) throws InitializationException |
| | | { |
| | |
| | | return tempDir; |
| | | } |
| | | |
| | | /** |
| | | * Computes the size of each heap buffer from the heap memory available for buffers. |
| | | * |
| | | * @param nbBuffer |
| | | * number of buffers which will be allocated |
| | | * @return the size in bytes of each buffer, never more than {@code MAX_BUFFER_SIZE} |
| | | * @throws InitializationException |
| | | * if the available memory is below the minimum required for {@code nbBuffer} buffers |
| | | */ |
| | | private int computeBufferSize(int nbBuffer) throws InitializationException |
| | | { |
| | | final long availableMemory = calculateAvailableHeapMemoryForBuffers(); |
| | | logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer); |
| | | |
| | | // Widen before multiplying so that a large buffer count cannot overflow int arithmetic. |
| | | final long minimumRequiredMemory = (long) nbBuffer * MIN_BUFFER_SIZE + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY; |
| | | if (availableMemory < minimumRequiredMemory) |
| | | { |
| | | // Not enough memory. |
| | | throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, minimumRequiredMemory)); |
| | | } |
| | | // Cap in the long domain, then narrow: the result is at most MAX_BUFFER_SIZE so the cast cannot wrap. |
| | | return (int) Math.min(availableMemory / nbBuffer, MAX_BUFFER_SIZE); |
| | | } |
| | | |
| | | /** |
| | | * Calculates the amount of available memory which can be used by this import, taking into account whether |
| | | * the import is running offline or online as a task. |
| | |
| | | else |
| | | { |
| | | // Offline import/rebuild. |
| | | // call twice gc to ensure finalizers are called |
| | | // and young to old gen references are properly gc'd |
| | | Runtime.getRuntime().gc(); |
| | | Runtime.getRuntime().gc(); |
| | | |
| | | totalAvailableMemory = Platform.getUsableMemoryForCaching(); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** Max size of phase one buffer. */ |
| | | private static final int MAX_BUFFER_SIZE = 2 * MB; |
| | | private static final int MAX_BUFFER_SIZE = 4 * MB; |
| | | /** Min size of phase one buffer. */ |
| | | private static final int MIN_BUFFER_SIZE = 4 * KB; |
| | | /** DB cache size to use during import. */ |
| | |
| | | { |
| | | private final String metricName; |
| | | private final BufferPool bufferPool; |
| | | private final Buffer buffer; |
| | | private final MemoryBuffer buffer; |
| | | private long totalBytes; |
| | | private int indexPos; |
| | | private int dataPos; |
| | |
| | | } |
| | | } |
| | | |
| | | /** Buffer used by {@link InMemorySortedChunk} to store and sort data. */ |
| | | interface Buffer extends Closeable |
| | | { |
| | | /** Writes the int {@code value} at the given {@code position} of this buffer. */ |
| | | void writeInt(int position, int value); |
| | | |
| | | /** Returns the int stored at the given {@code position} of this buffer. */ |
| | | int readInt(int position); |
| | | |
| | | /** Returns the {@code length} bytes starting at {@code position} as a {@code ByteString}. */ |
| | | ByteString readByteString(int position, int length); |
| | | |
| | | /** Copies the content of {@code data} into this buffer, starting at {@code position}. */ |
| | | void writeByteSequence(int position, ByteSequence data); |
| | | |
| | | /** Returns the size of this buffer. */ |
| | | int length(); |
| | | |
| | | /** |
| | | * Compares two byte ranges of this buffer, each given as an offset and a length. |
| | | * |
| | | * @return a negative, zero or positive value when range A sorts before, equal to or after range B |
| | | */ |
| | | int compare(int offsetA, int lengthA, int offsetB, int lengthB); |
| | | } |
| | | |
| | | /** |
| | | * Pre-allocates and maintains a fixed number of re-usable {@code Buffer}s. This keeps heap memory consumption |
| | | * under control and avoids the significant allocation cost incurred by huge objects. |
| | | */ |
| | | static final class BufferPool implements Closeable |
| | | { |
| | | /** The {@code sun.misc.Unsafe} singleton obtained by reflection, or {@code null} when it cannot be obtained. */ |
| | | private static final Object UNSAFE_OBJECT; |
| | | /** Whether {@code sun.misc.Unsafe} could be obtained, i.e. {@link #UNSAFE_OBJECT} is not {@code null}. */ |
| | | static final boolean SUPPORTS_OFF_HEAP; |
| | | static |
| | | { |
| | | Object theUnsafe; |
| | | try |
| | | { |
| | | // Looked up reflectively so that class loading does not fail when sun.misc.Unsafe is absent. |
| | | final Field singletonField = Class.forName("sun.misc.Unsafe").getDeclaredField("theUnsafe"); |
| | | singletonField.setAccessible(true); |
| | | theUnsafe = singletonField.get(null); |
| | | } |
| | | catch (Throwable ignored) |
| | | { |
| | | // Unsupported. |
| | | theUnsafe = null; |
| | | } |
| | | UNSAFE_OBJECT = theUnsafe; |
| | | SUPPORTS_OFF_HEAP = UNSAFE_OBJECT != null; |
| | | } |
| | | private final BlockingQueue<MemoryBuffer> pool; |
| | | |
| | | private final BlockingQueue<Buffer> pool; |
| | | private final int bufferSize; |
| | | |
| | | BufferPool(int nbBuffer, int bufferSize) |
| | | BufferPool(int nbBuffer, int bufferSize, boolean allocateDirect) |
| | | { |
| | | this.pool = new ArrayBlockingQueue<>(nbBuffer); |
| | | this.bufferSize = bufferSize; |
| | | for (int i = 0; i < nbBuffer; i++) |
| | | { |
| | | pool.offer(SUPPORTS_OFF_HEAP ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize)); |
| | | pool.offer(new MemoryBuffer(allocateDirect |
| | | ? ByteBuffer.allocateDirect(bufferSize) |
| | | : ByteBuffer.allocate(bufferSize))); |
| | | } |
| | | } |
| | | |
| | | /** Returns the buffer size this pool was constructed with. */ |
| | | public int getBufferSize() |
| | | { |
| | | return bufferSize; |
| | | } |
| | | |
| | | private Buffer get() |
| | | private MemoryBuffer get() |
| | | { |
| | | try |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void release(Buffer buffer) |
| | | private void release(MemoryBuffer buffer) |
| | | { |
| | | try |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Calls {@code get()} repeatedly for as long as the pool holds more than {@code size} buffers. The buffers |
| | | * returned by {@code get()} are discarded without being closed. |
| | | * |
| | | * @param size |
| | | * the maximum number of buffers to leave in the pool |
| | | */ |
| | | public void setSize(int size) |
| | | { |
| | | // NOTE(review): presumably get() removes one buffer from the queue on each call - confirm. |
| | | while (pool.size() > size) |
| | | { |
| | | get(); |
| | | } |
| | | } |
| | | |
| | | /** Removes every buffer still queued in the pool and closes each of them silently. */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | for (Buffer pooled = pool.poll(); pooled != null; pooled = pool.poll()) |
| | | { |
| | | closeSilently(pooled); |
| | | } |
| | | pool.clear(); |
| | | } |
| | | |
| | | /** Off-heap buffer using Unsafe memory access. */ |
| | | @SuppressWarnings("restriction") |
| | | static final class OffHeapBuffer implements Buffer |
| | | { |
| | | private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT; |
| | | private static final long BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); |
| | | |
| | | private final long address; |
| | | private final int size; |
| | | private int position; |
| | | private final OutputStream asOutputStream = new OutputStream() |
| | | { |
| | | @Override |
| | | public void write(int value) throws IOException |
| | | { |
| | | UNSAFE.putByte(address + position++, (byte) (value & 0xFF)); |
| | | } |
| | | |
| | | @Override |
| | | public void write(byte[] b) throws IOException { |
| | | UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length); |
| | | position += b.length; |
| | | } |
| | | |
| | | @Override |
| | | public void write(byte[] b, int off, int len) throws IOException { |
| | | UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len); |
| | | position += b.length; |
| | | } |
| | | }; |
| | | private boolean closed; |
| | | |
| | | OffHeapBuffer(int size) |
| | | { |
| | | this.size = size; |
| | | this.address = UNSAFE.allocateMemory(size); |
| | | } |
| | | |
| | | @Override |
| | | public void writeInt(final int position, final int value) |
| | | { |
| | | UNSAFE.putInt(address + position, value); |
| | | } |
| | | |
| | | @Override |
| | | public int readInt(final int position) |
| | | { |
| | | return UNSAFE.getInt(address + position); |
| | | } |
| | | |
| | | @Override |
| | | public void writeByteSequence(final int position, ByteSequence data) |
| | | { |
| | | Reject.ifFalse(position + data.length() <= size); |
| | | this.position = position; |
| | | try |
| | | { |
| | | data.copyTo(asOutputStream); |
| | | } |
| | | catch(IOException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public int length() |
| | | { |
| | | return size; |
| | | } |
| | | |
| | | @Override |
| | | public ByteString readByteString(int position, int length) |
| | | { |
| | | Reject.ifFalse(position + length <= size); |
| | | |
| | | final byte[] data = new byte[length]; |
| | | UNSAFE.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length); |
| | | return ByteString.wrap(data); |
| | | } |
| | | |
| | | @Override |
| | | public int compare(int offsetA, int lengthA, int offsetB, int lengthB) |
| | | { |
| | | final int len = Math.min(lengthA, lengthB); |
| | | for(int i = 0 ; i < len ; i++) |
| | | { |
| | | final int a = UNSAFE.getByte(address + offsetA + i) & 0xFF; |
| | | final int b = UNSAFE.getByte(address + offsetB + i) & 0xFF; |
| | | if ( a != b ) |
| | | { |
| | | return a - b; |
| | | } |
| | | } |
| | | return lengthA - lengthB; |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | if (!closed) |
| | | { |
| | | UNSAFE.freeMemory(address); |
| | | } |
| | | closed = true; |
| | | } |
| | | } |
| | | |
| | | /** Heap buffer using ByteBuffer. */ |
| | | static final class HeapBuffer implements Buffer |
| | | /** Buffer wrapping a ByteBuffer. */ |
| | | @NotThreadSafe |
| | | static final class MemoryBuffer |
| | | { |
| | | private final ByteBuffer buffer; |
| | | |
| | | HeapBuffer(int size) |
| | | MemoryBuffer(final ByteBuffer byteBuffer) |
| | | { |
| | | this.buffer = ByteBuffer.allocate(size); |
| | | this.buffer = byteBuffer; |
| | | } |
| | | |
| | | @Override |
| | | public void writeInt(final int position, final int value) |
| | | void writeInt(final int position, final int value) |
| | | { |
| | | buffer.putInt(position, value); |
| | | } |
| | | |
| | | @Override |
| | | public int readInt(final int position) |
| | | int readInt(final int position) |
| | | { |
| | | return buffer.getInt(position); |
| | | } |
| | | |
| | | @Override |
| | | public void writeByteSequence(int position, ByteSequence data) |
| | | void writeByteSequence(int position, ByteSequence data) |
| | | { |
| | | buffer.position(position); |
| | | data.copyTo(buffer); |
| | | } |
| | | |
| | | @Override |
| | | public int length() |
| | | int length() |
| | | { |
| | | return buffer.capacity(); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString readByteString(int position, int length) |
| | | ByteString readByteString(int position, int length) |
| | | { |
| | | return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length); |
| | | if (buffer.hasArray()) |
| | | { |
| | | return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length); |
| | | } |
| | | final byte[] data = new byte[length]; |
| | | buffer.position(position); |
| | | buffer.get(data); |
| | | return ByteString.wrap(data); |
| | | } |
| | | |
| | | @Override |
| | | public int compare(int offsetA, int lengthA, int offsetB, int lengthB) |
| | | int compare(int offsetA, int lengthA, int offsetB, int lengthB) |
| | | { |
| | | return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB)); |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | // Nothing to do |
| | | int count = Math.min(lengthA, lengthB); |
| | | int i = offsetA; |
| | | int j = offsetB; |
| | | while (count-- != 0) |
| | | { |
| | | final int firstByte = 0xFF & buffer.get(i++); |
| | | final int secondByte = 0xFF & buffer.get(j++); |
| | | if (firstByte != secondByte) |
| | | { |
| | | return firstByte - secondByte; |
| | | } |
| | | } |
| | | return lengthA - lengthB; |
| | | } |
| | | } |
| | | } |