opendj-core/src/main/java/com/forgerock/opendj/util/PackedLong.java
New file @@ -0,0 +1,240 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2015 ForgeRock AS.
 */
package com.forgerock.opendj.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

/**
 * Provides static methods to manipulate the compact long representation. Compact longs store unsigned long values
 * of up to 56 bits using a variable number of bytes, from 1 to 8. The binary representation of this compact encoding
 * has the interesting property of preserving the ordering of values when compared.
 */
public final class PackedLong {
    private PackedLong() {
    }

    private static final int[] DECODE_SIZE = new int[256];
    static {
        Arrays.fill(DECODE_SIZE, 0, 0x80, 1);
        Arrays.fill(DECODE_SIZE, 0x80, 0xc0, 2);
        Arrays.fill(DECODE_SIZE, 0xc0, 0xe0, 3);
        Arrays.fill(DECODE_SIZE, 0xe0, 0xf0, 4);
        Arrays.fill(DECODE_SIZE, 0xf0, 0xf8, 5);
        Arrays.fill(DECODE_SIZE, 0xf8, 0xfc, 6);
        Arrays.fill(DECODE_SIZE, 0xfc, 0xfe, 7);
        Arrays.fill(DECODE_SIZE, 0xfe, 0x100, 8);
    }

    /** Maximum value that can be stored with a compacted representation. */
    public static final long COMPACTED_MAX_VALUE = 0x00FFFFFFFFFFFFFFL;

    /**
     * Appends the compact representation of the value to the {@link OutputStream}.
     *
     * @param os
     *            {@link OutputStream} where the compact representation will be written.
     * @param value
     *            Value to encode and write in the compact long format.
     * @return Number of bytes written to the stream.
     * @throws IOException
     *             If a problem occurs while writing to the underlying {@link OutputStream}.
     */
    public static int writeCompactUnsigned(OutputStream os, long value) throws IOException {
        final int size = getEncodedSize(value);
        switch (size) {
        case 1:
            os.write((int) value);
            break;
        case 2:
            os.write((int) ((value >>> 8) | 0x80L));
            os.write((int) value);
            break;
        case 3:
            os.write((int) ((value >>> 16) | 0xc0L));
            os.write((int) (value >>> 8));
            os.write((int) value);
            break;
        case 4:
            os.write((int) ((value >>> 24) | 0xe0L));
            os.write((int) (value >>> 16));
            os.write((int) (value >>> 8));
            os.write((int) value);
            break;
        case 5:
            os.write((int) ((value >>> 32) | 0xf0L));
            os.write((int) (value >>> 24));
            os.write((int) (value >>> 16));
            os.write((int) (value >>> 8));
            os.write((int) value);
            break;
        case 6:
            os.write((int) ((value >>> 40) | 0xf8L));
            os.write((int) (value >>> 32));
            os.write((int) (value >>> 24));
            os.write((int) (value >>> 16));
            os.write((int) (value >>> 8));
            os.write((int) value);
            break;
        case 7:
            os.write((int) ((value >>> 48) | 0xfcL));
            os.write((int) (value >>> 40));
            os.write((int) (value >>> 32));
            os.write((int) (value >>> 24));
            os.write((int) (value >>> 16));
            os.write((int) (value >>> 8));
            os.write((int) value);
            break;
        case 8:
            os.write(0xfe);
            os.write((int) (value >>> 48));
            os.write((int) (value >>> 40));
            os.write((int) (value >>> 32));
            os.write((int) (value >>> 24));
            os.write((int) (value >>> 16));
            os.write((int) (value >>> 8));
            os.write((int) value);
            break;
        default:
            throw new IllegalArgumentException();
        }
        return size;
    }

    /**
     * Gets the number of bytes required to store the given value using the compact long representation.
     *
     * @param value
     *            Value whose encoded size is to be computed.
     * @return Number of bytes required to store the compact long representation of the value.
     */
    public static int getEncodedSize(long value) {
        if (value < 0x80L) {
            return 1;
        } else if (value < 0x4000L) {
            return 2;
        } else if (value < 0x200000L) {
            return 3;
        } else if (value < 0x10000000L) {
            return 4;
        } else if (value < 0x800000000L) {
            return 5;
        } else if (value < 0x40000000000L) {
            return 6;
        } else if (value < 0x2000000000000L) {
            return 7;
        } else if (value < 0x100000000000000L) {
            return 8;
        } else {
            throw new IllegalArgumentException("value out of range: " + value);
        }
    }

    /**
     * Decodes the compact long contained in the provided {@link InputStream} and returns its value.
     *
     * @param is
     *            Stream from which the compact unsigned long is read.
     * @return The long value.
     * @throws IOException
     *             If the first byte cannot be read for any reason other than the end of the stream, if the input
     *             stream has been closed, or if some other I/O error occurs.
     */
    public static long readCompactUnsignedLong(InputStream is) throws IOException {
        final int b0 = checkNotEndOfStream(is.read());
        final int size = decodeSize(b0);
        long value;
        switch (size) {
        case 1:
            value = b2l((byte) b0);
            break;
        case 2:
            value = (b0 & 0x3fL) << 8;
            value |= checkNotEndOfStream(is.read());
            break;
        case 3:
            value = (b0 & 0x1fL) << 16;
            value |= ((long) checkNotEndOfStream(is.read())) << 8;
            value |= checkNotEndOfStream(is.read());
            break;
        case 4:
            value = (b0 & 0x0fL) << 24;
            value |= ((long) checkNotEndOfStream(is.read())) << 16;
            value |= ((long) checkNotEndOfStream(is.read())) << 8;
            value |= checkNotEndOfStream(is.read());
            break;
        case 5:
            value = (b0 & 0x07L) << 32;
            value |= ((long) checkNotEndOfStream(is.read())) << 24;
            value |= ((long) checkNotEndOfStream(is.read())) << 16;
            value |= ((long) checkNotEndOfStream(is.read())) << 8;
            value |= checkNotEndOfStream(is.read());
            break;
        case 6:
            value = (b0 & 0x03L) << 40;
            value |= ((long) checkNotEndOfStream(is.read())) << 32;
            value |= ((long) checkNotEndOfStream(is.read())) << 24;
            value |= ((long) checkNotEndOfStream(is.read())) << 16;
            value |= ((long) checkNotEndOfStream(is.read())) << 8;
            value |= checkNotEndOfStream(is.read());
            break;
        case 7:
            value = (b0 & 0x01L) << 48;
            value |= ((long) checkNotEndOfStream(is.read())) << 40;
            value |= ((long) checkNotEndOfStream(is.read())) << 32;
            value |= ((long) checkNotEndOfStream(is.read())) << 24;
            value |= ((long) checkNotEndOfStream(is.read())) << 16;
            value |= ((long) checkNotEndOfStream(is.read())) << 8;
            value |= checkNotEndOfStream(is.read());
            break;
        default:
            value = ((long) checkNotEndOfStream(is.read())) << 48;
            value |= ((long) checkNotEndOfStream(is.read())) << 40;
            value |= ((long) checkNotEndOfStream(is.read())) << 32;
            value |= ((long) checkNotEndOfStream(is.read())) << 24;
            value |= ((long) checkNotEndOfStream(is.read())) << 16;
            value |= ((long) checkNotEndOfStream(is.read())) << 8;
            value |= checkNotEndOfStream(is.read());
        }
        return value;
    }

    private static int checkNotEndOfStream(final int byteValue) {
        if (byteValue == -1) {
            throw new IllegalArgumentException("End of stream reached.");
        }
        return byteValue;
    }

    private static int decodeSize(int b) {
        return DECODE_SIZE[b & 0xff];
    }

    private static long b2l(final byte b) {
        return b & 0xffL;
    }
}
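The encoding above uses the first byte both as a length marker and as the high-order value bits, so encoded values compare in the same order as the values themselves. A minimal round-trip sketch using only the methods introduced in this file (standalone illustration, not part of the patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.forgerock.opendj.util.PackedLong;

public class PackedLongExample {
    public static void main(String[] args) throws IOException {
        final long value = 0x12345L; // fits in 3 bytes, since 0x12345 < 0x200000
        final ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Encode: between 1 and 8 bytes are written depending on the magnitude of the value.
        final int written = PackedLong.writeCompactUnsigned(out, value);
        System.out.println(written + " byte(s), expected " + PackedLong.getEncodedSize(value));

        // Decode: the first byte gives the total size, the remaining bytes the low-order bits.
        final long decoded = PackedLong.readCompactUnsignedLong(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(decoded == value); // true
    }
}

opendj-core/src/main/java/org/forgerock/opendj/ldap/AVA.java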
@@ -840,6 +840,7 @@ .onMalformedInput(CodingErrorAction.REPORT) .onUnmappableCharacter(CodingErrorAction.REPORT); if (value.copyTo(buffer, decoder)) { buffer.flip(); try { // URL encoding encodes space char as '+' instead of using hex code final String val = URLEncoder.encode(buffer.toString(), "UTF-8").replaceAll("\\+", "%20"); opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequence.java
@@ -193,7 +193,8 @@ ByteStringBuilder copyTo(ByteStringBuilder builder); /** * Appends the content of this byte sequence to the provided {@link ByteBuffer}. * Appends the content of this byte sequence to the provided {@link ByteBuffer} starting at its current position. * The position of the buffer is then incremented by the length of this sequence. * * @param buffer * The buffer to copy to. @@ -205,8 +206,9 @@ ByteBuffer copyTo(ByteBuffer buffer); /** * Appends the content of this byte sequence decoded using provided charset decoder, * to the provided {@link CharBuffer}. * Appends the content of this byte sequence decoded using the provided charset decoder to the provided * {@link CharBuffer} starting at its current position. The position of charBuffer is then incremented by the * length of this sequence. * * @param charBuffer * The buffer to copy to, if decoding is successful.
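Since copyTo now appends at the buffer's current position without flipping (the ByteString and test hunks later in this patch remove and compensate for the old flip), the caller flips the buffer before reading it back. A minimal sketch of the documented contract, using only API already shown in this patch:

import java.nio.ByteBuffer;

import org.forgerock.opendj.ldap.ByteString;

public class CopyToExample {
    public static void main(String[] args) {
        final byte[] data = { 0x01, 0x02, 0x03 };
        final ByteBuffer buffer = ByteBuffer.allocate(data.length);

        ByteString.wrap(data).copyTo(buffer); // advances buffer's position by 3
        buffer.flip();                        // the caller is now responsible for flipping

        while (buffer.hasRemaining()) {
            System.out.println(buffer.get());
        }
    }
}

opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteSequenceReader.java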
@@ -22,11 +22,14 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions copyright 2012-2014 ForgeRock AS. * Portions copyright 2012-2015 ForgeRock AS. */ package org.forgerock.opendj.ldap; import java.util.Arrays; import java.io.IOException; import java.io.InputStream; import com.forgerock.opendj.util.PackedLong; /** * An interface for iteratively reading data from a {@link ByteSequence} . @@ -35,18 +38,6 @@ */ public final class ByteSequenceReader { private static final int[] DECODE_SIZE = new int[256]; static { Arrays.fill(DECODE_SIZE, 0, 0x80, 1); Arrays.fill(DECODE_SIZE, 0x80, 0xc0, 2); Arrays.fill(DECODE_SIZE, 0xc0, 0xe0, 3); Arrays.fill(DECODE_SIZE, 0xe0, 0xf0, 4); Arrays.fill(DECODE_SIZE, 0xf0, 0xf8, 5); Arrays.fill(DECODE_SIZE, 0xf8, 0xfc, 6); Arrays.fill(DECODE_SIZE, 0xfc, 0xfe, 7); Arrays.fill(DECODE_SIZE, 0xfe, 0x100, 8); } /** The current position in the byte sequence. */ private int pos; @@ -54,6 +45,12 @@ private final ByteSequence sequence; /** * The lazily allocated input stream view of this reader. Synchronization is not necessary because the stream is * stateless and race conditions can be tolerated. */ private InputStream inputStream; /** * Creates a new byte sequence reader whose source is the provided byte * sequence. * <p> @@ -329,70 +326,11 @@ * required to satisfy the request. */ public long getCompactUnsigned() { final int b0 = get(); final int size = decodeSize(b0); long value; switch (size) { case 1: value = b2l((byte) b0); break; case 2: value = (b0 & 0x3fL) << 8; value |= b2l(get()); break; case 3: value = (b0 & 0x1fL) << 16; value |= b2l(get()) << 8; value |= b2l(get()); break; case 4: value = (b0 & 0x0fL) << 24; value |= b2l(get()) << 16; value |= b2l(get()) << 8; value |= b2l(get()); break; case 5: value = (b0 & 0x07L) << 32; value |= b2l(get()) << 24; value |= b2l(get()) << 16; value |= b2l(get()) << 8; value |= b2l(get()); break; case 6: value = (b0 & 0x03L) << 40; value |= b2l(get()) << 32; value |= b2l(get()) << 24; value |= b2l(get()) << 16; value |= b2l(get()) << 8; value |= b2l(get()); break; case 7: value = (b0 & 0x01L) << 48; value |= b2l(get()) << 40; value |= b2l(get()) << 32; value |= b2l(get()) << 24; value |= b2l(get()) << 16; value |= b2l(get()) << 8; value |= b2l(get()); break; default: value = b2l(get()) << 48; value |= b2l(get()) << 40; value |= b2l(get()) << 32; value |= b2l(get()) << 24; value |= b2l(get()) << 16; value |= b2l(get()) << 8; value |= b2l(get()); try { return PackedLong.readCompactUnsignedLong(asInputStream()); } catch (IOException e) { throw new IllegalStateException(e); } return value; } private static long b2l(final byte b) { return b & 0xffL; } private static int decodeSize(int b) { return DECODE_SIZE[b & 0xff]; } /** @@ -564,4 +502,19 @@ public String toString() { return sequence.toString(); } private InputStream asInputStream() { if (inputStream == null) { inputStream = new InputStream() { @Override public int read() throws IOException { if (pos >= sequence.length()) { return -1; } return sequence.byteAt(pos++) & 0xFF; } }; } return inputStream; } } opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteString.java
@@ -659,7 +659,6 @@ @Override public ByteBuffer copyTo(final ByteBuffer byteBuffer) { byteBuffer.put(buffer, offset, length); byteBuffer.flip(); return byteBuffer; } @@ -686,7 +685,6 @@ static boolean copyTo(ByteBuffer inBuffer, CharBuffer outBuffer, CharsetDecoder decoder) { final CoderResult result = decoder.decode(inBuffer, outBuffer, true); decoder.flush(outBuffer); outBuffer.flip(); return !result.isError() && !result.isOverflow(); } opendj-core/src/main/java/org/forgerock/opendj/ldap/ByteStringBuilder.java
@@ -40,16 +40,13 @@ import org.forgerock.util.Reject; import com.forgerock.opendj.util.PackedLong; /** * A mutable sequence of bytes backed by a byte array. */ public final class ByteStringBuilder implements ByteSequence { /** * Maximum value that can be stored with a compacted representation. */ public static final long COMPACTED_MAX_VALUE = 0xFFFFFFFFFFFFFFL; /** Output stream implementation. */ private final class OutputStreamImpl extends OutputStream { @Override @@ -159,7 +156,6 @@ @Override public ByteBuffer copyTo(final ByteBuffer byteBuffer) { byteBuffer.put(buffer, subOffset, subLength); byteBuffer.flip(); return byteBuffer; } @@ -603,92 +599,14 @@ */ public ByteStringBuilder appendCompactUnsigned(long value) { Reject.ifFalse(value >= 0, "value must be >= 0"); final int size = getEncodedSize(value); ensureAdditionalCapacity(size); switch (size) { case 1: buffer[length++] = (byte) value; break; case 2: buffer[length++] = (byte) ((value >>> 8) | 0x80L); buffer[length++] = l2b(value); break; case 3: buffer[length++] = (byte) ((value >>> 16) | 0xc0L); buffer[length++] = l2b(value >>> 8); buffer[length++] = l2b(value); break; case 4: buffer[length++] = (byte) ((value >>> 24) | 0xe0L); buffer[length++] = l2b(value >>> 16); buffer[length++] = l2b(value >>> 8); buffer[length++] = l2b(value); break; case 5: buffer[length++] = (byte) ((value >>> 32) | 0xf0L); buffer[length++] = l2b(value >>> 24); buffer[length++] = l2b(value >>> 16); buffer[length++] = l2b(value >>> 8); buffer[length++] = l2b(value); break; case 6: buffer[length++] = (byte) ((value >>> 40) | 0xf8L); buffer[length++] = l2b(value >>> 32); buffer[length++] = l2b(value >>> 24); buffer[length++] = l2b(value >>> 16); buffer[length++] = l2b(value >>> 8); buffer[length++] = l2b(value); break; case 7: buffer[length++] = (byte) ((value >>> 48) | 0xfcL); buffer[length++] = l2b(value >>> 40); buffer[length++] = l2b(value >>> 32); buffer[length++] = l2b(value >>> 24); buffer[length++] = l2b(value >>> 16); buffer[length++] = l2b(value >>> 8); buffer[length++] = l2b(value); break; default: buffer[length++] = (byte) 0xfe; buffer[length++] = l2b(value >>> 48); buffer[length++] = l2b(value >>> 40); buffer[length++] = l2b(value >>> 32); buffer[length++] = l2b(value >>> 24); buffer[length++] = l2b(value >>> 16); buffer[length++] = l2b(value >>> 8); buffer[length++] = l2b(value); break; try { PackedLong.writeCompactUnsigned(asOutputStream(), value); } catch (IOException e) { throw new IllegalStateException(e); } return this; } private static int getEncodedSize(long value) { if (value < 0x80L) { return 1; } else if (value < 0x4000L) { return 2; } else if (value < 0x200000L) { return 3; } else if (value < 0x10000000L) { return 4; } else if (value < 0x800000000L) { return 5; } else if (value < 0x40000000000L) { return 6; } else if (value < 0x2000000000000L) { return 7; } else if (value < 0x100000000000000L) { return 8; } else { throw new IllegalArgumentException("value out of range: " + value); } } private static byte l2b(long value) { return (byte) (value & 0xffL); } /** * Appends the byte string representation of the provided object to this * byte string builder. The object is converted to a byte string as follows: @@ -970,7 +888,6 @@ @Override public ByteBuffer copyTo(final ByteBuffer byteBuffer) { byteBuffer.put(buffer, 0, length); byteBuffer.flip(); return byteBuffer; } opendj-core/src/main/java/org/forgerock/opendj/ldap/Functions.java
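A round-trip sketch (standalone, illustrative) for the compact-unsigned changes above: ByteStringBuilder.appendCompactUnsigned now delegates to PackedLong through an OutputStream view, and ByteSequenceReader.getCompactUnsigned (earlier hunk) reads the same encoding back through an InputStream view.

import org.forgerock.opendj.ldap.ByteSequenceReader;
import org.forgerock.opendj.ldap.ByteStringBuilder;

public class CompactUnsignedRoundTrip {
    public static void main(String[] args) {
        final ByteStringBuilder builder = new ByteStringBuilder();
        builder.appendCompactUnsigned(5L);      // encoded on 1 byte
        builder.appendCompactUnsigned(0x4000L); // encoded on 3 bytes

        final ByteSequenceReader reader = builder.asReader();
        System.out.println(reader.getCompactUnsigned()); // 5
        System.out.println(reader.getCompactUnsigned()); // 16384
    }
}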
@@ -22,7 +22,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2012-2014 ForgeRock AS. * Portions copyright 2012-2015 ForgeRock AS. */ package org.forgerock.opendj.ldap; @@ -137,6 +137,26 @@ byteStringToString(), STRING_TO_LONG); /** * Creates a function that returns a constant value for any input. * * @param <M> * The type of input values transformed by this function. * @param <N> * The type of output values returned by this function. * @param constant * The constant value for the function to return. * @return A function that always returns the constant value. */ public static <M, N> Function<M, N, NeverThrowsException> returns(final N constant) { return new Function<M, N, NeverThrowsException>() { @Override public N apply(M value) { return constant; } }; } /** * Returns the composition of two functions. The result of the first * function will be passed to the second. *
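A short sketch of the new Functions.returns helper above; the DN2ID hunk later in this patch uses it in exactly this way to discard cursor keys:

import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.Functions;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;

public class ReturnsExample {
    public static void main(String[] args) {
        // Ignores its input and always yields null, i.e. a "drop the key" transformer.
        final Function<ByteString, Void, NeverThrowsException> toVoid = Functions.returns(null);
        System.out.println(toVoid.apply(ByteString.empty())); // null
    }
}

opendj-core/src/main/java/org/forgerock/opendj/ldap/schema/ObjectIdentifierEqualityMatchingRuleImpl.java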
@@ -54,7 +54,7 @@ return oidOrName; } // Do a best effort attempt to normalize names to OIDs. final String lowerCaseName = StaticUtils.toLowerCase(oidOrName.toLowerCase()); final String lowerCaseName = StaticUtils.toLowerCase(oidOrName); try { final String oid = schema.getOIDForName(lowerCaseName); if (oid != null) { opendj-core/src/test/java/org/forgerock/opendj/ldap/ByteStringBuilderTestCase.java
@@ -22,7 +22,7 @@ * * * Copyright 2010 Sun Microsystems, Inc. * Portions copyright 2011-2014 ForgeRock AS * Portions copyright 2011-2015 ForgeRock AS */ package org.forgerock.opendj.ldap; @@ -94,13 +94,18 @@ @DataProvider public Object[][] unsignedLongValues() throws Exception { return new Object[][] { { 0 }, { 0x80L }, { 0x81L }, { 0x4000L }, { 0x4001L }, { 0x200000L }, { 0x200001L }, { 0x10000000L }, { 0x10000001L }, { 0x800000000L }, { 0x800000001L }, { 0x40000000000L }, { 0x40000000001L }, { 0x2000000000000L }, { 0x2000000000001L }, { 0x00FFFFFFFFFFFFFFL } { 0 }, { 0x80L }, { 0x81L }, { 0x4000L }, { 0x4001L }, { 0x200000L }, { 0x200001L }, { 0x10000000L }, { 0x10000001L }, { 0x800000000L }, { 0x800000001L }, { 0x40000000000L }, { 0x40000000001L }, { 0x2000000000000L }, { 0x2000000000001L }, { 0x00FFFFFFFFFFFFFFL } }; } @Test(expectedExceptions = IndexOutOfBoundsException.class) public void testAppendBadByteBufferLength1() { new ByteStringBuilder().append(ByteBuffer.wrap(new byte[5]), -1); opendj-core/src/test/java/org/forgerock/opendj/ldap/ByteStringTestCase.java
@@ -286,6 +286,7 @@ final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); boolean isCopied = byteString.copyTo(buffer, decoder); buffer.flip(); assertThat(isCopied).isTrue(); assertThat(buffer.toString()).isEqualTo(value); @@ -299,6 +300,7 @@ final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); boolean isCopied = byteString.copyTo(buffer, decoder); buffer.flip(); assertThat(isCopied).isFalse(); } @@ -310,6 +312,7 @@ ByteBuffer buffer = ByteBuffer.allocate(value.length()); byteString.copyTo(buffer); buffer.flip(); assertSameByteContent(buffer, byteString); } opendj-core/src/test/java/org/forgerock/opendj/ldap/PackedLongTestCase.java
New file @@ -0,0 +1,68 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2015 ForgeRock AS */ package org.forgerock.opendj.ldap; import static org.fest.assertions.Assertions.*; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.forgerock.opendj.util.PackedLong; /** * Test case for PackedLong. */ @Test(groups = "unit") public class PackedLongTestCase extends SdkTestCase { @Test(dataProvider = "unsignedLongValues") public void testCanReadWriteByteArray(int size, long value) throws IOException { final ByteArrayOutputStream os = new ByteArrayOutputStream(); PackedLong.writeCompactUnsigned(os, value); assertThat(os.size()).isEqualTo(size); assertThat(PackedLong.getEncodedSize(value)).isEqualTo(size); assertThat(PackedLong.readCompactUnsignedLong(new ByteArrayInputStream(os.toByteArray()))).isEqualTo(value); } @DataProvider private static Object[][] unsignedLongValues() { return new Object[][] { { 1, 0 }, { 1, 0x7FL }, { 2, 0x80L }, { 2, 0x3FFFL }, { 3, 0x4000L }, { 3, 0x1FFFFFL }, { 4, 0x200000L }, { 4, 0x0FFFFFFFL }, { 5, 0x10000000L }, { 5, 0x7FFFFFFFFL }, { 6, 0x800000000L }, { 6, 0x3FFFFFFFFFFL }, { 7, 0x40000000000L }, { 7, 0x1FFFFFFFFFFFFL }, { 8, 0x2000000000000L }, { 8, 0x00FFFFFFFFFFFFFFL } }; } } opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -27,7 +27,6 @@ import static com.persistit.Transaction.CommitPolicy.*; import static java.util.Arrays.*; import static org.opends.messages.BackendMessages.*; import static org.opends.messages.ConfigMessages.*; import static org.opends.messages.UtilityMessages.*; @@ -69,6 +68,7 @@ import org.opends.server.backends.pluggable.spi.Importer; import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException; import org.opends.server.backends.pluggable.spi.ReadOperation; import org.opends.server.backends.pluggable.spi.SequentialCursor; import org.opends.server.backends.pluggable.spi.Storage; import org.opends.server.backends.pluggable.spi.StorageInUseException; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; @@ -94,7 +94,6 @@ import com.persistit.Key; import com.persistit.Persistit; import com.persistit.Transaction; import com.persistit.Tree; import com.persistit.Value; import com.persistit.Volume; import com.persistit.VolumeSpecification; @@ -102,12 +101,15 @@ import com.persistit.exception.PersistitException; import com.persistit.exception.RollbackException; import com.persistit.exception.TreeNotFoundException; import com.persistit.mxbeans.CheckpointManagerMXBean; /** PersistIt database implementation of the {@link Storage} engine. */ @SuppressWarnings("javadoc") public final class PDBStorage implements Storage, Backupable, ConfigurationChangeListener<PDBBackendCfg>, DiskSpaceMonitorHandler { private static final int IMPORT_DB_CACHE_SIZE = 4 * MB; private static final double MAX_SLEEP_ON_RETRY_MS = 50.0; private static final String VOLUME_NAME = "dj"; @@ -263,7 +265,6 @@ /** PersistIt implementation of the {@link Importer} interface. */ private final class ImporterImpl implements Importer { private final Map<TreeName, Tree> trees = new HashMap<>(); private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>(); private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>() { @@ -291,17 +292,60 @@ } @Override public void createTree(final TreeName treeName) public void clearTree(final TreeName treeName) { final Transaction txn = db.getTransaction(); deleteTree(txn, treeName); createTree(txn, treeName); } private void createTree(final Transaction txn, final TreeName treeName) { Exchange ex = null; try { final Tree tree = volume.getTree(mangleTreeName(treeName), true); trees.put(treeName, tree); txn.begin(); ex = getNewExchange(treeName, true); txn.commit(); } catch (final PersistitException e) catch (PersistitException e) { throw new StorageRuntimeException(e); } finally { txn.end(); releaseExchangeSilenty(ex); } } private void deleteTree(Transaction txn, final TreeName treeName) { Exchange ex = null; try { txn.begin(); ex = getNewExchange(treeName, true); ex.removeTree(); txn.commit(); } catch (PersistitException e) { throw new StorageRuntimeException(e); } finally { txn.end(); releaseExchangeSilenty(ex); } } private void releaseExchangeSilenty(Exchange ex) { if ( ex != null) { db.releaseExchange(ex); } } @Override @@ -321,21 +365,6 @@ } @Override public boolean delete(final TreeName treeName, final ByteSequence key) { try { final Exchange ex = getExchangeFromCache(treeName); bytesToKey(ex.getKey(), key); return ex.remove(); } catch (final PersistitException e) { throw new StorageRuntimeException(e); } } @Override public ByteString read(final TreeName treeName, final ByteSequence key) { try @@ -362,6 +391,19 @@ } return exchange; } @Override public SequentialCursor<ByteString, ByteString> 
openCursor(TreeName treeName) { try { return new CursorImpl(getNewExchange(treeName, false)); } catch (PersistitException e) { throw new StorageRuntimeException(e); } } } /** Common interface for internal WriteableTransaction implementations. */ @@ -430,8 +472,7 @@ public long getRecordCount(TreeName treeName) { // FIXME: is there a better/quicker way to do this? final Cursor<?, ?> cursor = openCursor(treeName); try try(final Cursor<?, ?> cursor = openCursor(treeName)) { long count = 0; while (cursor.next()) @@ -440,10 +481,6 @@ } return count; } finally { cursor.close(); } } @Override @@ -501,12 +538,6 @@ } @Override public void renameTree(final TreeName oldTreeName, final TreeName newTreeName) { throw new UnsupportedOperationException(); } @Override public boolean update(final TreeName treeName, final ByteSequence key, final UpdateFunction f) { try @@ -642,12 +673,6 @@ } @Override public void renameTree(TreeName oldName, TreeName newName) { throw new ReadOnlyStorageException(); } @Override public void deleteTree(TreeName name) { throw new ReadOnlyStorageException(); @@ -712,6 +737,15 @@ cfg.addPDBChangeListener(this); } private Configuration buildImportConfiguration() { final Configuration dbCfg = buildConfiguration(AccessMode.READ_WRITE); getBufferPoolCfg(dbCfg).setMaximumMemory(IMPORT_DB_CACHE_SIZE); dbCfg.setCheckpointInterval(CheckpointManagerMXBean.MAXIMUM_CHECKPOINT_INTERVAL_S); dbCfg.setCommitPolicy(SOFT); return dbCfg; } private Configuration buildConfiguration(AccessMode accessMode) { this.accessMode = accessMode; @@ -867,7 +901,7 @@ @Override public Importer startImport() throws ConfigException, StorageRuntimeException { open0(buildConfiguration(AccessMode.READ_WRITE)); open0(buildImportConfiguration()); return new ImporterImpl(); } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -53,7 +53,6 @@ import org.opends.server.api.MonitorProvider; import org.opends.server.backends.RebuildConfig; import org.opends.server.backends.VerifyConfig; import org.opends.server.backends.pluggable.ImportSuffixCommand.SuffixImportStrategy; import org.opends.server.backends.pluggable.spi.AccessMode; import org.opends.server.backends.pluggable.spi.Storage; import org.opends.server.backends.pluggable.spi.StorageInUseException; @@ -86,6 +85,8 @@ import org.opends.server.util.LDIFException; import org.opends.server.util.RuntimeInformation; import com.forgerock.opendj.util.StaticUtils; /** * This is an implementation of a Directory Server Backend which stores entries locally * in a pluggable storage. @@ -656,6 +657,10 @@ public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException { if (importConfig.appendToExistingData() || importConfig.replaceExistingEntries()) { throw new UnsupportedOperationException("append/replace mode is not supported by this backend."); } RuntimeInformation.logInfo(); // If the rootContainer is open, the backend is initialized by something else. @@ -664,33 +669,31 @@ { throw new DirectoryException(getServerErrorResultCode(), ERR_IMPORT_BACKEND_ONLINE.get()); } for (DN dn : cfg.getBaseDN()) { ImportSuffixCommand importCommand = new ImportSuffixCommand(dn, importConfig); if (importCommand.getSuffixImportStrategy() == SuffixImportStrategy.MERGE_DB_WITH_LDIF) { // fail-fast to avoid ending up in an unrecoverable state for the server throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_IMPORT_UNSUPPORTED_WITH_BRANCH.get()); } } try { if (OnDiskMergeBufferImporter.mustClearBackend(importConfig, cfg)) try { try if (importConfig.clearBackend()) { // clear all files before opening the root container storage.removeStorageFiles(); } catch (Exception e) { throw new DirectoryException(getServerErrorResultCode(), ERR_REMOVE_FAIL.get(e.getMessage()), e); } } catch (Exception e) { throw new DirectoryException(getServerErrorResultCode(), ERR_REMOVE_FAIL.get(e.getMessage()), e); } rootContainer = newRootContainer(AccessMode.READ_WRITE); return getImportStrategy().importLDIF(importConfig, rootContainer, serverContext); try { rootContainer.getStorage().close(); return getImportStrategy(serverContext, rootContainer).importLDIF(importConfig); } finally { rootContainer.getStorage().open(AccessMode.READ_WRITE); } } catch (StorageRuntimeException e) { @@ -704,6 +707,11 @@ { throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e); } catch (Exception e) { throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(StaticUtils .stackTraceToSingleLineString(e, false)), e); } finally { try @@ -727,10 +735,9 @@ } } private ImportStrategy getImportStrategy() throws DirectoryException private ImportStrategy getImportStrategy(ServerContext serverContext, RootContainer rootContainer) { // TODO JNR may call new SuccessiveAddsImportStrategy() depending on configured import strategy return new OnDiskMergeBufferImporter.StrategyImpl(cfg); return new OnDiskMergeImporter.StrategyImpl(serverContext, rootContainer, cfg); } /** {@inheritDoc} */ @@ -803,7 +810,15 @@ { rootContainer = newRootContainer(AccessMode.READ_WRITE); } new OnDiskMergeBufferImporter(rootContainer, rebuildConfig, cfg, serverContext).rebuildIndexes(); rootContainer.getStorage().close(); try { getImportStrategy(serverContext, rootContainer).rebuildIndex(rebuildConfig); } finally { 
rootContainer.getStorage().open(AccessMode.READ_WRITE); } } catch (ExecutionException execEx) { @@ -825,6 +840,11 @@ { throw e; } catch (Exception ex) { throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(stackTraceToSingleLineString(ex)), ex); } finally { closeTemporaryRootContainer(openRootContainer); opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/CursorTransformer.java
@@ -27,10 +27,13 @@ import static org.forgerock.util.Reject.*; import java.util.NoSuchElementException; import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.util.Function; import org.forgerock.util.promise.NeverThrowsException; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.SequentialCursor; /** * Transforms the keys and values of a cursor from their original types to others. Typically used for @@ -83,6 +86,14 @@ return new CursorTransformer<>(input, keyTransformer, valueTransformer); } static <KI, VI, KO, VO> SequentialCursor<KO, VO> transformKeysAndValues(SequentialCursor<KI, VI> input, Function<KI, KO, ? extends Exception> keyTransformer, ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer) { // SequentialCursorAdapter constructor never throws return new CursorTransformer<>(new SequentialCursorAdapter<>(input), keyTransformer, valueTransformer); } @SuppressWarnings("unchecked") static <KI, VI, VO> Cursor<KI, VO> transformValues(Cursor<KI, VI> input, ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer) @@ -90,6 +101,15 @@ return transformKeysAndValues(input, (Function<KI, KI, NeverThrowsException>) NO_TRANSFORM, valueTransformer); } @SuppressWarnings("unchecked") static <KI, VI, VO> Cursor<KI, VO> transformValues(SequentialCursor<KI, VI> input, ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer) { // SequentialCursorAdapter constructor never throws return transformKeysAndValues(new SequentialCursorAdapter<>(input), (Function<KI, KI, NeverThrowsException>) NO_TRANSFORM, valueTransformer); } private CursorTransformer(Cursor<KI, VI> input, Function<KI, KO, ? extends Exception> keyTransformer, ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer) { @@ -184,6 +204,71 @@ cachedTransformedValue = null; } /** Make a {@link SequentialCursor} looks like a {@link Cursor}. */ static final class SequentialCursorAdapter<K, V> implements Cursor<K, V> { private final SequentialCursor<K, V> delegate; SequentialCursorAdapter(SequentialCursor<K, V> delegate) { this.delegate = delegate; } @Override public boolean next() { return delegate.next(); } @Override public boolean isDefined() { return delegate.isDefined(); } @Override public K getKey() throws NoSuchElementException { return delegate.getKey(); } @Override public V getValue() throws NoSuchElementException { return delegate.getValue(); } @Override public void close() { delegate.close(); } @Override public boolean positionToKey(ByteSequence key) { throw new UnsupportedOperationException(); } @Override public boolean positionToKeyOrNext(ByteSequence key) { throw new UnsupportedOperationException(); } @Override public boolean positionToLastKey() { throw new UnsupportedOperationException(); } @Override public boolean positionToIndex(int index) { throw new UnsupportedOperationException(); } } /** * Runtime exception for problems happening during the transformation */ opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
@@ -29,10 +29,16 @@ import static org.opends.server.backends.pluggable.CursorTransformer.*; import static org.opends.server.backends.pluggable.DnKeyFormat.*; import java.util.LinkedList; import java.util.NoSuchElementException; import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.ByteStringBuilder; import org.forgerock.opendj.ldap.Functions; import org.forgerock.util.Function; import org.forgerock.util.promise.NeverThrowsException; import org.opends.server.backends.pluggable.OnDiskMergeImporter.SequentialCursorDecorator; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.SequentialCursor; @@ -41,7 +47,6 @@ import org.opends.server.backends.pluggable.spi.UpdateFunction; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; /** * This class represents the dn2id index, which has one record @@ -51,25 +56,18 @@ @SuppressWarnings("javadoc") class DN2ID extends AbstractTree { private static final Function<ByteString, Void, DirectoryException> TO_VOID_KEY = new Function<ByteString, Void, DirectoryException>() { @Override public Void apply(ByteString value) throws DirectoryException { return null; } }; private static final Function<ByteString, Void, NeverThrowsException> TO_VOID_KEY = Functions.returns(null); private static final CursorTransformer.ValueTransformer<ByteString, ByteString, EntryID, Exception> TO_ENTRY_ID = new CursorTransformer.ValueTransformer<ByteString, ByteString, EntryID, Exception>() { @Override public EntryID transform(ByteString key, ByteString value) throws Exception { return new EntryID(value); } }; private static final CursorTransformer.ValueTransformer<ByteString, ByteString, EntryID, NeverThrowsException> TO_ENTRY_ID = new CursorTransformer.ValueTransformer<ByteString, ByteString, EntryID, NeverThrowsException>() { @Override public EntryID transform(ByteString key, ByteString value) { return new EntryID(value); } }; private final DN baseDN; @@ -154,6 +152,12 @@ return value != null ? new EntryID(value) : null; } <V> SequentialCursor<ByteString, ByteString> openCursor(SequentialCursor<ByteString, ByteString> dn2IdCursor, TreeVisitor<V> treeVisitor) { return new TreeVisitorCursor<>(dn2IdCursor, treeVisitor); } Cursor<Void, EntryID> openCursor(ReadableTransaction txn, DN dn) { return transformKeysAndValues(openCursor0(txn, dn), TO_VOID_KEY, TO_ENTRY_ID); @@ -167,11 +171,11 @@ SequentialCursor<Void, EntryID> openChildrenCursor(ReadableTransaction txn, DN dn) { return new ChildrenCursor(openCursor0(txn, dn)); return transformKeysAndValues(new ChildrenCursor(openCursor0(txn, dn)), TO_VOID_KEY, TO_ENTRY_ID); } SequentialCursor<Void, EntryID> openSubordinatesCursor(ReadableTransaction txn, DN dn) { return new SubtreeCursor(openCursor0(txn, dn)); return transformKeysAndValues(new SubtreeCursor(openCursor0(txn, dn)), TO_VOID_KEY, TO_ENTRY_ID); } /** @@ -209,7 +213,9 @@ * Decorator overriding the next() behavior to iterate through children of the entry pointed by the given cursor at * creation. 
*/ private static final class ChildrenCursor extends SequentialCursorForwarding { private static final class ChildrenCursor extends SequentialCursorDecorator<Cursor<ByteString, ByteString>, ByteString, ByteString> { private final ByteStringBuilder builder; private final ByteString limit; private boolean cursorOnParent; @@ -236,7 +242,7 @@ return isDefined() && delegate.getKey().compareTo(limit) < 0; } private ByteStringBuilder nextSibling() private ByteSequence nextSibling() { return builder.clear().append(delegate.getKey()).append((byte) 0x1); } @@ -246,7 +252,9 @@ * Decorator overriding the next() behavior to iterate through subordinates of the entry pointed by the given cursor * at creation. */ private static final class SubtreeCursor extends SequentialCursorForwarding { private static final class SubtreeCursor extends SequentialCursorDecorator<Cursor<ByteString, ByteString>, ByteString, ByteString> { private final ByteString limit; SubtreeCursor(Cursor<ByteString, ByteString> delegate) @@ -262,15 +270,79 @@ } } /** * Decorator allowing to partially overrides methods of a given cursor while keeping the default behavior for other * methods. */ private static class SequentialCursorForwarding implements SequentialCursor<Void, EntryID> { final Cursor<ByteString, ByteString> delegate; /** Keep track of information during the visit. */ private static final class ParentInfo<V> { private final ByteString parentDN; private final V visitorData; SequentialCursorForwarding(Cursor<ByteString, ByteString> delegate) { ParentInfo(ByteString parentDN, V visitorData) { this.parentDN = parentDN; this.visitorData = visitorData; } } /** Allows to visit dn2id tree without exposing internal encoding. */ static interface TreeVisitor<V> { V beginParent(EntryID parentID); void onChild(V parent, EntryID childID); void endParent(V parent); } /** Perform dn2id cursoring to expose parent and children to the {@link TreeVisitor} */ private static final class TreeVisitorCursor<V> implements SequentialCursor<ByteString, ByteString> { private final SequentialCursor<ByteString, ByteString> delegate; private final LinkedList<ParentInfo<V>> parentsInfoStack; private final TreeVisitor<V> visitor; TreeVisitorCursor(SequentialCursor<ByteString, ByteString> delegate, TreeVisitor<V> visitor) { this.delegate = delegate; this.parentsInfoStack = new LinkedList<>(); this.visitor = visitor; } @Override public boolean next() { if (delegate.next()) { final ByteString dn = delegate.getKey(); final EntryID entryID = new EntryID(delegate.getValue()); popCompleteParents(dn); notifyChild(entryID); pushNewParent(dn, entryID); return true; } popCompleteParents(DN.NULL_DN.toNormalizedByteString()); return false; } private void pushNewParent(final ByteString dn, final EntryID entryID) { parentsInfoStack.push(new ParentInfo<>(dn, visitor.beginParent(entryID))); } private void notifyChild(final EntryID entryID) { if (!parentsInfoStack.isEmpty()) { visitor.onChild(parentsInfoStack.peek().visitorData, entryID); } } private void popCompleteParents(ByteString dn) { ParentInfo<V> currentParent; while ((currentParent = parentsInfoStack.peek()) != null && !isChild(currentParent.parentDN, dn)) { visitor.endParent(parentsInfoStack.pop().visitorData); } } @Override @@ -280,21 +352,15 @@ } @Override public boolean next() public ByteString getKey() throws NoSuchElementException { return delegate.next(); return delegate.getKey(); } @Override public Void getKey() public ByteString getValue() throws NoSuchElementException { return null; } @Override public 
EntryID getValue() { return new EntryID(delegate.getValue()); return delegate.getValue(); } @Override @@ -303,4 +369,5 @@ delegate.close(); } } } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
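The TreeVisitorCursor added above walks dn2id in key order: every entry gets a beginParent call when it is first seen, is reported to its own parent through onChild, and gets endParent once the cursor has moved past its subtree. A hedged sketch of a possible TreeVisitor implementation (a per-parent child counter; only the interface comes from the patch, the class itself is illustrative and assumes it lives in the same package as DN2ID):

package org.opends.server.backends.pluggable;

/** Illustrative only: counts the immediate children reported for each parent entry. */
final class ChildCountVisitor implements DN2ID.TreeVisitor<long[]>
{
  @Override
  public long[] beginParent(EntryID parentID)
  {
    return new long[1]; // per-parent counter, kept while the parent's subtree is visited
  }

  @Override
  public void onChild(long[] parent, EntryID childID)
  {
    parent[0]++;
  }

  @Override
  public void endParent(long[] parent)
  {
    // parent[0] now holds the number of immediate children seen under this parent
  }
}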
@@ -36,13 +36,11 @@ import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.util.Reject; import org.forgerock.util.promise.NeverThrowsException; import org.opends.server.backends.pluggable.CursorTransformer.ValueTransformer; import org.opends.server.backends.pluggable.EntryIDSet.EntryIDSetCodec; import org.opends.server.backends.pluggable.State.IndexFlag; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.Importer; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.backends.pluggable.spi.TreeName; @@ -146,53 +144,6 @@ return importIDSet.valueToByteString(codec); } // TODO JNR rename to importUpsert() ? @Override public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException { Reject.ifNull(importer, "importer must not be null"); Reject.ifNull(idsToBeAdded, "idsToBeAdded must not be null"); ByteSequence key = idsToBeAdded.getKey(); ByteString value = importer.read(getName(), key); if (value != null) { final EntryIDSet entryIDSet = decodeValue(key, value); final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit); importIDSet.merge(idsToBeAdded); importer.put(getName(), key, toValue(importIDSet)); } else { importer.put(getName(), key, toValue(idsToBeAdded)); } } @Override public final void importRemove(Importer importer, ImportIDSet idsToBeRemoved) throws StorageRuntimeException { Reject.ifNull(importer, "importer must not be null"); Reject.ifNull(idsToBeRemoved, "idsToBeRemoved must not be null"); ByteSequence key = idsToBeRemoved.getKey(); ByteString value = importer.read(getName(), key); if (value == null) { // Should never happen -- the keys should always be there. throw new IllegalStateException("Expected to have a value associated to key " + key + " for index " + getName()); } final EntryIDSet entryIDSet = decodeValue(key, value); final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit); importIDSet.remove(idsToBeRemoved); if (importIDSet.isDefined() && importIDSet.size() == 0) { importer.delete(getName(), key); } else { importer.put(getName(), key, toValue(importIDSet)); } } @Override public final void update(final WriteableTransaction txn, final ByteString key, final EntryIDSet deletedIDs, final EntryIDSet addedIDs) throws StorageRuntimeException opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryContainer.java
@@ -1474,7 +1474,7 @@ // Insert into the indexes, in index configuration order. final IndexBuffer indexBuffer = new IndexBuffer(); indexInsertEntry(indexBuffer, entry, entryID); insertEntryIntoIndexes(indexBuffer, entry, entryID); final ByteString encodedEntry = id2entry.encode(entry); @@ -1490,33 +1490,10 @@ // Check whether the entry already exists. if (dn2id.get(txn, entry.getName()) != null) { throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get( entry.getName())); throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry.getName())); } // Check that the parent entry exists. EntryID parentID = null; if (parentDN != null) { // Check for referral entries above the target. dn2uri.targetEntryReferrals(txn, entry.getName(), null); // Read the parent ID from dn2id. parentID = dn2id.get(txn, parentDN); if (parentID == null) { throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_ADD_NO_SUCH_OBJECT.get(entry.getName()), getMatchedDN(txn, baseDN), null); } id2childrenCount.addDelta(txn, parentID, 1); } dn2id.put(txn, entry.getName(), entryID); dn2uri.addEntry(txn, entry); id2entry.put(txn, entryID, encodedEntry); indexBuffer.flush(txn); addEntry0(entry, parentDN, entryID, indexBuffer, encodedEntry, txn); if (addOperation != null) { // One last check before committing @@ -1552,6 +1529,39 @@ } } void addEntry0(final Entry entry, final DN parentDN, final EntryID entryID, final IndexBuffer indexBuffer, final ByteString encodedEntry, WriteableTransaction txn) throws DirectoryException { // Check that the parent entry exists. if (parentDN != null) { // Check for referral entries above the target. dn2uri.targetEntryReferrals(txn, entry.getName(), null); final EntryID parentID = dn2id.get(txn, parentDN); if (parentID == null) { throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_ADD_NO_SUCH_OBJECT.get(entry.getName()), getMatchedDN(txn, baseDN), null); } id2childrenCount.addDelta(txn, parentID, 1); } dn2id.put(txn, entry.getName(), entryID); dn2uri.addEntry(txn, entry); id2entry.put(txn, entryID, encodedEntry); indexBuffer.flush(txn); } void importEntry(WriteableTransaction txn, EntryID entryID, Entry entry) throws DirectoryException, StorageRuntimeException { final IndexBuffer indexBuffer = IndexBuffer.newImportIndexBuffer(txn, entryID); insertEntryIntoIndexes(indexBuffer, entry, entryID); addEntry0(entry, null, entryID, indexBuffer, id2entry.encode(entry), txn); } /** * Removes the specified entry from this tree. This method must ensure * that the entry exists and that it does not have any subordinate entries @@ -1765,7 +1775,7 @@ } // Remove from the indexes, in index config order. indexRemoveEntry(indexBuffer, entry, leafID); removeEntryFromIndexes(indexBuffer, entry, leafID); // Remove the children counter for this entry. id2childrenCount.deleteCount(txn, leafID); @@ -1964,8 +1974,8 @@ else { // The most optimal would be to figure out what the modifications were. indexRemoveEntry(indexBuffer, oldEntry, entryID); indexInsertEntry(indexBuffer, newEntry, entryID); removeEntryFromIndexes(indexBuffer, oldEntry, entryID); insertEntryIntoIndexes(indexBuffer, newEntry, entryID); } indexBuffer.flush(txn); @@ -2256,7 +2266,7 @@ if (renumbered || modifyDNOperation == null) { // Reindex the entry with the new ID. 
indexInsertEntry(buffer, newEntry, newID); insertEntryIntoIndexes(buffer, newEntry, newID); } if(isApexEntryMoved) @@ -2309,7 +2319,7 @@ if (!newID.equals(oldID) || modifyDNOperation == null) { // Reindex the entry with the new ID. indexRemoveEntry(buffer, oldEntry, oldID); removeEntryFromIndexes(buffer, oldEntry, oldID); } else { @@ -2391,7 +2401,7 @@ id2childrenCount.deleteCount(txn, oldID); // Reindex the entry with the new ID. indexRemoveEntry(buffer, oldEntry, oldID); removeEntryFromIndexes(buffer, oldEntry, oldID); } else if (!modifications.isEmpty()) { @@ -2444,7 +2454,7 @@ * @throws StorageRuntimeException If an error occurs in the storage. * @throws DirectoryException If a Directory Server error occurs. */ private void indexInsertEntry(IndexBuffer buffer, Entry entry, EntryID entryID) private void insertEntryIntoIndexes(IndexBuffer buffer, Entry entry, EntryID entryID) throws StorageRuntimeException, DirectoryException { for (AttributeIndex index : attrIndexMap.values()) @@ -2467,7 +2477,7 @@ * @throws StorageRuntimeException If an error occurs in the storage. * @throws DirectoryException If a Directory Server error occurs. */ private void indexRemoveEntry(IndexBuffer buffer, Entry entry, EntryID entryID) private void removeEntryFromIndexes(IndexBuffer buffer, Entry entry, EntryID entryID) throws StorageRuntimeException, DirectoryException { for (AttributeIndex index : attrIndexMap.values()) @@ -2622,61 +2632,6 @@ return treePrefix; } /** * Sets a new tree prefix for this entry container and rename all * existing trees in use by this entry container. * * @param txn the transaction for renaming Trees * @param newBaseDN The new tree prefix to use. * @throws StorageRuntimeException If an error occurs in the storage. */ void setTreePrefix(WriteableTransaction txn, final String newBaseDN) throws StorageRuntimeException { final List<Tree> allTrees = listTrees(); try { // Rename in transaction. for(Tree tree : allTrees) { TreeName oldName = tree.getName(); TreeName newName = oldName.replaceBaseDN(newBaseDN); txn.renameTree(oldName, newName); } // Only rename the containers if the txn succeeded. for (Tree tree : allTrees) { TreeName oldName = tree.getName(); TreeName newName = oldName.replaceBaseDN(newBaseDN); tree.setName(newName); } } catch (Exception e) { String msg = e.getMessage(); if (msg == null) { msg = stackTraceToSingleLineString(e); } throw new StorageRuntimeException(ERR_UNCHECKED_EXCEPTION.get(msg).toString(), e); } try { for(Tree tree : allTrees) { tree.open(txn, false); } } catch (Exception e) { String msg = e.getMessage(); if (msg == null) { msg = stackTraceToSingleLineString(e); } throw new StorageRuntimeException(ERR_UNCHECKED_EXCEPTION.get(msg).toString(), e); } } @Override public DN getBaseDN() { opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
@@ -426,8 +426,8 @@ @Override public ByteString encode(EntryIDSet idSet) { return ByteString.wrap(append( new ByteStringBuilder(getEstimatedSize(idSet)), idSet).trimToSize().getBackingArray()); final ByteStringBuilder builder = new ByteStringBuilder(getEstimatedSize(idSet)); return ByteString.wrap(append(builder, idSet).getBackingArray(), 0, builder.length()); } @Override @@ -502,8 +502,8 @@ public ByteString encode(EntryIDSet idSet) { checkNotNull(idSet, "idSet must not be null"); ByteStringBuilder builder = new ByteStringBuilder(getEstimatedSize(idSet)); return append(builder, idSet).toByteString(); final ByteStringBuilder builder = new ByteStringBuilder(getEstimatedSize(idSet)); return ByteString.wrap(append(builder, idSet).getBackingArray(), 0, builder.length()); } @Override opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
@@ -37,6 +37,8 @@ import org.opends.server.backends.pluggable.spi.UpdateFunction; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import com.forgerock.opendj.util.PackedLong; /** * Store a counter associated to a key. Counter value is sharded amongst multiple keys to allow concurrent * update without contention (at the price of a slower read). @@ -50,7 +52,7 @@ */ private static final long SHARD_COUNT = 4096; private static final int LONG_SIZE = Long.SIZE / Byte.SIZE; private static final EntryID TOTAL_COUNT_ENTRY_ID = new EntryID(ByteStringBuilder.COMPACTED_MAX_VALUE); private static final EntryID TOTAL_COUNT_ENTRY_ID = new EntryID(PackedLong.COMPACTED_MAX_VALUE); ID2Count(TreeName name) { @@ -98,8 +100,9 @@ @Override public ByteSequence computeNewValue(ByteSequence oldValue) { final long currentValue = oldValue != null ? oldValue.asReader().getLong() : 0; return toValue(currentValue + delta); final long currentValue = oldValue == null ? 0 : oldValue.asReader().getLong(); final long newValue = currentValue + delta; return newValue == 0 ? null : toValue(newValue); } }); } @@ -118,8 +121,11 @@ private void importPut0(Importer importer, EntryID entryID, final long delta) { Reject.ifNull(importer, "importer must not be null"); final ByteSequence shardedKey = getShardedKey(entryID); importer.put(getName(), shardedKey, toValue(delta)); if (delta != 0) { final ByteSequence shardedKey = getShardedKey(entryID); importer.put(getName(), shardedKey, toValue(delta)); } } private ByteSequence getShardedKey(EntryID entryID) @@ -139,11 +145,13 @@ ByteString toValue(final long count) { Reject.ifFalse(count != 0, "count must be != 0"); return ByteString.valueOf(count); } long fromValue(ByteString value) { Reject.ifNull(value, "value must not be null"); return value.toLong(); } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportLDIFReader.java
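For context on the ID2Count hunk above: each logical counter is spread over SHARD_COUNT storage keys, so concurrent writers rarely collide on the same record, while a read has to sum all the shards. A self-contained, in-memory sketch of the same idea (illustrative only, not the tree-backed implementation):

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

/** Illustrative in-memory analogue of a sharded counter. */
final class ShardedCounter
{
  private final AtomicLongArray shards;

  ShardedCounter(int shardCount)
  {
    shards = new AtomicLongArray(shardCount);
  }

  void addDelta(long delta)
  {
    // Updates land on a random shard, keeping contention on any single slot low.
    shards.addAndGet(ThreadLocalRandom.current().nextInt(shards.length()), delta);
  }

  long getCount()
  {
    long total = 0;
    for (int i = 0; i < shards.length(); i++)
    {
      total += shards.get(i); // reads pay the cost of summing every shard
    }
    return total;
  }
}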
@@ -32,6 +32,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.LocalizableMessageBuilder; @@ -50,6 +52,8 @@ /** This class specializes the LDIFReader for imports. */ final class ImportLDIFReader extends LDIFReader { private final ConcurrentHashMap<DN, CountDownLatch> pendingMap = new ConcurrentHashMap<>(); /** * A class holding the entry, its entryID as assigned by the LDIF reader and its suffix as * determined by the LDIF reader. @@ -58,13 +62,13 @@ { private final Entry entry; private final EntryID entryID; private final Suffix suffix; private final EntryContainer entryContainer; private EntryInformation(Entry entry, EntryID entryID, Suffix suffix) EntryInformation(Entry entry, EntryID entryID, EntryContainer entryContainer) { this.entry = entry; this.entryID = entryID; this.suffix = suffix; this.entryContainer = entryContainer; } Entry getEntry() @@ -77,9 +81,9 @@ return entryID; } Suffix getSuffix() EntryContainer getEntryContainer() { return suffix; return entryContainer; } } @@ -108,15 +112,15 @@ * Reads the next entry from the LDIF source. * * @return The next entry information read from the LDIF source, or <CODE>null</CODE> if the end of the LDIF * data is reached. * data is reached of if the import has been cancelled. * @param suffixesMap * A map of suffixes instances. * A map of entry containers instances. * @throws IOException * If an I/O problem occurs while reading from the file. * @throws LDIFException * If the information read cannot be parsed as an LDIF entry. */ public final EntryInformation readEntry(Map<DN, Suffix> suffixesMap) throws IOException, LDIFException public final EntryInformation readEntry(Map<DN, EntryContainer> suffixesMap) throws IOException, LDIFException { final boolean checkSchema = importConfig.validateSchema(); while (true) @@ -124,7 +128,7 @@ LinkedList<StringBuilder> lines; DN entryDN; EntryID entryID; Suffix suffix; final EntryContainer entryContainer; synchronized (this) { // Read the set of lines that make up the next entry. @@ -155,16 +159,8 @@ // read and return the next entry. 
continue; } else if (!importConfig.includeEntry(entryDN)) { logger.trace("Skipping entry %s because the DN is not one that " + "should be included based on the include and exclude branches.", entryDN); entriesRead.incrementAndGet(); logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN)); continue; } suffix = getMatchSuffix(entryDN, suffixesMap); if (suffix == null) entryContainer = getEntryContainer(entryDN, suffixesMap); if (entryContainer == null) { logger.trace("Skipping entry %s because the DN is not one that " + "should be included based on a suffix match check.", entryDN); @@ -174,23 +170,29 @@ } entriesRead.incrementAndGet(); entryID = rootContainer.getNextEntryID(); suffix.addPending(entryDN); if (!addPending(entryDN)) { logger.trace("Skipping entry %s because the DN already exists.", entryDN); logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN)); continue; } } // Create the entry and see if it is one that should be included in the import final Entry entry = createEntry(lines, entryDN, checkSchema, suffix); final Entry entry = createEntry(lines, entryDN, checkSchema); if (entry == null || !isIncludedInImport(entry, suffix, lines) || !invokeImportPlugins(entry, suffix, lines) || (checkSchema && !isValidAgainstSchema(entry, suffix, lines))) || !invokeImportPlugins(entry, lines) || (checkSchema && !isValidAgainstSchema(entry, lines))) { removePending(entryDN); continue; } return new EntryInformation(entry, entryID, suffix); return new EntryInformation(entry, entryID, entryContainer); } } private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema, Suffix suffix) private Entry createEntry(List<StringBuilder> lines, DN entryDN, boolean checkSchema) { // Read the set of attributes from the entry. Map<ObjectClass, String> objectClasses = new HashMap<>(); @@ -210,7 +212,6 @@ logger.trace("Skipping entry %s because reading" + "its attributes failed.", entryDN); } logToSkipWriter(lines, ERR_LDIF_READ_ATTR_SKIP.get(entryDN, e.getMessage())); suffix.removePending(entryDN); return null; } @@ -220,36 +221,7 @@ return entry; } private boolean isIncludedInImport(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines) { final DN entryDN = entry.getName(); try { if (!importConfig.includeEntry(entry)) { if (logger.isTraceEnabled()) { logger.trace("Skipping entry %s because the DN is not one that " + "should be included based on the include and exclude filters.", entryDN); } logToSkipWriter(lines, ERR_LDIF_SKIP.get(entryDN)); suffix.removePending(entryDN); return false; } } catch (Exception e) { logger.traceException(e); suffix.removePending(entryDN); logToSkipWriter(lines, ERR_LDIF_COULD_NOT_EVALUATE_FILTERS_FOR_IMPORT.get(entry.getName(), lastEntryLineNumber, e)); suffix.removePending(entryDN); return false; } return true; } private boolean invokeImportPlugins(final Entry entry, Suffix suffix, LinkedList<StringBuilder> lines) private boolean invokeImportPlugins(final Entry entry, LinkedList<StringBuilder> lines) { if (importConfig.invokeImportPlugins()) { @@ -269,14 +241,13 @@ } logToRejectWriter(lines, m); suffix.removePending(entryDN); return false; } } return true; } private boolean isValidAgainstSchema(Entry entry, Suffix suffix, LinkedList<StringBuilder> lines) private boolean isValidAgainstSchema(Entry entry, LinkedList<StringBuilder> lines) { final DN entryDN = entry.getName(); addRDNAttributesIfNecessary(entryDN, entry.getUserAttributes(), entry.getOperationalAttributes()); @@ -288,7 +259,6 @@ { LocalizableMessage message = ERR_LDIF_SCHEMA_VIOLATION.get(entryDN, 
lastEntryLineNumber, invalidReason); logToRejectWriter(lines, message); suffix.removePending(entryDN); return false; } return true; @@ -301,20 +271,60 @@ * The DN to search for. * @param map * The map to search. * @return The suffix instance that matches the DN, or null if no match is found. * @return The entry container instance that matches the DN, or null if no match is found. */ private Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map) private EntryContainer getEntryContainer(DN dn, Map<DN, EntryContainer> map) { DN nodeDN = dn; while (nodeDN != null) { final Suffix suffix = map.get(nodeDN); if (suffix != null) final EntryContainer entryContainer = map.get(nodeDN); if (entryContainer != null) { return suffix; return entryContainer; } nodeDN = nodeDN.getParentDNInSuffix(); } return null; } /** * Make sure the specified parent DN is not in the pending map. * * @param parentDN The DN of the parent. */ void waitIfPending(DN parentDN) throws InterruptedException { final CountDownLatch l = pendingMap.get(parentDN); if (l != null) { l.await(); } } /** * Add specified DN to the pending map. * * @param dn The DN to add to the map. * @return true if the DN was added, false if the DN is already present. */ private boolean addPending(DN dn) { return pendingMap.putIfAbsent(dn, new CountDownLatch(1)) == null; } /** * Remove the specified DN from the pending map, it may not exist if the * entries are being migrated so just return. * * @param dn The DN to remove from the map. */ void removePending(DN dn) { CountDownLatch l = pendingMap.remove(dn); if(l != null) { l.countDown(); } } } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
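A minimal sketch of how the pending-DN gate in ImportLDIFReader above is meant to be used by a consumer thread: readEntry() registers each DN in the pending map, a worker waits for the parent DN to clear before processing a child, and removePending() releases waiting children once the entry is done. The reader, the entryContainers map and processEntry() are assumed to be in scope and are illustrative only; exception handling is elided (waitIfPending() throws InterruptedException). The real call site is the LDIFReaderSource introduced later in this patch.

// Illustrative consumer loop for the CountDownLatch-backed pending map (sketch, not part of the patch).
EntryInformation entryInfo;
while ((entryInfo = reader.readEntry(entryContainers)) != null) // readEntry() calls addPending(entryDN)
{
  final Entry entry = entryInfo.getEntry();
  final DN parentDN = entryInfo.getEntryContainer().getParentWithinBase(entry.getName());
  if (parentDN != null)
  {
    reader.waitIfPending(parentDN); // blocks until the parent's latch has been counted down
  }
  try
  {
    processEntry(entryInfo); // hypothetical per-entry processing
  }
  finally
  {
    reader.removePending(entry.getName()); // counts down the latch, releasing any waiting children
  }
}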
@@ -24,9 +24,7 @@ */ package org.opends.server.backends.pluggable; import org.opends.server.core.ServerContext; import org.opends.server.types.DirectoryException; import org.opends.server.types.InitializationException; import org.opends.server.backends.RebuildConfig; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; @@ -34,21 +32,25 @@ interface ImportStrategy { /** * Imports information from an LDIF file into the supplied root container. * Imports information from an LDIF file. * * @param importConfig * The configuration to use when performing the import * @param rootContainer * The root container where to do the import * @param serverContext * The server context * @return Information about the result of the import processing * @throws DirectoryException * @throws Exception * If a problem occurs while performing the LDIF import * @throws InitializationException * If a problem occurs while initializing the LDIF import * @see {@link Backend#importLDIF(LDIFImportConfig, ServerContext)} */ LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext) throws DirectoryException, InitializationException; LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception; /** * Rebuild indexes. * * @param rebuildConfig * The configuration to sue when performing the rebuild. * @throws Exception * If a problem occurs while performing the rebuild. * @see {@link Backend#rebuildIndex(RebuildConfig, ServerContext)} */ void rebuildIndex(RebuildConfig rebuildConfig) throws Exception; } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
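A minimal caller sketch for the simplified ImportStrategy above, assuming a hypothetical backend-side class that owns a strategy instance; the class, field and parameter names below are illustrative and not part of this patch.

// Illustrative delegation to the ImportStrategy interface (sketch only).
final class ExampleBackend
{
  /** E.g. an OnDiskMergeImporter.StrategyImpl, built with the server context, root container and backend cfg. */
  private final ImportStrategy importStrategy;

  ExampleBackend(ImportStrategy importStrategy)
  {
    this.importStrategy = importStrategy;
  }

  LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception
  {
    // The strategy only needs the import configuration; the root container and server
    // context are supplied to the strategy when it is constructed.
    return importStrategy.importLDIF(importConfig);
  }

  void rebuildIndex(RebuildConfig rebuildConfig) throws Exception
  {
    importStrategy.rebuildIndex(rebuildConfig);
  }
}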
@@ -29,7 +29,6 @@ import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.opendj.ldap.ByteString; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.Importer; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.WriteableTransaction; @@ -44,12 +43,6 @@ int getIndexEntryLimit(); // Ignores trusted state. void importPut(Importer importer, ImportIDSet idsToBeAdded); // Ignores trusted state. void importRemove(Importer importer, ImportIDSet idsToBeRemoved); boolean isTrusted(); Cursor<ByteString, EntryIDSet> openCursor(ReadableTransaction txn); opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
@@ -35,6 +35,7 @@ import java.util.TreeSet; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.util.Reject; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.opends.server.types.DirectoryException; @@ -50,181 +51,305 @@ @SuppressWarnings("javadoc") class IndexBuffer { /** Internal interface for IndexBuffer implementor. */ private interface IndexBufferImplementor { void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException; void put(Index index, ByteString key, EntryID entryID); void put(VLVIndex index, ByteString sortKey); void remove(VLVIndex index, ByteString sortKey); void remove(Index index, ByteString key, EntryID entryID); } /** * The buffered records stored as a map from the record key to the * buffered value for that key for each index. * A buffered index is used to buffer multiple reads or writes to the same index key into a single read or write. * <p> * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}). * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE). * It can only be used to buffer multiple reads and writes under the same transaction. The transaction may be null if * it is known that there are no other concurrent updates to the index. */ private final SortedMap<Index, SortedMap<ByteString, BufferedIndexValues>> bufferedIndexes = new TreeMap<>(); private static final class DefaultIndexBuffer implements IndexBufferImplementor { /** * The buffered records stored as a map from the record key to the buffered value for that key for each index. * <p> * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}). * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE). */ private final SortedMap<Index, SortedMap<ByteString, BufferedIndexValues>> bufferedIndexes = new TreeMap<>(); /** * The buffered records stored as a set of buffered VLV values for each index. * <p> * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}). * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE). */ private final SortedMap<VLVIndex, BufferedVLVIndexValues> bufferedVLVIndexes = new TreeMap<>(); /** * A simple class representing a pair of added and deleted indexed IDs. Initially both addedIDs and deletedIDs are * {@code null} indicating that that the whole record should be deleted. */ private static class BufferedIndexValues { private EntryIDSet addedEntryIDs; private EntryIDSet deletedEntryIDs; void addEntryID(EntryID entryID) { if (!remove(deletedEntryIDs, entryID)) { if (this.addedEntryIDs == null) { this.addedEntryIDs = newDefinedSet(); } this.addedEntryIDs.add(entryID); } } void deleteEntryID(EntryID entryID) { if (!remove(addedEntryIDs, entryID)) { if (this.deletedEntryIDs == null) { this.deletedEntryIDs = newDefinedSet(); } this.deletedEntryIDs.add(entryID); } } private static boolean remove(EntryIDSet entryIDs, EntryID entryID) { return entryIDs != null ? entryIDs.remove(entryID) : false; } } /** A simple class representing a pair of added and deleted VLV values. 
*/ private static class BufferedVLVIndexValues { private TreeSet<ByteString> addedSortKeys; private TreeSet<ByteString> deletedSortKeys; void addSortKey(ByteString sortKey) { if (!remove(deletedSortKeys, sortKey)) { if (addedSortKeys == null) { addedSortKeys = new TreeSet<>(); } addedSortKeys.add(sortKey); } } void deleteSortKey(ByteString sortKey) { if (!remove(addedSortKeys, sortKey)) { if (deletedSortKeys == null) { deletedSortKeys = new TreeSet<>(); } deletedSortKeys.add(sortKey); } } private static boolean remove(TreeSet<ByteString> sortKeys, ByteString sortKey) { return sortKeys != null ? sortKeys.remove(sortKey) : false; } } private BufferedVLVIndexValues createOrGetBufferedVLVIndexValues(VLVIndex vlvIndex) { BufferedVLVIndexValues bufferedValues = bufferedVLVIndexes.get(vlvIndex); if (bufferedValues == null) { bufferedValues = new BufferedVLVIndexValues(); bufferedVLVIndexes.put(vlvIndex, bufferedValues); } return bufferedValues; } private BufferedIndexValues createOrGetBufferedIndexValues(Index index, ByteString keyBytes) { Map<ByteString, BufferedIndexValues> bufferedOperations = createOrGetBufferedOperations(index); BufferedIndexValues values = bufferedOperations.get(keyBytes); if (values == null) { values = new BufferedIndexValues(); bufferedOperations.put(keyBytes, values); } return values; } private Map<ByteString, BufferedIndexValues> createOrGetBufferedOperations(Index index) { SortedMap<ByteString, BufferedIndexValues> bufferedOperations = bufferedIndexes.get(index); if (bufferedOperations == null) { bufferedOperations = new TreeMap<>(); bufferedIndexes.put(index, bufferedOperations); } return bufferedOperations; } @Override public void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException { // Indexes are stored in sorted map to prevent deadlock during flush with DB using pessimistic lock strategies. for (Entry<Index, SortedMap<ByteString, BufferedIndexValues>> entry : bufferedIndexes.entrySet()) { flushIndex(entry.getKey(), txn, entry.getValue()); } for (Entry<VLVIndex, BufferedVLVIndexValues> entry : bufferedVLVIndexes.entrySet()) { entry.getKey().updateIndex(txn, entry.getValue().addedSortKeys, entry.getValue().deletedSortKeys); } } @Override public void put(Index index, ByteString key, EntryID entryID) { createOrGetBufferedIndexValues(index, key).addEntryID(entryID); } @Override public void put(VLVIndex index, ByteString sortKey) { createOrGetBufferedVLVIndexValues(index).addSortKey(sortKey); } @Override public void remove(VLVIndex index, ByteString sortKey) { createOrGetBufferedVLVIndexValues(index).deleteSortKey(sortKey); } @Override public void remove(Index index, ByteString key, EntryID entryID) { createOrGetBufferedIndexValues(index, key).deleteEntryID(entryID); } private static void flushIndex(Index index, WriteableTransaction txn, Map<ByteString, BufferedIndexValues> bufferedValues) { for (Entry<ByteString, BufferedIndexValues> entry : bufferedValues.entrySet()) { final BufferedIndexValues values = entry.getValue(); index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs); } } } /** * The buffered records stored as a set of buffered VLV values for each index. * <p> * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}). * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE). 
* IndexBuffer used during import which actually doesn't buffer modifications but forward those directly to the * supplied {@link WriteableTransaction}. */ private final SortedMap<VLVIndex, BufferedVLVIndexValues> bufferedVLVIndexes = new TreeMap<>(); /** * A simple class representing a pair of added and deleted indexed IDs. Initially both addedIDs * and deletedIDs are {@code null} indicating that that the whole record should be deleted. */ private static class BufferedIndexValues private static final class ImportIndexBuffer implements IndexBufferImplementor { private EntryIDSet addedEntryIDs; private EntryIDSet deletedEntryIDs; private final WriteableTransaction txn; private final EntryID expectedEntryID; private final ByteString encodedEntryID; void addEntryID(EntryID entryID) ImportIndexBuffer(WriteableTransaction txn, EntryID expectedEntryID) { if (!remove(deletedEntryIDs, entryID)) { if (this.addedEntryIDs == null) { this.addedEntryIDs = newDefinedSet(); } this.addedEntryIDs.add(entryID); } this.txn = txn; this.expectedEntryID = expectedEntryID; this.encodedEntryID = CODEC_V2.encode(EntryIDSet.newDefinedSet(expectedEntryID.longValue())); } void deleteEntryID(EntryID entryID) @Override public void put(Index index, ByteString key, EntryID entryID) { if (!remove(addedEntryIDs, entryID)) { if (this.deletedEntryIDs == null) { this.deletedEntryIDs = newDefinedSet(); } this.deletedEntryIDs.add(entryID); } Reject.ifFalse(this.expectedEntryID.equals(entryID), "Unexpected entryID"); txn.put(index.getName(), key, encodedEntryID); } private static boolean remove(EntryIDSet entryIDs, EntryID entryID) @Override public void put(VLVIndex index, ByteString sortKey) { return entryIDs != null ? entryIDs.remove(entryID) : false; txn.put(index.getName(), sortKey, index.toValue()); } @Override public void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException { // Nothing to do } @Override public void remove(VLVIndex index, ByteString sortKey) { throw new UnsupportedOperationException(); } @Override public void remove(Index index, ByteString key, EntryID entryID) { throw new UnsupportedOperationException(); } } /** A simple class representing a pair of added and deleted VLV values. */ private static class BufferedVLVIndexValues private final IndexBufferImplementor impl; static IndexBuffer newImportIndexBuffer(WriteableTransaction txn, EntryID entryID) { private TreeSet<ByteString> addedSortKeys; private TreeSet<ByteString> deletedSortKeys; void addSortKey(ByteString sortKey) { if (!remove(deletedSortKeys, sortKey)) { if (addedSortKeys == null) { addedSortKeys = new TreeSet<>(); } addedSortKeys.add(sortKey); } } void deleteSortKey(ByteString sortKey) { if (!remove(addedSortKeys, sortKey)) { if (deletedSortKeys == null) { deletedSortKeys = new TreeSet<>(); } deletedSortKeys.add(sortKey); } } private static boolean remove(TreeSet<ByteString> sortKeys, ByteString sortKey) { return sortKeys != null ? 
sortKeys.remove(sortKey) : false; } return new IndexBuffer(new ImportIndexBuffer(txn, entryID)); } private BufferedVLVIndexValues createOrGetBufferedVLVIndexValues(VLVIndex vlvIndex) public IndexBuffer() { BufferedVLVIndexValues bufferedValues = bufferedVLVIndexes.get(vlvIndex); if (bufferedValues == null) { bufferedValues = new BufferedVLVIndexValues(); bufferedVLVIndexes.put(vlvIndex, bufferedValues); } return bufferedValues; this(new DefaultIndexBuffer()); } private BufferedIndexValues createOrGetBufferedIndexValues(Index index, ByteString keyBytes) private IndexBuffer(IndexBufferImplementor impl) { Map<ByteString, BufferedIndexValues> bufferedOperations = createOrGetBufferedOperations(index); BufferedIndexValues values = bufferedOperations.get(keyBytes); if (values == null) { values = new BufferedIndexValues(); bufferedOperations.put(keyBytes, values); } return values; } private Map<ByteString, BufferedIndexValues> createOrGetBufferedOperations(Index index) { SortedMap<ByteString, BufferedIndexValues> bufferedOperations = bufferedIndexes.get(index); if (bufferedOperations == null) { bufferedOperations = new TreeMap<>(); bufferedIndexes.put(index, bufferedOperations); } return bufferedOperations; this.impl = impl; } /** * Flush the buffered index changes to storage. * * @param txn a non null transaction * @throws StorageRuntimeException If an error occurs in the storage. * @throws DirectoryException If a Directory Server error occurs. * @param txn * a non null transaction * @throws StorageRuntimeException * If an error occurs in the storage. * @throws DirectoryException * If a Directory Server error occurs. */ void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException { // Indexes are stored in sorted map to prevent deadlock during flush with DB using pessimistic lock strategies. for (Entry<Index, SortedMap<ByteString, BufferedIndexValues>> entry : bufferedIndexes.entrySet()) { flushIndex(entry.getKey(), txn, entry.getValue()); } for (Entry<VLVIndex, BufferedVLVIndexValues> entry : bufferedVLVIndexes.entrySet()) { entry.getKey().updateIndex(txn, entry.getValue().addedSortKeys, entry.getValue().deletedSortKeys); } impl.flush(txn); } void put(Index index, ByteString key, EntryID entryID) { createOrGetBufferedIndexValues(index, key).addEntryID(entryID); impl.put(index, key, entryID); } void put(VLVIndex index, ByteString sortKey) { createOrGetBufferedVLVIndexValues(index).addSortKey(sortKey); impl.put(index, sortKey); } void remove(VLVIndex index, ByteString sortKey) { createOrGetBufferedVLVIndexValues(index).deleteSortKey(sortKey); impl.remove(index, sortKey); } void remove(Index index, ByteString key, EntryID entryID) { createOrGetBufferedIndexValues(index, key).deleteEntryID(entryID); impl.remove(index, key, entryID); } private static void flushIndex(Index index, WriteableTransaction txn, Map<ByteString, BufferedIndexValues> bufferedValues) { for (Entry<ByteString, BufferedIndexValues> entry : bufferedValues.entrySet()) { final BufferedIndexValues values = entry.getValue(); index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs); } } } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexInputBuffer.java
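A minimal usage sketch for the refactored IndexBuffer above, showing the two modes side by side. The enclosing helper method and its parameters are illustrative only; an Index, a key, an EntryID and a non-null WriteableTransaction are assumed to be available.

// Illustrative call patterns for the two IndexBuffer implementations (sketch only).
static void indexBufferSketch(Index index, ByteString key, EntryID entryID, WriteableTransaction txn)
    throws DirectoryException
{
  // Default mode: changes are buffered per key and written to storage once on flush().
  final IndexBuffer buffer = new IndexBuffer();
  buffer.put(index, key, entryID);
  buffer.remove(index, key, entryID); // an add followed by a delete of the same ID cancels out in the buffer
  buffer.flush(txn);

  // Import mode: each put() is written straight through to the transaction for the expected
  // entry ID; remove() is unsupported and flush() is a no-op in this mode.
  final IndexBuffer importBuffer = IndexBuffer.newImportIndexBuffer(txn, entryID);
  importBuffer.put(index, key, entryID);
}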
File was deleted opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexOutputBuffer.java
File was deleted opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeBufferImporter.java
File was deleted opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
New file @@ -0,0 +1,3531 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2015 ForgeRock AS. */ package org.opends.server.backends.pluggable; import static java.nio.channels.FileChannel.*; import static org.forgerock.util.Utils.*; import static org.opends.messages.BackendMessages.*; import static org.opends.server.util.DynamicConstants.*; import static org.opends.server.util.StaticUtils.*; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.nio.file.FileAlreadyExistsException; import java.nio.file.StandardOpenOption; import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TimerTask; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.WeakHashMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigException; import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.util.Reject; import org.forgerock.util.Utils; import org.forgerock.util.promise.PromiseImpl; import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType; import 
org.opends.server.admin.std.server.BackendIndexCfg; import org.opends.server.admin.std.server.PluggableBackendCfg; import org.opends.server.api.CompressedSchema; import org.opends.server.backends.RebuildConfig; import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex; import org.opends.server.backends.pluggable.CursorTransformer.SequentialCursorAdapter; import org.opends.server.backends.pluggable.DN2ID.TreeVisitor; import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation; import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.Importer; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.SequentialCursor; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.backends.pluggable.spi.TreeName; import org.opends.server.backends.pluggable.spi.UpdateFunction; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ServerContext; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.InitializationException; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; import org.opends.server.util.Platform; import com.forgerock.opendj.util.PackedLong; // @Checkstyle:ignore import sun.misc.Unsafe; /** * Imports LDIF data contained in files into the database. Because of the B-Tree structure used in backend, import is * faster when records are inserted in ascending order. This prevents node locking/re-writing due to B-Tree inner nodes * split. This is why import is performed in two phases: the first phase encode and sort all records while the second * phase copy the sorted records into the database. Entries are read from an LDIF file by the {@link ImportLDIFReader}. * Then, each entry are optionally validated and finally imported into a {@link Chunk} by the {@link EntryContainer} * using a {@link PhaseOneWriteableTransaction}. Once all entries have been processed, * {@link PhaseOneWriteableTransaction#getChunks()} get all the chunks which will be copied into the database * concurrently using tasks created by the {@link ImporterTaskFactory}. */ final class OnDiskMergeImporter { private static final String DEFAULT_TMP_DIR = "import-tmp"; private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); /** * Shim that allows properly constructing an {@link OnDiskMergeImporter} without polluting {@link ImportStrategy} and * {@link RootContainer} with this importer inner workings. */ static class StrategyImpl implements ImportStrategy { private static final String PHASE1_REBUILDER_THREAD_NAME = "PHASE1-REBUILDER-%d"; private static final String PHASE2_REBUILDER_THREAD_NAME = "PHASE2-REBUILDER-%d"; private static final String PHASE1_IMPORTER_THREAD_NAME = "PHASE1-IMPORTER-%d"; private static final String PHASE2_IMPORTER_THREAD_NAME = "PHASE2-IMPORTER-%d"; private static final String SORTER_THREAD_NAME = "PHASE1-SORTER-%d"; /** Small heap threshold used to give more memory to JVM to attempt OOM errors. 
*/ private static final int SMALL_HEAP_SIZE = 256 * MB; private final ServerContext serverContext; private final RootContainer rootContainer; private final PluggableBackendCfg backendCfg; StrategyImpl(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg) { this.serverContext = serverContext; this.rootContainer = rootContainer; this.backendCfg = backendCfg; } @Override public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws Exception { final long availableMemory = calculateAvailableMemory(); final int threadCount = importConfig.getThreadCount() == 0 ? Runtime.getRuntime().availableProcessors() : importConfig.getThreadCount(); final int indexCount = getIndexCount(); final int nbBuffer = threadCount * indexCount * 2; logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer); final int bufferSize = computeBufferSize(nbBuffer, availableMemory); logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize); logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION); logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount); final long startTime = System.currentTimeMillis(); final OnDiskMergeImporter importer; final ExecutorService sorter = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), newThreadFactory(null, SORTER_THREAD_NAME, true)); final LDIFReaderSource source = new LDIFReaderSource(rootContainer, importConfig, PHASE1_IMPORTER_THREAD_NAME, threadCount); try (final Importer dbStorage = rootContainer.getStorage().startImport(); final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize)) { final File tempDir = prepareTempDir(backendCfg, importConfig.getTmpDirectory()); final Collection<EntryContainer> entryContainers = rootContainer.getEntryContainers(); final AbstractTwoPhaseImportStrategy importStrategy = importConfig.getSkipDNValidation() ? 
new SortAndImportWithoutDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter) : new SortAndImportWithDNValidation(entryContainers, dbStorage, tempDir, bufferPool, sorter); importer = new OnDiskMergeImporter(PHASE2_IMPORTER_THREAD_NAME, importStrategy); importer.doImport(source); } finally { sorter.shutdown(); } logger.info(NOTE_IMPORT_PHASE_STATS, importer.getTotalTimeInMillis() / 1000, importer.getPhaseOneTimeInMillis() / 1000, importer.getPhaseTwoTimeInMillis() / 1000); final long importTime = System.currentTimeMillis() - startTime; float rate = 0; if (importTime > 0) { rate = 1000f * source.getEntriesRead() / importTime; } logger.info(NOTE_IMPORT_FINAL_STATUS, source.getEntriesRead(), importer.getImportedCount(), source .getEntriesIgnored(), source.getEntriesRejected(), 0, importTime / 1000, rate); return new LDIFImportResult(source.getEntriesRead(), source.getEntriesRejected(), source .getEntriesIgnored()); } private int getIndexCount() throws ConfigException { int indexCount = 2; // dn2id, dn2uri for (String indexName : backendCfg.listBackendIndexes()) { final BackendIndexCfg index = backendCfg.getBackendIndex(indexName); final SortedSet<IndexType> types = index.getIndexType(); if (types.contains(IndexType.EXTENSIBLE)) { indexCount += types.size() - 1 + index.getIndexExtensibleMatchingRule().size(); } else { indexCount += types.size(); } } indexCount += backendCfg.listBackendVLVIndexes().length; return indexCount; } @Override public void rebuildIndex(final RebuildConfig rebuildConfig) throws Exception { final long availableMemory = calculateAvailableMemory(); // Rebuild indexes final OnDiskMergeImporter importer; final ExecutorService sorter = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), newThreadFactory(null, SORTER_THREAD_NAME, true)); try (final Importer dbStorage = rootContainer.getStorage().startImport()) { final EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN()); final long totalEntries = entryContainer.getID2Entry().getRecordCount(new ImporterToWriteableTransactionAdapter(dbStorage)); final Set<String> indexesToRebuild = selectIndexesToRebuild(entryContainer, rebuildConfig, totalEntries); if (rebuildConfig.isClearDegradedState()) { visitIndexes(entryContainer, new SpecificIndexFilter(new TrustModifier(dbStorage, true), indexesToRebuild)); logger.info(NOTE_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList()); return; } final int threadCount = Runtime.getRuntime().availableProcessors(); final int nbBuffer = 2 * indexesToRebuild.size() * threadCount; final int bufferSize = computeBufferSize(nbBuffer, availableMemory); try (final BufferPool bufferPool = new BufferPool(nbBuffer, bufferSize)) { final File tempDir = prepareTempDir(backendCfg, rebuildConfig.getTmpDirectory()); final AbstractTwoPhaseImportStrategy strategy = new RebuildIndexStrategy(rootContainer.getEntryContainers(), dbStorage, tempDir, bufferPool, sorter, indexesToRebuild); importer = new OnDiskMergeImporter(PHASE2_REBUILDER_THREAD_NAME, strategy); importer.doImport( new ID2EntrySource(entryContainer, dbStorage, PHASE1_REBUILDER_THREAD_NAME, threadCount, totalEntries)); } } finally { sorter.shutdown(); } final long totalTime = importer.getTotalTimeInMillis(); final float rate = totalTime > 0 ? 
1000f * importer.getImportedCount() / totalTime : 0; logger.info(NOTE_REBUILD_FINAL_STATUS, importer.getImportedCount(), totalTime / 1000, rate); } private final static Set<String> selectIndexesToRebuild(EntryContainer entryContainer, RebuildConfig rebuildConfig, long totalEntries) { final SelectIndexName selector = new SelectIndexName(); switch (rebuildConfig.getRebuildMode()) { case ALL: visitIndexes(entryContainer, selector); logger.info(NOTE_REBUILD_ALL_START, totalEntries); break; case DEGRADED: visitIndexes(entryContainer, new DegradedIndexFilter(selector)); logger.info(NOTE_REBUILD_DEGRADED_START, totalEntries); break; case USER_DEFINED: visitIndexes(entryContainer, new SpecificIndexFilter(selector, rebuildConfig.getRebuildList())); if (!rebuildConfig.isClearDegradedState()) { logger.info(NOTE_REBUILD_START, Utils.joinAsString(", ", rebuildConfig.getRebuildList()), totalEntries); } break; } return selector.getSelectedIndexNames(); } private static File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException { final File tempDir = new File(getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR), backendCfg.getBackendId()); recursiveDelete(tempDir); if (!tempDir.exists() && !tempDir.mkdirs()) { throw new InitializationException(ERR_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir)); } return tempDir; } private static int computeBufferSize(int nbBuffer, long availableMemory) throws InitializationException { if (BufferPool.supportOffHeap()) { return MAX_BUFFER_SIZE; } final int bufferSize = Math.min((int) (availableMemory / nbBuffer), MAX_BUFFER_SIZE); if (bufferSize < MIN_BUFFER_SIZE) { // Not enough memory. throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(availableMemory, nbBuffer * MIN_BUFFER_SIZE + REQUIRED_FREE_MEMORY)); } return bufferSize; } /** * Calculates the amount of available memory which can be used by this import, taking into account whether or not * the import is running offline or online as a task. */ private long calculateAvailableMemory() { final Runtime runtime = Runtime.getRuntime(); // call twice gc to ensure finalizers are called // and young to old gen references are properly gc'd runtime.gc(); runtime.gc(); final long totalAvailableMemory; if (DirectoryServer.isRunning()) { // Online import/rebuild. final long availableMemory = serverContext.getMemoryQuota().getAvailableMemory(); totalAvailableMemory = Math.max(availableMemory, 16 * MB); } else { // Offline import/rebuild. totalAvailableMemory = Platform.getUsableMemoryForCaching(); } // Now take into account various fudge factors. int importMemPct = 90; if (totalAvailableMemory <= SMALL_HEAP_SIZE) { // Be pessimistic when memory is low. importMemPct -= 25; } final long usedMemory = runtime.totalMemory() - runtime.freeMemory() + DB_CACHE_SIZE + REQUIRED_FREE_MEMORY; return (totalAvailableMemory * importMemPct / 100) - usedMemory; } } /** Source of LDAP {@link Entry}s to process. */ private interface Source { /** Process {@link Entry}s extracted from a {@link Source} */ interface EntryProcessor { void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws Exception; } void processAllEntries(EntryProcessor processor) throws Exception; boolean isCancelled(); } /** Extract LDAP {@link Entry}s from an LDIF file. 
*/ private static final class LDIFReaderSource implements Source { private static final String PHASE1_REPORTER_THREAD_NAME = "PHASE1-REPORTER-%d"; private final Map<DN, EntryContainer> entryContainers; private final LDIFImportConfig importConfig; private final ImportLDIFReader reader; private final ExecutorService executor; private final int nbThreads; LDIFReaderSource(RootContainer rootContainer, LDIFImportConfig importConfig, String threadNameTemplate, int nbThreads) throws IOException { this.importConfig = importConfig; this.reader = new ImportLDIFReader(importConfig, rootContainer); this.entryContainers = new HashMap<>(); for (EntryContainer container : rootContainer.getEntryContainers()) { this.entryContainers.put(container.getBaseDN(), container); } this.nbThreads = nbThreads; this.executor = Executors.newFixedThreadPool(nbThreads, newThreadFactory(null, threadNameTemplate, true)); } @Override public void processAllEntries(final EntryProcessor entryProcessor) throws Exception { final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true)); scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS); final CompletionService<Void> completion = new ExecutorCompletionService<>(executor); try { for (int i = 0; i < nbThreads; i++) { completion.submit(new Callable<Void>() { @Override public Void call() throws Exception { EntryInformation entryInfo; while ((entryInfo = reader.readEntry(entryContainers)) != null && !importConfig.isCancelled()) { final EntryContainer entryContainer = entryInfo.getEntryContainer(); final Entry entry = entryInfo.getEntry(); final DN entryDN = entry.getName(); final DN parentDN = entryContainer.getParentWithinBase(entryDN); if (parentDN != null) { reader.waitIfPending(parentDN); } try { entryProcessor.processEntry(entryContainer, entryInfo.getEntryID(), entry); } catch (DirectoryException e) { reader.rejectEntry(entry, e.getMessageObject()); } catch (Exception e) { reader.rejectEntry(entry, ERR_EXECUTION_ERROR.get(e)); } finally { reader.removePending(entry.getName()); } } return null; } }); } waitTasksTermination(completion, nbThreads); } finally { scheduler.shutdown(); executor.shutdown(); } } long getEntriesRead() { return reader.getEntriesRead(); } long getEntriesIgnored() { return reader.getEntriesIgnored(); } long getEntriesRejected() { return reader.getEntriesRejected(); } @Override public boolean isCancelled() { return importConfig.isCancelled(); } /** This class reports progress of first phase of import processing at fixed intervals. */ private final class PhaseOneProgressReporter extends TimerTask { /** The number of entries that had been read at the time of the previous progress report. */ private long previousCount; /** The time in milliseconds of the previous progress report. */ private long previousTime; /** Create a new import progress task. */ public PhaseOneProgressReporter() { previousTime = System.currentTimeMillis(); } /** The action to be performed by this timer task. 
*/ @Override public void run() { long entriesRead = reader.getEntriesRead(); long entriesIgnored = reader.getEntriesIgnored(); long entriesRejected = reader.getEntriesRejected(); long deltaCount = entriesRead - previousCount; long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } float rate = 1000f * deltaCount / deltaTime; logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate); previousCount = entriesRead; previousTime = latestTime; } } } /** Extract LDAP {@link Entry}s from an existing database. */ private static final class ID2EntrySource implements Source { private static final String PHASE1_REPORTER_THREAD_NAME = "REPORTER-%d"; private final EntryContainer entryContainer; private final CompressedSchema schema; private final Importer importer; private final ExecutorService executor; private final long nbTotalEntries; private final AtomicLong nbEntriesProcessed = new AtomicLong(); private volatile boolean interrupted; ID2EntrySource(EntryContainer entryContainer, Importer importer, String threadNameTemplate, int nbThread, long nbTotalEntries) { this.nbTotalEntries = nbTotalEntries; this.entryContainer = entryContainer; this.importer = importer; this.schema = entryContainer.getRootContainer().getCompressedSchema(); // by default (unfortunately) the ThreadPoolExecutor will throw an exception when queue is full. this.executor = new ThreadPoolExecutor(nbThread, nbThread, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(nbThread * 2), newThreadFactory(null, threadNameTemplate, true), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // this will block if the queue is full try { executor.getQueue().put(r); } catch (InterruptedException e) { } } }); } @Override public void processAllEntries(final EntryProcessor entryProcessor) throws Exception { final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE1_REPORTER_THREAD_NAME, true)); scheduler.scheduleAtFixedRate(new PhaseOneProgressReporter(), 10, 10, TimeUnit.SECONDS); final PromiseImpl<Void, Exception> promise = PromiseImpl.create(); try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(entryContainer.getID2Entry().getName())) { while (cursor.next()) { final ByteString key = cursor.getKey(); final ByteString value = cursor.getValue(); executor.submit(new Runnable() { @Override public void run() { try { entryProcessor.processEntry(entryContainer, new EntryID(key), ID2Entry.entryFromDatabase(value, schema)); nbEntriesProcessed.incrementAndGet(); } catch (Exception e) { interrupted = true; promise.handleException(e); } } }); } } finally { executor.shutdown(); executor.awaitTermination(30, TimeUnit.SECONDS); scheduler.shutdown(); } // Forward exception if any if (promise.isDone()) { promise.getOrThrow(0, TimeUnit.SECONDS); } } @Override public boolean isCancelled() { return interrupted; } /** This class reports progress of first phase of import processing at fixed intervals. */ private final class PhaseOneProgressReporter extends TimerTask { /** The number of entries that had been read at the time of the previous progress report. */ private long previousCount; /** The time in milliseconds of the previous progress report. */ private long previousTime; /** Create a new import progress task. 
*/ public PhaseOneProgressReporter() { previousTime = System.currentTimeMillis(); } /** The action to be performed by this timer task. */ @Override public void run() { long entriesRead = nbEntriesProcessed.get(); long deltaCount = entriesRead - previousCount; long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; final float progressPercent = nbTotalEntries > 0 ? Math.round((100f * entriesRead) / nbTotalEntries) : 0; if (deltaTime == 0) { return; } float rate = 1000f * deltaCount / deltaTime; logger.info(NOTE_REBUILD_PROGRESS_REPORT, progressPercent, entriesRead, nbTotalEntries, rate); previousCount = entriesRead; previousTime = latestTime; } } } /** Max size of phase one buffer. */ private static final int MAX_BUFFER_SIZE = 2 * MB; /** Min size of phase one buffer. */ private static final int MIN_BUFFER_SIZE = 4 * KB; /** DB cache size to use during import */ private static final int DB_CACHE_SIZE = 4 * MB; /** DB cache size to use during import */ /** Required free memory for this importer */ private static final int REQUIRED_FREE_MEMORY = 50 * MB; /** LDIF reader. */ /** Map of DNs to Suffix objects. */ private final AbstractTwoPhaseImportStrategy importStrategy; private final String phase2ThreadNameTemplate; private final AtomicLong importedCount = new AtomicLong(); private long phaseOneTimeMs; private long phaseTwoTimeMs; private OnDiskMergeImporter(String phase2ThreadNameTemplate, AbstractTwoPhaseImportStrategy importStrategy) { this.phase2ThreadNameTemplate = phase2ThreadNameTemplate; this.importStrategy = importStrategy; } public void doImport(final Source source) throws Exception { final long phaseOneStartTime = System.currentTimeMillis(); final PhaseOneWriteableTransaction transaction = new PhaseOneWriteableTransaction(importStrategy); importedCount.set(0); final ConcurrentMap<EntryContainer, CountDownLatch> importedContainers = new ConcurrentHashMap<>(); // Start phase one source.processAllEntries(new Source.EntryProcessor() { @Override public void processEntry(EntryContainer container, EntryID entryID, Entry entry) throws DirectoryException, InterruptedException { CountDownLatch latch = importedContainers.get(container); if (latch == null) { final CountDownLatch newLatch = new CountDownLatch(1); if (importedContainers.putIfAbsent(container, newLatch) == null) { importStrategy.beforeImport(container); newLatch.countDown(); } latch = importedContainers.get(container); } latch.await(); importStrategy.validate(container, entryID, entry); container.importEntry(transaction, entryID, entry); importedCount.incrementAndGet(); } }); phaseOneTimeMs = System.currentTimeMillis() - phaseOneStartTime; if (source.isCancelled()) { throw new InterruptedException("Import processing canceled."); } // Start phase two final long phaseTwoStartTime = System.currentTimeMillis(); try (final PhaseTwoProgressReporter progressReporter = new PhaseTwoProgressReporter()) { final List<Callable<Void>> tasks = new ArrayList<>(); final Set<String> importedBaseDNs = new HashSet<>(); for (Map.Entry<TreeName, Chunk> treeChunk : transaction.getChunks().entrySet()) { importedBaseDNs.add(treeChunk.getKey().getBaseDN()); tasks.add(importStrategy.newPhaseTwoTask(treeChunk.getKey(), treeChunk.getValue(), progressReporter)); } invokeParallel(phase2ThreadNameTemplate, tasks); } // Finish import for(EntryContainer entryContainer : importedContainers.keySet()) { importStrategy.afterImport(entryContainer); } phaseTwoTimeMs = System.currentTimeMillis() - phaseTwoStartTime; } public long 
getImportedCount() { return importedCount.get(); } public long getPhaseOneTimeInMillis() { return phaseOneTimeMs; } public long getPhaseTwoTimeInMillis() { return phaseTwoTimeMs; } public long getTotalTimeInMillis() { return phaseOneTimeMs + phaseTwoTimeMs; } /** Create {@link Chunk} depending on the {@link TreeName}. */ private interface ChunkFactory { Chunk newChunk(TreeName treeName) throws Exception; } /** Provides default behavior for two phases strategies. */ private static abstract class AbstractTwoPhaseImportStrategy implements ChunkFactory { protected final Map<String, EntryContainer> entryContainers; protected final Executor sorter; protected final Importer importer; protected final BufferPool bufferPool; protected final File tempDir; AbstractTwoPhaseImportStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir, BufferPool bufferPool, Executor sorter) { this.entryContainers = new HashMap<>(entryContainers.size()); for (EntryContainer container : entryContainers) { this.entryContainers.put(container.getTreePrefix(), container); } this.importer = importer; this.tempDir = tempDir; this.bufferPool = bufferPool; this.sorter = sorter; } abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException; void beforeImport(EntryContainer entryContainer) { clearEntryContainerTrees(entryContainer); visitIndexes(entryContainer, new TrustModifier(importer, false)); } abstract Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk chunk, PhaseTwoProgressReporter progressReporter); void afterImport(EntryContainer entryContainer) { visitIndexes(entryContainer, new TrustModifier(importer, true)); } final Set<EntryContainer> extractEntryContainers(Collection<TreeName> treeNames) { final Set<EntryContainer> containers = new HashSet<>(); for(TreeName treeName : treeNames) { containers.add(entryContainers.get(treeName.getBaseDN())); } return containers; } final void clearEntryContainerTrees(EntryContainer entryContainer) { for(Tree tree : entryContainer.listTrees()) { importer.clearTree(tree.getName()); } } final Chunk newExternalSortChunk(TreeName treeName) throws Exception { return new ExternalSortChunk(tempDir, treeName.toString(), bufferPool, newCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter); } final Callable<Void> newChunkCopierTask(TreeName treeName, final Chunk chunk, PhaseTwoProgressReporter progressReporter) { return new CleanImportTask(progressReporter, chunk, treeName, importer); } final Callable<Void> newDN2IDImporterTask(TreeName treeName, final Chunk chunk, PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported) { final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN()); final ID2Count id2count = entryContainer.getID2ChildrenCount(); return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), chunk, id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported); } final static Callable<Void> newFlushTask(final Chunk chunk) { return new Callable<Void>() { @Override public Void call() throws Exception { try (final MeteredCursor<ByteString, ByteString> unusued = chunk.flip()) { // force flush } return null; } }; } } /** * No validation is performed, every {@link TreeName} (but id2entry) are imported into dedicated * {@link ExternalSortChunk} before being imported into the {@link Importer}. id2entry which is directly copied into * the database through {@link ImporterToChunkAdapter}. 
*/ private static final class SortAndImportWithoutDNValidation extends AbstractTwoPhaseImportStrategy { SortAndImportWithoutDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir, BufferPool bufferPool, Executor sorter) { super(entryContainers, importer, tempDir, bufferPool, sorter); } @Override public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) { // No validation performed. All entries are considered valid. } @Override public Chunk newChunk(TreeName treeName) throws Exception { if (isID2Entry(treeName)) { importer.clearTree(treeName); return new MostlyOrderedChunk(asChunk(treeName, importer)); } return newExternalSortChunk(treeName); } @Override public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk chunk, PhaseTwoProgressReporter progressReporter) { if (isID2Entry(treeName)) { return newFlushTask(chunk); } else if (isDN2ID(treeName)) { return newDN2IDImporterTask(treeName, chunk, progressReporter, false); } return newChunkCopierTask(treeName, chunk, progressReporter); } } /** * This strategy performs two validations by ensuring that there is no duplicate entry (entry with same DN) and that * the given entry has an existing parent. To do so, the dn2id is directly imported into the database in addition of * id2entry. Others tree are externally sorted before being imported into the database. */ private static final class SortAndImportWithDNValidation extends AbstractTwoPhaseImportStrategy implements ReadableTransaction { private static final int DN_CACHE_SIZE = 16; private final LRUPresenceCache<DN> dnCache = new LRUPresenceCache<>(DN_CACHE_SIZE); SortAndImportWithDNValidation(Collection<EntryContainer> entryContainers, Importer importer, File tempDir, BufferPool bufferPool, Executor sorter) { super(entryContainers, importer, tempDir, bufferPool, sorter); } @Override public Chunk newChunk(TreeName treeName) throws Exception { if (isID2Entry(treeName)) { importer.clearTree(treeName); return new MostlyOrderedChunk(asChunk(treeName, importer)); } else if (isDN2ID(treeName)) { importer.clearTree(treeName); return asChunk(treeName, importer); } return newExternalSortChunk(treeName); } @Override public Callable<Void> newPhaseTwoTask(TreeName treeName, final Chunk chunk, PhaseTwoProgressReporter progressReporter) { if (isID2Entry(treeName)) { return newFlushTask(chunk); } else if (isDN2ID(treeName)) { return newDN2IDImporterTask(treeName, chunk, progressReporter, true); } return newChunkCopierTask(treeName, chunk, progressReporter); } @Override public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException { final DN2ID dn2Id = entryContainer.getDN2ID(); final DN entryDN = entry.getName(); final DN parentDN = entryContainer.getParentWithinBase(entryDN); if (parentDN != null && !dnCache.contains(parentDN) && dn2Id.get(this, parentDN) == null) { throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN)); } if (dn2Id.get(this, entryDN) != null) { throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, ERR_ADD_ENTRY_ALREADY_EXISTS.get(entry)); } dnCache.add(entryDN); } @Override public ByteString read(TreeName treeName, ByteSequence key) { return importer.read(treeName, key); } @Override public Cursor<ByteString, ByteString> openCursor(TreeName treeName) { throw new UnsupportedOperationException(); } @Override public long getRecordCount(TreeName treeName) { throw new UnsupportedOperationException(); } } /** Import only a specific 
indexes list while ignoring everything else. */ private static final class RebuildIndexStrategy extends AbstractTwoPhaseImportStrategy { private final Set<String> indexNames; RebuildIndexStrategy(Collection<EntryContainer> entryContainers, Importer importer, File tempDir, BufferPool bufferPool, Executor sorter, Collection<String> indexNames) { super(entryContainers, importer, tempDir, bufferPool, sorter); this.indexNames = new HashSet<>(indexNames.size()); for(String indexName : indexNames) { this.indexNames.add(indexName.toLowerCase()); } } @Override void beforeImport(EntryContainer entryContainer) { visitIndexes(entryContainer, new SpecificIndexFilter(new TrustModifier(importer, false), indexNames)); visitIndexes(entryContainer, new SpecificIndexFilter(new ClearDatabase(importer), indexNames)); } @Override void afterImport(EntryContainer entryContainer) { visitIndexes(entryContainer, new SpecificIndexFilter(new TrustModifier(importer, true), indexNames)); } @Override public Chunk newChunk(TreeName treeName) throws Exception { if (indexNames.contains(treeName.getIndexId().toLowerCase())) { return newExternalSortChunk(treeName); } // Ignore return nullChunk(); } @Override public Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk chunk, PhaseTwoProgressReporter progressReporter) { if (indexNames.contains(treeName.getIndexId().toLowerCase())) { if (isDN2ID(treeName)) { return newDN2IDImporterTask(treeName, chunk, progressReporter, false); } return newChunkCopierTask(treeName, chunk, progressReporter); } // Do nothing (flush null chunk) return newFlushTask(chunk); } @Override public void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException { // No validation performed. All entries are considered valid. } } private static <V> List<V> invokeParallel(String threadNameTemplate, Collection<Callable<V>> tasks) throws InterruptedException, ExecutionException { final ExecutorService executor = Executors.newCachedThreadPool(newThreadFactory(null, threadNameTemplate, true)); try { final CompletionService<V> completionService = new ExecutorCompletionService<>(executor); for (Callable<V> task : tasks) { completionService.submit(task); } return waitTasksTermination(completionService, tasks.size()); } finally { executor.shutdown(); } } /** * A {@link WriteableTransaction} delegates the storage of data to {@link Chunk}s which are created on-demand for each * {@link TreeName} through the provided {@link ChunkFactory}. Once there is no more data to import, call * {@link #getChunks()} to get the resulting {@link Chunk}s containing the sorted data to import into database. * {@link #put(TreeName, ByteSequence, ByteSequence)} is thread-safe. Since there is only one {@link Chunk} created * per {@link TreeName}, the {@link Chunk#put(ByteSequence, ByteSequence)} method of returned {@link Chunk} must be * thread-safe. */ private static final class PhaseOneWriteableTransaction implements WriteableTransaction { private final ConcurrentMap<TreeName, Chunk> chunks = new ConcurrentHashMap<>(); private final ChunkFactory chunkFactory; PhaseOneWriteableTransaction(ChunkFactory chunkFactory) { this.chunkFactory = chunkFactory; } Map<TreeName, Chunk> getChunks() { return chunks; } /** * Store record into a {@link Chunk}. Creating one if none is existing for the given treeName. This method is * thread-safe. 
*/ @Override public void put(final TreeName treeName, ByteSequence key, ByteSequence value) { try { getOrCreateChunk(treeName).put(key, value); } catch (Exception e) { throw new StorageRuntimeException(e); } } private Chunk getOrCreateChunk(final TreeName treeName) throws Exception { Chunk alreadyExistingChunk = chunks.get(treeName); if (alreadyExistingChunk != null) { return alreadyExistingChunk; } final Chunk newChunk = chunkFactory.newChunk(treeName); alreadyExistingChunk = chunks.putIfAbsent(treeName, newChunk); if (alreadyExistingChunk != null) { // Another thread was faster at creating a new chunk, close this one. newChunk.delete(); return alreadyExistingChunk; } return newChunk; } @Override public ByteString read(TreeName treeName, ByteSequence key) { throw new UnsupportedOperationException(); } @Override public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f) { throw new UnsupportedOperationException(); } @Override public Cursor<ByteString, ByteString> openCursor(TreeName treeName) { throw new UnsupportedOperationException(); } @Override public long getRecordCount(TreeName treeName) { throw new UnsupportedOperationException(); } @Override public void openTree(TreeName name, boolean createOnDemand) { throw new UnsupportedOperationException(); } @Override public void deleteTree(TreeName name) { throw new UnsupportedOperationException(); } @Override public boolean delete(TreeName treeName, ByteSequence key) { throw new UnsupportedOperationException(); } } /** * Chunk implementations are a data storage with an optional limited capacity. Chunk are typically used by first * adding data to the storage using {@link put(ByteSequence, ByteSequence)} later on data can be sequentially accessed * using {@link flip()}. */ interface Chunk { /** * Add data to the storage. Wherever this method is thread-safe or not is implementation dependent. * * @return true if the data were added to the storage, false if the chunk is full. */ boolean put(ByteSequence key, ByteSequence value); /** * Flip this chunk from write-only to read-only in order to get the previously stored data. This method must be * called only once. After flip is called, Chunk instance must not be used anymore. * * @return a {@link MeteredCursor} to access the data */ MeteredCursor<ByteString, ByteString> flip(); /** * Return size of data contained in this chunk. This size is guaranteed to be consistent only if there is no pending * {@link #put(ByteSequence, ByteSequence)} operations. */ long size(); /** * While chunk's memory and files are automatically garbage collected/deleted at exit, this method can be called to * clean things now. */ void delete(); } /** * Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an * unlimited amount of data. This class uses double-buffering: data are firstly stored in a * {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}. {@link #put(ByteSequence, * ByteSequence))} is thread-safe. This class is used in phase-one. There is one {@link ExternalSortChunk} per * database tree, shared across all phase-one importer threads, in charge of storing/sorting records. */ static final class ExternalSortChunk implements Chunk { /** Name reported by the {@link MeteredCursor} after {@link #flip()}. */ private final String name; /** Provides buffer used to store and sort chunk of data. 
*/ private final BufferPool bufferPool; /** File containing the regions used to store the data. */ private final File file; private final FileChannel channel; /** Pointer to the next available region in the file, typically at end of file. */ private final AtomicLong filePosition = new AtomicLong(); /** Collector used to reduces the number of duplicate keys during sort. */ private final Collector<?, ByteString> deduplicator; /** Keep track of pending sorting tasks. */ private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter; /** Keep track of currently opened chunks. */ private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>()); /** Keep track of the number of chunks created. */ private final AtomicInteger nbSortedChunks = new AtomicInteger(); /** Size approximation of data contained in this chunk. */ private final AtomicLong size = new AtomicLong(); /** Active chunk for the current thread. */ private final ThreadLocal<Chunk> currentChunk = new ThreadLocal<Chunk>() { @Override protected Chunk initialValue() { return nullChunk(); } }; ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector, Executor sortExecutor) throws IOException { FileChannel candidateChannel = null; File candidateFile = null; while (candidateChannel == null) { candidateFile = new File(tempDir, (name + UUID.randomUUID()).replaceAll("\\W+", "_")); try { candidateChannel = open(candidateFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE); candidateFile.deleteOnExit(); } catch (FileAlreadyExistsException ignore) { // someone else got it } } this.name = name; this.bufferPool = bufferPool; this.deduplicator = collector; this.file = candidateFile; this.channel = candidateChannel; this.sorter = new ExecutorCompletionService<>(sortExecutor); } @Override public boolean put(final ByteSequence key, final ByteSequence value) { final Chunk chunk = currentChunk.get(); if (!chunk.put(key, value)) { sortAndAppendChunkAsync(chunk); activeChunks.remove(chunk); final Chunk newChunk = new InMemorySortedChunk(name, bufferPool); activeChunks.add(newChunk); currentChunk.set(newChunk); newChunk.put(key, value); } return true; } @Override public MeteredCursor<ByteString, ByteString> flip() { for (Chunk chunk : activeChunks) { sortAndAppendChunkAsync(chunk); } try { return new CollectorCursor<>( new CompositeCursor<>(name, waitTasksTermination(sorter, nbSortedChunks.get())), deduplicator); } catch (ExecutionException | InterruptedException e) { throw new StorageRuntimeException(e); } } @Override public long size() { long activeSize = 0; for (Chunk chunk : activeChunks) { activeSize += chunk.size(); } return size.get() + activeSize; } @Override public void delete() { closeSilently(channel); file.delete(); } int getNbSortedChunks() { return nbSortedChunks.get(); } private void sortAndAppendChunkAsync(final Chunk chunk) { size.addAndGet(chunk.size()); final long startOffset = filePosition.getAndAdd(chunk.size()); nbSortedChunks.incrementAndGet(); sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>() { @Override public MeteredCursor<ByteString, ByteString> call() throws Exception { /* * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key * de-duplication performed by the CollectorCursor. Thanks to SPARSE_FILE option, the delta between size * allocated and the size actually used is not wasted. 
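 *
 * Worked example (illustrative figures only): if three in-memory chunks of 4 MiB, 4 MiB and 2 MiB
 * are flushed, the three filePosition.getAndAdd() calls reserve the non-overlapping regions
 * [0, 4M), [4M, 8M) and [8M, 10M), in whichever order the flushes happen to run, so concurrent
 * sort tasks never write over one another; any bytes saved by de-duplication simply remain as
 * unwritten holes in the sparse file.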
*/ final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size()); try (final SequentialCursor<ByteString, ByteString> source = new CollectorCursor<>(chunk.flip(), deduplicator)) { copyIntoChunk(source, persistentChunk); } return persistentChunk.flip(); } }); } /** * Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that * they can be cursored ascendantly. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation, * data must be accessible randomly. To do so, offsets of each key/value records are stored in the buffer. To * maximize space occupation, buffer content is split in two parts: one contains records offset, the other contains * the records themselves: * * <pre> * ----------> offset writer direction ----------------> |<- free ->| <---- record writer direction --- * +-----------------+-----------------+-----------------+----------+----------+----------+----------+ * | offset record 1 | offset record 2 | offset record n | .........| record n | record 2 | record 1 | * +-----------------+-----------------+-----------------+----------+----------+----------+----------+ * </pre> * * Each record is the concatenation of a key/value (length are encoded using {@link PackedLong} representation) * * <pre> * +------------+--------------+--------------+----------------+ * | key length | key bytes... | value length | value bytes... | * +------------+--------------+--------------+----------------+ * </pre> */ static final class InMemorySortedChunk implements Chunk, Comparator<Integer> { private static final int INT_SIZE = Integer.SIZE / Byte.SIZE; private final String metricName; private final BufferPool bufferPool; private final Buffer buffer; private long totalBytes; private int indexPos; private int dataPos; private int nbRecords; InMemorySortedChunk(String name, BufferPool bufferPool) { this.metricName = name; this.bufferPool = bufferPool; this.buffer = bufferPool.get(); this.dataPos = buffer.length(); } @Override public boolean put(ByteSequence key, ByteSequence value) { final int keyHeaderSize = PackedLong.getEncodedSize(key.length()); final int valueHeaderSize = PackedLong.getEncodedSize(value.length()); final int keyRecordSize = keyHeaderSize + key.length(); final int recordSize = keyRecordSize + valueHeaderSize + value.length(); dataPos -= recordSize; final int recordDataPos = dataPos; final int recordIndexPos = indexPos; indexPos += INT_SIZE; if (indexPos > dataPos) { // Chunk is full return false; } nbRecords++; totalBytes += recordSize; // Write record offset buffer.writeInt(recordIndexPos, recordDataPos); final int valuePos = writeDataAt(recordDataPos, key); writeDataAt(valuePos, value); return true; } private int writeDataAt(int offset, ByteSequence data) { final int headerSize = buffer.writeCompactUnsignedLong(offset, data.length()); buffer.writeByteSequence(offset + headerSize, data); return offset + headerSize + data.length(); } @Override public long size() { return totalBytes; } @Override public MeteredCursor<ByteString, ByteString> flip() { Collections.sort(new AbstractList<Integer>() { @Override public Integer get(int index) { return getOffsetAtPosition(index * INT_SIZE); } private Integer getOffsetAtPosition(int pos) { return (int) buffer.readInt(pos); } @Override public Integer set(int index, Integer element) { final int pos = index * INT_SIZE; final Integer valueA = getOffsetAtPosition(pos); buffer.writeInt(pos, element); return valueA; } @Override public int size() { 
return nbRecords; } }, this); return new InMemorySortedChunkCursor(); } @Override public int compare(Integer offsetA, Integer offsetB) { final int iOffsetA = offsetA.intValue(); final int iOffsetB = offsetB.intValue(); if (iOffsetA == iOffsetB) { return 0; } // Compare Keys final int keyLengthA = (int) buffer.readCompactUnsignedLong(iOffsetA); final int keyOffsetA = iOffsetA + PackedLong.getEncodedSize(keyLengthA); final int keyLengthB = (int) buffer.readCompactUnsignedLong(iOffsetB); final int keyOffsetB = iOffsetB + PackedLong.getEncodedSize(keyLengthB); return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB); } @Override public void delete() { bufferPool.release(buffer); } /** Cursor of the in-memory chunk */ private final class InMemorySortedChunkCursor implements MeteredCursor<ByteString, ByteString> { private ByteString key; private ByteString value; private volatile long bytesRead; private int indexOffset; @Override public boolean next() { if (bytesRead >= totalBytes) { key = value = null; return false; } final int recordOffset = buffer.readInt(indexOffset); final int keyLength = (int) buffer.readCompactUnsignedLong(recordOffset); final int keyHeaderSize = PackedLong.getEncodedSize(keyLength); key = buffer.readByteString(recordOffset + keyHeaderSize, keyLength); final int valueOffset = recordOffset + keyHeaderSize + keyLength; final int valueLength = (int) buffer.readCompactUnsignedLong(valueOffset); final int valueHeaderSize = PackedLong.getEncodedSize(valueLength); value = buffer.readByteString(valueOffset + valueHeaderSize, valueLength); indexOffset += INT_SIZE; bytesRead += keyHeaderSize + keyLength + valueHeaderSize + valueLength; return true; } @Override public boolean isDefined() { return key != null; } @Override public ByteString getKey() throws NoSuchElementException { throwIfUndefined(this); return key; } @Override public ByteString getValue() throws NoSuchElementException { throwIfUndefined(this); return value; } @Override public void close() { key = value = null; bufferPool.release(buffer); } @Override public String getMetricName() { return metricName; } @Override public long getNbBytesRead() { return bytesRead; } @Override public long getNbBytesTotal() { return totalBytes; } } } /** * Store data inside a region contained in a file. A regions is delimited by an offset and a length. The region is * memory-mapped and the data are appended in the memory-mapped region until it is full. Region store a * concatenation of key/value records: (Key & value sizes are stored using {@link PackedLong} format.) * * <pre> * +------------+--------------+--------------+----------------+ * | key length | value length | key bytes... | value bytes... | * +------------+--------------+--------------+----------------+ * </pre> */ static final class FileRegionChunk implements Chunk { private final String metricName; private final FileChannel channel; private final long startOffset; private long size; private MappedByteBuffer mmapBuffer; private OutputStream mmapBufferOS = new OutputStream() { @Override public void write(int arg0) throws IOException { mmapBuffer.put((byte) arg0); } }; FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException { this.metricName = name; this.channel = channel; this.startOffset = startOffset; if (size > 0) { /* * Make sure that the file is big-enough to encapsulate this memory-mapped region. Thanks to SPARSE_FILE this * operation should be fast even for big region. 
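 *
 * Writing the single zero byte below at offset (startOffset + size) - 1 extends the file's logical
 * length so that channel.map() can map the whole [startOffset, startOffset + size) region; on a
 * sparse-capable filesystem the skipped bytes consume no disk space until they are actually written.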
*/ channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1); } this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size); } @Override public boolean put(ByteSequence key, ByteSequence value) { final int recordSize = PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value .length(); if (mmapBuffer.remaining() < recordSize) { // The regions is full return false; } try { PackedLong.writeCompactUnsigned(mmapBufferOS, key.length()); PackedLong.writeCompactUnsigned(mmapBufferOS, value.length()); } catch (IOException e) { throw new StorageRuntimeException(e); } key.copyTo(mmapBuffer); value.copyTo(mmapBuffer); return true; } @Override public long size() { return mmapBuffer == null ? size : mmapBuffer.position(); } @Override public MeteredCursor<ByteString, ByteString> flip() { size = mmapBuffer.position(); /* * We force OS to write dirty pages now so that they don't accumulate. Indeed, huge number of dirty pages might * cause the OS to freeze the producer of those dirty pages (this importer) while it is swapping-out the pages. */ mmapBuffer.force(); mmapBuffer = null; try { return new FileRegionChunkCursor(channel.map(MapMode.READ_ONLY, startOffset, size)); } catch (IOException e) { throw new StorageRuntimeException(e); } } @Override public void delete() { // Nothing to do } /** Cursor through the specific memory-mapped file's region. */ private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString> { private final ByteBuffer region; private final InputStream asInputStream = new InputStream() { @Override public int read() throws IOException { return region.get() & 0xFF; } }; private ByteString key, value; FileRegionChunkCursor(MappedByteBuffer data) { this.region = data; } @Override public boolean next() { if (!region.hasRemaining()) { key = value = null; return false; } final int keyLength; final int valueLength; try { keyLength = (int) PackedLong.readCompactUnsignedLong(asInputStream); valueLength = (int) PackedLong.readCompactUnsignedLong(asInputStream); } catch (IOException e) { throw new StorageRuntimeException(e); } final int recordSize = keyLength + valueLength; final byte[] keyValueData = new byte[recordSize]; region.get(keyValueData, 0, recordSize); key = ByteString.wrap(keyValueData, 0, keyLength); value = ByteString.wrap(keyValueData, keyLength, valueLength); return true; } @Override public boolean isDefined() { return key != null; } @Override public ByteString getKey() throws NoSuchElementException { throwIfUndefined(this); return key; } @Override public ByteString getValue() throws NoSuchElementException { throwIfUndefined(this); return value; } @Override public void close() { key = value = null; } @Override public String getMetricName() { return metricName; } @Override public long getNbBytesRead() { return region.position(); } @Override public long getNbBytesTotal() { return region.limit(); } } } /** A cursor de-duplicating data with the same keys from a sorted cursor. */ static final class CollectorCursor<A, K, V> implements MeteredCursor<K, V> { private final MeteredCursor<K, ? extends V> delegate; private final Collector<A, V> collector; private boolean isDefined; private K key; private V value; CollectorCursor(MeteredCursor<K, ? 
extends V> cursor, Collector<A, V> collector) { this.delegate = cursor; this.collector = collector; if (!delegate.isDefined()) { delegate.next(); } } @Override public boolean next() { isDefined = delegate.isDefined(); if (isDefined) { key = delegate.getKey(); accumulateValues(); } return isDefined; } private void accumulateValues() { throwIfUndefined(this); A resultContainer = collector.get(); do { resultContainer = collector.accept(resultContainer, delegate.getValue()); } while (delegate.next() && key.equals(delegate.getKey())); value = collector.merge(resultContainer); // Delegate is one step beyond. When delegate.isDefined() return false, we have to return true once more. isDefined = true; } @Override public boolean isDefined() { return isDefined; } @Override public K getKey() throws NoSuchElementException { throwIfUndefined(this); return key; } @Override public V getValue() throws NoSuchElementException { throwIfUndefined(this); return value; } @Override public void close() { key = null; delegate.close(); } @Override public String getMetricName() { return delegate.getMetricName(); } @Override public long getNbBytesRead() { return delegate.getNbBytesRead(); } @Override public long getNbBytesTotal() { return delegate.getNbBytesTotal(); } } /** Provides a globally sorted cursor from multiple sorted cursors. */ static final class CompositeCursor<K extends Comparable<? super K>, V> implements MeteredCursor<K, V> { /** Contains the non empty and sorted cursors ordered in regards of their current key. */ private final NavigableSet<MeteredCursor<K, V>> orderedCursors; private final String metricName; private final long totalBytes; private volatile long bytesRead; private K key; private V value; CompositeCursor(String metricName, Collection<MeteredCursor<K, V>> cursors) { this.metricName = metricName; this.orderedCursors = new TreeSet<>(new Comparator<MeteredCursor<K, V>>() { @Override public int compare(MeteredCursor<K, V> o1, MeteredCursor<K, V> o2) { final int cmp = o1.getKey().compareTo(o2.getKey()); // Never return 0. Otherwise both cursors are considered equal and only one of them is kept by this set return cmp == 0 ? Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)) : cmp; } }); long totalBytesSum = 0; for (MeteredCursor<K, V> cursor : cursors) { long previousBytesRead = cursor.getNbBytesRead(); if (cursor.isDefined() || cursor.next()) { if (orderedCursors.add(cursor)) { bytesRead += (cursor.getNbBytesRead() - previousBytesRead); totalBytesSum += cursor.getNbBytesTotal(); } } else { cursor.close(); } } this.totalBytes = totalBytesSum; } /** * Try to get the next record from the cursor containing the lowest entry. If it reaches the end of the lowest * cursor, it calls the close method and begins reading from the next lowest cursor. 
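 *
 * <p>Worked example (hypothetical keys): three cursors positioned on {a, d}, {b, c} and {e} are
 * merged in the order a, b, c, d, e, because the cursor holding the smallest current key is always
 * polled first and is re-inserted into the ordered set once it has advanced.</p>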
*/ @Override public boolean next() { final MeteredCursor<K, V> lowestCursor = orderedCursors.pollFirst(); if (lowestCursor == null) { key = null; value = null; return false; } key = lowestCursor.getKey(); value = lowestCursor.getValue(); long previousBytesRead = lowestCursor.getNbBytesRead(); if (lowestCursor.next()) { bytesRead += (lowestCursor.getNbBytesRead() - previousBytesRead); orderedCursors.add(lowestCursor); } else { lowestCursor.close(); } return true; } @Override public boolean isDefined() { return key != null; } @Override public K getKey() throws NoSuchElementException { throwIfUndefined(this); return key; } @Override public V getValue() throws NoSuchElementException { throwIfUndefined(this); return value; } @Override public void close() { closeSilently(orderedCursors); } @Override public String getMetricName() { return metricName; } @Override public long getNbBytesRead() { return bytesRead; } @Override public long getNbBytesTotal() { return totalBytes; } } } private static Chunk asChunk(TreeName treeName, Importer importer) { return new ImporterToChunkAdapter(treeName, importer); } /** * Task to copy one {@link Chunk} into a database tree through an {@link Importer}. The specified tree is cleaned * before receiving the data contained in the {@link Chunk}. */ private static final class CleanImportTask implements Callable<Void> { private final PhaseTwoProgressReporter reporter; private final TreeName treeName; private final Importer destination; private final Chunk source; CleanImportTask(PhaseTwoProgressReporter reporter, Chunk source, TreeName treeName, Importer destination) { this.source = source; this.treeName = treeName; this.destination = destination; this.reporter = reporter; } @Override public Void call() { destination.clearTree(treeName); try (final SequentialCursor<ByteString, ByteString> sourceCursor = trackCursorProgress(reporter, source.flip())) { copyIntoChunk(sourceCursor, asChunk(treeName, destination)); } return null; } } private static void copyIntoChunk(SequentialCursor<ByteString, ByteString> source, Chunk destination) { while (source.next()) { destination.put(source.getKey(), source.getValue()); } } /** * This task optionally copy the dn2id chunk into the database and takes advantages of it's cursoring to compute the * {@link ID2Count} index. */ private static final class DN2IDImporterTask implements Callable<Void> { private final PhaseTwoProgressReporter reporter; private final Importer importer; private final File tempDir; private final BufferPool bufferPool; private final DN2ID dn2id; private final ID2Count id2count; private final Collector<?, ByteString> id2countCollector; private final Chunk dn2IdSourceChunk; private final Chunk dn2IdDestination; DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool, DN2ID dn2id, Chunk dn2IdChunk, ID2Count id2count, Collector<?, ByteString> id2countCollector, boolean dn2idAlreadyImported) { this.reporter = progressReporter; this.importer = importer; this.tempDir = tempDir; this.bufferPool = bufferPool; this.dn2id = dn2id; this.dn2IdSourceChunk = dn2IdChunk; this.id2count = id2count; this.id2countCollector = id2countCollector; this.dn2IdDestination = dn2idAlreadyImported ? 
nullChunk() : asChunk(dn2id.getName(), importer); } @Override public Void call() throws Exception { final Chunk id2CountChunk = new ExternalSortChunk(tempDir, id2count.getName().toString(), bufferPool, id2countCollector, sameThreadExecutor()); long totalNumberOfEntries = 0; final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk)); try (final MeteredCursor<ByteString, ByteString> chunkCursor = dn2IdSourceChunk.flip(); final SequentialCursor<ByteString, ByteString> dn2idCursor = dn2id.openCursor(trackCursorProgress(reporter, chunkCursor), visitor)) { while (dn2idCursor.next()) { dn2IdDestination.put(dn2idCursor.getKey(), dn2idCursor.getValue()); totalNumberOfEntries++; } } // -1 because baseDN is not counted id2count.importPutTotalCount(asImporter(id2CountChunk), Math.max(0, totalNumberOfEntries - 1)); new CleanImportTask(reporter, id2CountChunk, id2count.getName(), importer).call(); return null; } /** TreeVisitor computing and importing the number of children per parent. */ private final class ID2CountTreeVisitorImporter implements TreeVisitor<ChildrenCount> { private final Importer importer; ID2CountTreeVisitorImporter(Importer importer) { this.importer = importer; } @Override public ChildrenCount beginParent(EntryID parentID) { return new ChildrenCount(parentID); } @Override public void onChild(ChildrenCount parent, EntryID childID) { parent.numberOfChildren++; } @Override public void endParent(ChildrenCount parent) { if (parent.numberOfChildren > 0) { id2count.importPut(importer, parent.parentEntryID, parent.numberOfChildren); } } } /** Keep track of the number of children during the dn2id visit. */ private static final class ChildrenCount { private final EntryID parentEntryID; private long numberOfChildren; private ChildrenCount(EntryID id) { this.parentEntryID = id; } } } private static Importer asImporter(Chunk chunk) { return new ChunkToImporterAdapter(chunk); } /** * Delegates the storage of data to the {@link Importer}. This class has same thread-safeness as the supplied * importer. */ private static final class ImporterToChunkAdapter implements Chunk { private final TreeName treeName; private final Importer importer; private final AtomicLong size = new AtomicLong(); ImporterToChunkAdapter(TreeName treeName, Importer importer) { this.treeName = treeName; this.importer = importer; } @Override public boolean put(ByteSequence key, ByteSequence value) { importer.put(treeName, key, value); size.addAndGet(key.length() + value.length()); return true; } @Override public MeteredCursor<ByteString, ByteString> flip() { return asProgressCursor(importer.openCursor(treeName), treeName.toString(), size.get()); } @Override public long size() { return size.get(); } @Override public void delete() { // Nothing to do } } /** * Delegates the {@link #put(TreeName, ByteSequence, ByteSequence)} method of {@link Importer} to a {@link Chunk}. * {@link #createTree(TreeName)} is a no-op, other methods throw {@link UnsupportedOperationException}. This class has * same thread-safeness as the supplied {@link Chunk}. 
*/ private static final class ChunkToImporterAdapter implements Importer { private final Chunk chunk; ChunkToImporterAdapter(Chunk chunk) { this.chunk = chunk; } @Override public void put(TreeName treeName, ByteSequence key, ByteSequence value) { try { chunk.put(key, value); } catch (Exception e) { throw new StorageRuntimeException(e); } } @Override public void clearTree(TreeName treeName) { throw new UnsupportedOperationException(); } @Override public ByteString read(TreeName treeName, ByteSequence key) { throw new UnsupportedOperationException(); } @Override public SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName) { throw new UnsupportedOperationException(); } @Override public void close() { } } /** * Write records into a delegated {@link Chunk} after performing a reordering of those records in regards of their key * by using a best-effort algorithm. This class is intended to be used when records are initially ordered but might * actually hit a chunk slightly disordered due to scheduling occurring in a multi-threaded environment. Records are * buffered and sorted before being written to the delegated chunk. Because of the buffer mechanism, records might be * written into the chunk after some delay. It's guaranteed that all entries will be written into the chunk only after * the flip() method has been called. {@link #put(TreeName, ByteSequence, ByteSequence)} is thread-safe. */ private static final class MostlyOrderedChunk implements Chunk { /** * Number of items to queue before writing them to the storage. This number must be at least equal to the number of * threads which will access the put() method. If underestimated, {@link #put(ByteSequence, ByteSequence)} might * lead to unordered copy. If overestimated, extra memory is wasted. */ private static final int QUEUE_SIZE = 1024; private final NavigableMap<ByteSequence, ByteSequence> pendingRecords = new TreeMap<>(); private final int queueSize; private final Chunk delegate; MostlyOrderedChunk(Chunk delegate) { this.delegate = delegate; this.queueSize = QUEUE_SIZE; } @Override public void delete() { // Nothing to do } @Override public synchronized boolean put(ByteSequence key, ByteSequence value) { pendingRecords.put(key, value); if (pendingRecords.size() == queueSize) { /* * Maximum size reached, take the record with the smallest key and persist it in the delegate chunk. this * ensures records are (mostly) inserted in ascending key order, which is the optimal insert order for B-trees. */ final Map.Entry<ByteSequence, ByteSequence> lowestEntry = pendingRecords.pollFirstEntry(); return delegate.put(lowestEntry.getKey(), lowestEntry.getValue()); } return true; } @Override public MeteredCursor<ByteString, ByteString> flip() { // Purge pending entries for (Map.Entry<ByteSequence, ByteSequence> lowestEntry : pendingRecords.entrySet()) { delegate.put(lowestEntry.getKey(), lowestEntry.getValue()); } return delegate.flip(); } @Override public long size() { return delegate.size(); } } private static Chunk nullChunk() { return NullChunk.INSTANCE; } /** An empty Chunk which cannot store data. 
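 *
 * <p>Illustrative note, not part of the patch: because {@code put()} always returns {@code false},
 * callers such as {@link ExternalSortChunk} can use it as the initial value of their thread-local
 * "current chunk", so the very first {@code put()} on a thread transparently rolls over to a real
 * {@link InMemorySortedChunk}:</p>
 *
 * <pre>
 * if (!currentChunk.get().put(key, value)) {
 *   // either the first put() on this thread (null chunk) or the in-memory chunk is full
 * }
 * </pre>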
*/ private static final class NullChunk implements Chunk { private static final Chunk INSTANCE = new NullChunk(); @Override public boolean put(ByteSequence key, ByteSequence value) { return false; } @Override public long size() { return 0; } @Override public void delete() { // Nothing to do } @Override public MeteredCursor<ByteString, ByteString> flip() { return new MeteredCursor<ByteString, ByteString>() { @Override public boolean next() { return false; } @Override public boolean isDefined() { return false; } @Override public ByteString getKey() throws NoSuchElementException { throw new NoSuchElementException(); } @Override public ByteString getValue() throws NoSuchElementException { throw new NoSuchElementException(); } @Override public void close() { } @Override public String getMetricName() { return NullChunk.class.getSimpleName(); } @Override public long getNbBytesRead() { return 0; } @Override public long getNbBytesTotal() { return 0; } }; } } /** Executor delegating the execution of task to the current thread. */ private static Executor sameThreadExecutor() { return new Executor() { @Override public void execute(Runnable command) { command.run(); } }; } /** Collect the results of asynchronous tasks. */ private static <K> List<K> waitTasksTermination(CompletionService<K> completionService, int nbTasks) throws InterruptedException, ExecutionException { final List<K> results = new ArrayList<>(nbTasks); for (int i = 0; i < nbTasks; i++) { results.add(completionService.take().get()); } return results; } /** Regularly report progress statistics from the registered list of {@link ProgressMetric} */ private static final class PhaseTwoProgressReporter implements Runnable, Closeable { private static final String PHASE2_REPORTER_THREAD_NAME = "PHASE2-REPORTER-%d"; private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(newThreadFactory(null, PHASE2_REPORTER_THREAD_NAME, true)); private final Map<MeteredCursor<?, ?>, Long> lastValues = new WeakHashMap<>(); private ScheduledFuture<?> scheduledTask; private long lastRun = System.currentTimeMillis(); synchronized void addCursor(MeteredCursor<?, ?> cursor) { if (lastValues.put(cursor, 0L) == null) { logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, cursor.getMetricName(), 1, 1); } if (scheduledTask == null) { scheduledTask = scheduler.scheduleAtFixedRate(this, 10, 10, TimeUnit.SECONDS); } } synchronized void removeCursor(MeteredCursor<?, ?> cursor) { if (lastValues.remove(cursor) != null) { logger.info(NOTE_IMPORT_LDIF_INDEX_CLOSE, cursor.getMetricName()); } } @Override public synchronized void run() { final long deltaTime = System.currentTimeMillis() - lastRun; if (deltaTime == 0) { return; } for (Map.Entry<MeteredCursor<?, ?>, Long> metricLastValue : lastValues.entrySet()) { final MeteredCursor<?, ?> cursor = metricLastValue.getKey(); final long newValue = cursor.getNbBytesRead(); final long totalBytes = cursor.getNbBytesTotal(); final long valueProgress = newValue - metricLastValue.getValue(); final int progressPercent = totalBytes > 0 ? 
Math.round((100f * newValue) / cursor.getNbBytesTotal()) : 0; final long progressRate = valueProgress / deltaTime; final long progressRemaining = (cursor.getNbBytesTotal() - newValue) / 1024; logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getMetricName(), progressPercent, progressRemaining, progressRate, 1, 1); lastValues.put(cursor, newValue); } lastRun = System.currentTimeMillis(); } @Override public synchronized void close() { scheduledTask = null; scheduler.shutdown(); } } /** Buffer used by {@link InMemorySortedChunk} to store and sort data. */ private interface Buffer extends Closeable { void writeInt(final int position, final int value); int readInt(final int position); long readCompactUnsignedLong(final int position); ByteString readByteString(int position, int length); int writeCompactUnsignedLong(final int position, long value); void writeByteSequence(int position, ByteSequence data); int length(); int compare(int offsetA, int lengthA, int offsetB, int lengthB); } /** * Pre-allocate and maintain a fixed number of re-usable {@code Buffer}s. This allow to keep controls of heap memory * consumption and prevents the significant object allocation cost occurring for huge objects. */ static final class BufferPool implements Closeable { private final BlockingQueue<Buffer> pool; private final int bufferSize; private static final Unsafe unsafe; private static final long BYTE_ARRAY_OFFSET; static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); unsafe = (Unsafe)field.get(null); BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class); } catch (Exception e) { throw new RuntimeException(e); } } static boolean supportOffHeap() { return unsafe != null; } BufferPool(int nbBuffer, int bufferSize) { this.pool = new ArrayBlockingQueue<>(nbBuffer); this.bufferSize = bufferSize; for (int i = 0; i < nbBuffer; i++) { pool.offer(supportOffHeap() ? new OffHeapBuffer(bufferSize) : new HeapBuffer(bufferSize)); } } public int getBufferSize() { return bufferSize; } public Buffer get() { try { return pool.take(); } catch (InterruptedException e) { throw new StorageRuntimeException(e); } } private void release(Buffer buffer) { try { pool.put(buffer); } catch (InterruptedException e) { throw new StorageRuntimeException(e); } } public void setSize(int size) { while (pool.size() > size) { get(); } } @Override public void close() { Buffer buffer; while ((buffer = pool.poll()) != null) { closeSilently(buffer); } } /** Off-heap buffer using Unsafe memory access. 
*/ private final class OffHeapBuffer implements Buffer { private final long address; private final int size; private int position; private final InputStream asInputStream = new InputStream() { @Override public int read() throws IOException { return unsafe.getByte(address + position++); } }; private final OutputStream asOutputStream = new OutputStream() { @Override public void write(int value) throws IOException { unsafe.putByte(address + position++, (byte) (value & 0xFF)); } }; private boolean closed; OffHeapBuffer(int size) { this.size = size; this.address = unsafe.allocateMemory(size); } @Override public void writeInt(final int position, final int value) { unsafe.putInt(address + position, value); } @Override public int readInt(final int position) { return unsafe.getInt(address + position); } @Override public int writeCompactUnsignedLong(final int position, long value) { try { this.position = position; return PackedLong.writeCompactUnsigned(asOutputStream, value); } catch (IOException e) { throw new StorageRuntimeException(e); } } @Override public long readCompactUnsignedLong(final int position) { this.position = position; try { return PackedLong.readCompactUnsignedLong(asInputStream); } catch (IOException e) { throw new IllegalStateException(e); } } @Override public void writeByteSequence(int position, ByteSequence data) { Reject.ifFalse(position + data.length() <= size); long offset = address + position; for(int i = 0 ; i < data.length() ; i++) { unsafe.putByte(offset++, data.byteAt(i)); } } @Override public int length() { return size; } @Override public ByteString readByteString(int position, int length) { Reject.ifFalse(position + length <= size); final byte[] data = new byte[length]; unsafe.copyMemory(null, address + position, data, BYTE_ARRAY_OFFSET, length); return ByteString.wrap(data); } @Override public int compare(int offsetA, int lengthA, int offsetB, int lengthB) { final int len = Math.min(lengthA, lengthB); for(int i = 0 ; i < len ; i++) { final int a = unsafe.getByte(address + offsetA + i) & 0xFF; final int b = unsafe.getByte(address + offsetB + i) & 0xFF; if ( a != b ) { return a - b; } } return lengthA - lengthB; } @Override public void close() throws IOException { if (!closed) { unsafe.freeMemory(address); } closed = true; } } /** Heap buffer backed by a {@link ByteBuffer}; used when {@code Unsafe} off-heap access is unavailable. 
*/ private final class HeapBuffer implements Buffer { private final ByteBuffer buffer; private final OutputStream asOutputStream = new OutputStream() { @Override public void write(int b) throws IOException { buffer.put((byte) (b & 0xFF)); } }; private final InputStream asInputStream = new InputStream() { @Override public int read() throws IOException { return buffer.get() & 0xFF; } }; HeapBuffer(int size) { this.buffer = ByteBuffer.allocate(size); } @Override public void writeInt(final int position, final int value) { buffer.putInt(position, value); } @Override public int readInt(final int position) { return buffer.getInt(position); } @Override public int writeCompactUnsignedLong(final int position, long value) { buffer.position(position); try { return PackedLong.writeCompactUnsigned(asOutputStream, value); } catch (IOException e) { throw new StorageRuntimeException(e); } } @Override public long readCompactUnsignedLong(final int position) { buffer.position(position); try { return PackedLong.readCompactUnsignedLong(asInputStream); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public void writeByteSequence(int position, ByteSequence data) { buffer.position(position); data.copyTo(buffer); } @Override public int length() { return buffer.capacity(); } @Override public ByteString readByteString(int position, int length) { return ByteString.wrap(buffer.array(), buffer.arrayOffset() + position, length); } @Override public int compare(int offsetA, int lengthA, int offsetB, int lengthB) { return readByteString(offsetA, lengthA).compareTo(readByteString(offsetB, lengthB)); } @Override public void close() { // Nothing to do } } } /** Extends {@link SequentialCursor} by providing metric related to cursor's progress. */ interface MeteredCursor<K, V> extends SequentialCursor<K, V> { String getMetricName(); long getNbBytesRead(); long getNbBytesTotal(); } /** Add the cursor to the reporter and remove it once closed. */ private static <K, V> SequentialCursor<K, V> trackCursorProgress(final PhaseTwoProgressReporter reporter, final MeteredCursor<K, V> cursor) { reporter.addCursor(cursor); return new SequentialCursorDecorator<MeteredCursor<K, V>, K, V>(cursor) { @Override public void close() { reporter.removeCursor(cursor); cursor.close(); } }; } private static void throwIfUndefined(SequentialCursor<?, ?> cursor) { if (!cursor.isDefined()) { throw new NoSuchElementException(); } } /** * Get a new {@link Collector} which can be used to merge encoded values. 
The types of values to merged is deduced * from the {@link TreeName} */ private static Collector<?, ByteString> newCollector(final EntryContainer entryContainer, final TreeName treeName) { final DefaultIndex index = getIndex(entryContainer, treeName); if (index != null) { // key conflicts == merge EntryIDSets return new EntryIDSetsCollector(index); } else if (isID2ChildrenCount(treeName)) { // key conflicts == sum values return new AddLongCollector(entryContainer.getID2ChildrenCount()); } else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName)) { // key conflicts == exception return UniqueValueCollector.getInstance(); } throw new IllegalArgumentException("Unknown tree: " + treeName); } private static boolean isDN2ID(TreeName treeName) { return SuffixContainer.DN2ID_INDEX_NAME.equals(treeName.getIndexId()); } private static boolean isDN2URI(TreeName treeName) { return SuffixContainer.DN2URI_INDEX_NAME.equals(treeName.getIndexId()); } private static boolean isID2Entry(TreeName treeName) { return SuffixContainer.ID2ENTRY_INDEX_NAME.equals(treeName.getIndexId()); } private static boolean isID2ChildrenCount(TreeName treeName) { return SuffixContainer.ID2CHILDREN_COUNT_NAME.equals(treeName.getIndexId()); } private static boolean isVLVIndex(EntryContainer entryContainer, TreeName treeName) { for (VLVIndex vlvIndex : entryContainer.getVLVIndexes()) { if (treeName.equals(vlvIndex.getName())) { return true; } } return false; } private static DefaultIndex getIndex(EntryContainer entryContainer, TreeName treeName) { for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes()) { for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values()) { if (treeName.equals(index.getName())) { return index; } } } return null; } /** * A mutable reduction operation that accumulates input elements into a mutable result container, optionally * transforming the accumulated result into a final representation after all input elements have been processed. * Reduction operations can be performed either sequentially or in parallel. A Collector is specified by three * functions that work together to accumulate entries into a mutable result container, and optionally perform a final * transform on the result. They are: Creation of a new result container (get()), incorporating a new data element * into a result container (accept()), performing an optional final transform on the container (merge) * * @param <A> * Accumulator type * @param <R> * Result type * @see java.util.stream.Collector */ interface Collector<A, R> { /** * Creates and returns a new mutable result container. Equivalent to A java.util.function.Collector.supplier().get() */ A get(); /** * Accepts two partial results and merges them. The combiner function may fold state from one argument into the * other and return that, or may return a new result container. Equivalent to * java.util.function.Collector.accumulator().accept(A, R) */ A accept(A resultContainer, R value); /** * Perform the final transformation from the intermediate accumulation type A to the final result type R. Equivalent * to R java.util.function.Collector.finisher().apply(A) */ R merge(A resultContainer); } /** {@link Collector} that throws an exception if multiple values have to be merged. 
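 *
 * <p>Illustrative sketch of the {@link Collector} contract, not part of the patch: values sharing
 * the same key are folded into the container returned by {@code get()} and finalised by
 * {@code merge()}; this implementation simply refuses to see more than one value per key.</p>
 *
 * <pre>
 * Collector&lt;ByteString, ByteString&gt; collector = UniqueValueCollector.getInstance();
 * ByteString merged = collector.merge(collector.accept(collector.get(), value));  // value unchanged
 * collector.accept(collector.accept(collector.get(), v1), v2);  // throws IllegalArgumentException
 * </pre>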
*/ static final class UniqueValueCollector<V> implements Collector<V, V> { private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>(); static <V> Collector<V, V> getInstance() { return (Collector<V, V>) INSTANCE; } @Override public V get() { return null; } @Override public V accept(V previousValue, V value) { if (previousValue != null) { throw new IllegalArgumentException("Cannot accept multiple values (current=" + previousValue + ", new=" + value + ")"); } return value; } @Override public V merge(V latestValue) { if (latestValue == null) { throw new IllegalArgumentException("No value to merge but expected one"); } return latestValue; } } /** * {@link Collector} that accepts encoded {@link EntryIDSet} objects and produces a {@link ByteString} representing * the merged {@link EntryIDSet}. */ static final class EntryIDSetsCollector implements Collector<Collection<ByteString>, ByteString> { private final DefaultIndex index; private final int indexLimit; EntryIDSetsCollector(DefaultIndex index) { this.index = index; this.indexLimit = index.getIndexEntryLimit(); } @Override public Collection<ByteString> get() { // LinkedList is used for it's O(1) add method (while ArrayList is O(n) when resize is required). return new LinkedList<>(); } @Override public Collection<ByteString> accept(Collection<ByteString> resultContainer, ByteString value) { if (resultContainer.size() < indexLimit) { resultContainer.add(value); } /* * else EntryIDSet is above index entry limits, discard additional values to avoid blowing up memory now, then * discard all entries in merge() */ return resultContainer; } @Override public ByteString merge(Collection<ByteString> resultContainer) { if (resultContainer.size() >= indexLimit) { return index.toValue(EntryIDSet.newUndefinedSet()); } else if (resultContainer.size() == 1) { // Avoids unnecessary decoding + encoding return resultContainer.iterator().next(); } return index.toValue(buildEntryIDSet(resultContainer)); } private EntryIDSet buildEntryIDSet(Collection<ByteString> encodedIDSets) { final long[] entryIDs = new long[indexLimit]; // accumulate in array int i = 0; for (ByteString encodedIDSet : encodedIDSets) { final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), encodedIDSet); if (!entryIDSet.isDefined() || i + entryIDSet.size() >= indexLimit) { // above index entry limit return EntryIDSet.newUndefinedSet(); } for (EntryID entryID : entryIDSet) { entryIDs[i++] = entryID.longValue(); } } Arrays.sort(entryIDs, 0, i); return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i)); } } /** * {@link Collector} that accepts {@code long} values encoded into {@link ByteString} objects and produces a * {@link ByteString} representing the sum of the supplied {@code long}s. */ static final class AddLongCollector implements Collector<Long, ByteString> { private final ID2Count id2count; AddLongCollector(ID2Count id2count) { this.id2count = id2count; } @Override public Long get() { return 0L; } @Override public Long accept(Long resultContainer, ByteString value) { return resultContainer + id2count.fromValue(value); } @Override public ByteString merge(Long resultContainer) { return id2count.toValue(resultContainer); } } private static MeteredCursor<ByteString, ByteString> asProgressCursor( SequentialCursor<ByteString, ByteString> delegate, String metricName, long totalSize) { return new MeteredSequentialCursorDecorator(delegate, metricName, totalSize); } /** Decorate {@link SequentialCursor} by providing progress information while cursoring. 
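 *
 * <p>Illustrative use, mirroring {@code ImporterToChunkAdapter.flip()}: wrap a plain database cursor
 * so that the phase-two progress reporter can derive a percentage from bytes read versus the total
 * number of bytes previously written.</p>
 *
 * <pre>
 * MeteredCursor&lt;ByteString, ByteString&gt; metered =
 *     asProgressCursor(importer.openCursor(treeName), treeName.toString(), size.get());
 * </pre>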
*/ private static final class MeteredSequentialCursorDecorator extends SequentialCursorDecorator<SequentialCursor<ByteString, ByteString>, ByteString, ByteString>implements MeteredCursor<ByteString, ByteString> { private final String metricName; private final long totalSize; private volatile long bytesRead; private MeteredSequentialCursorDecorator(SequentialCursor<ByteString, ByteString> delegate, String metricName, long totalSize) { super(delegate); this.metricName = metricName; this.totalSize = totalSize; } @Override public boolean next() { if (delegate.next()) { bytesRead += delegate.getKey().length() + delegate.getValue().length(); return true; } return false; } @Override public long getNbBytesRead() { return bytesRead; } @Override public String getMetricName() { return metricName; } @Override public long getNbBytesTotal() { return totalSize; } } /** Helper allowing to create {@link SequentialCursor} decorator without having to re-implement all methods. */ static abstract class SequentialCursorDecorator<D extends SequentialCursor<K, V>, K, V> implements SequentialCursor<K, V> { protected final D delegate; SequentialCursorDecorator(D delegate) { this.delegate = delegate; } @Override public boolean next() { return delegate.next(); } @Override public boolean isDefined() { return delegate.isDefined(); } @Override public K getKey() throws NoSuchElementException { return delegate.getKey(); } @Override public V getValue() throws NoSuchElementException { return delegate.getValue(); } @Override public void close() { delegate.close(); } } private static int visitIndexes(final EntryContainer entryContainer, IndexVisitor visitor) { int nbVisited = 0; for (AttributeIndex attribute : entryContainer.getAttributeIndexes()) { for (MatchingRuleIndex index : attribute.getNameToIndexes().values()) { visitor.visitIndex(index); visitor.visitIndexTree(index); nbVisited++; } } for (VLVIndex index : entryContainer.getVLVIndexes()) { visitor.visitIndex(index); visitor.visitIndexTree(index); nbVisited++; } visitor.visitIndexTree(entryContainer.getDN2ID()); visitor.visitIndexTree(entryContainer.getDN2URI()); nbVisited += 2; return nbVisited; } /** Visitor pattern allowing to process all type of indexes. */ private static abstract class IndexVisitor { void visitIndex(DefaultIndex index) { } void visitIndex(VLVIndex index) { } void visitSystemIndex(Tree index) { } void visitIndexTree(Tree tree) { } } /** Update the trust state of the visited indexes. */ private static final class TrustModifier extends IndexVisitor { private final WriteableTransaction txn; private final boolean trustValue; TrustModifier(Importer importer, boolean trustValue) { this.txn = new ImporterToWriteableTransactionAdapter(importer); this.trustValue = trustValue; } @Override public void visitIndex(DefaultIndex index) { index.setTrusted(txn, trustValue); } @Override public void visitIndex(VLVIndex index) { index.setTrusted(txn, trustValue); } } /** Delete & recreate the database of the visited indexes. */ private static final class ClearDatabase extends IndexVisitor { private final Importer importer; ClearDatabase(Importer importer) { this.importer = importer; } @Override public void visitIndexTree(Tree index) { importer.clearTree(index.getName()); } } /** Visit indexes which are in a degraded state. 
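 *
 * <p>Sketch of the intended composition (hedged; the actual call sites live elsewhere in this class):
 * the filter is stacked in front of another visitor so that only untrusted indexes are cleared and
 * marked for rebuild.</p>
 *
 * <pre>
 * visitIndexes(entryContainer, new DegradedIndexFilter(new ClearDatabase(importer)));
 * visitIndexes(entryContainer, new DegradedIndexFilter(new TrustModifier(importer, false)));
 * </pre>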
*/ private static final class DegradedIndexFilter extends IndexVisitor { private final IndexVisitor delegate; DegradedIndexFilter(IndexVisitor delegate) { this.delegate = delegate; } @Override public void visitIndex(DefaultIndex index) { if (!index.isTrusted()) { delegate.visitIndexTree(index); delegate.visitIndex(index); } } @Override public void visitIndex(VLVIndex index) { if (!index.isTrusted()) { delegate.visitIndexTree(index); delegate.visitIndex(index); } } } /** Maintain a list containing the names of the visited indexes. */ private static final class SelectIndexName extends IndexVisitor { private final Set<String> indexNames; SelectIndexName() { this.indexNames = new HashSet<>(); } public Set<String> getSelectedIndexNames() { return indexNames; } @Override public void visitIndexTree(Tree index) { indexNames.add(index.getName().getIndexId()); } } /** Visit indexes only if their name match one contained in a list. */ private static final class SpecificIndexFilter extends IndexVisitor { private final IndexVisitor delegate; private final Collection<String> indexNames; SpecificIndexFilter(IndexVisitor delegate, Collection<String> indexNames) { this.delegate = delegate; this.indexNames = new HashSet<>(indexNames.size()); for(String indexName : indexNames) { this.indexNames.add(indexName.toLowerCase()); } } @Override public void visitIndex(DefaultIndex index) { if (indexNames.contains(index.getName().getIndexId())) { delegate.visitIndex(index); } } @Override public void visitIndex(VLVIndex index) { if (indexNames.contains(index.getName().getIndexId())) { delegate.visitIndex(index); } } @Override public void visitSystemIndex(Tree index) { if (indexNames.contains(index.getName().getIndexId())) { delegate.visitSystemIndex(index); } } @Override public void visitIndexTree(Tree index) { if (indexNames.contains(index.getName().getIndexId())) { delegate.visitIndexTree(index); } } } /** * Thread-safe fixed-size cache which, once full, remove the least recently accessed entry. Composition is used here * to ensure that only methods generating entry-access in the LinkedHashMap are actually used. Otherwise, the least * recently used property of the cache would not be respected. */ private static final class LRUPresenceCache<T> { private final Map<T, Object> cache; LRUPresenceCache(final int maxEntries) { // +1 because newly added entry is added before the least recently one is removed. 
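// Illustrative behaviour, not asserted by this patch: with access-order = true, contains() (which
// delegates to Map.get) refreshes an entry's recency, so after add(a); add(b); contains(a); the
// next eviction removes b before a, keeping frequently probed keys in the cache.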
this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true) { @Override protected boolean removeEldestEntry(Map.Entry<T, Object> eldest) { return size() >= maxEntries; } }); } public boolean contains(T object) { return cache.get(object) != null; } public void add(T object) { cache.put(object, null); } } /** Adapter allowing to use an {@link Importer} as a {@link WriteableTransaction} */ private final static class ImporterToWriteableTransactionAdapter implements WriteableTransaction { private final Importer importer; ImporterToWriteableTransactionAdapter(Importer importer) { this.importer = importer; } @Override public ByteString read(TreeName treeName, ByteSequence key) { return importer.read(treeName, key); } @Override public void put(TreeName treeName, ByteSequence key, ByteSequence value) { importer.put(treeName, key, value); } @Override public boolean update(TreeName treeName, ByteSequence key, UpdateFunction f) { final ByteString value = importer.read(treeName, key); final ByteSequence newValue = f.computeNewValue(value); Reject.ifNull(newValue, "Importer cannot delete records."); if (!Objects.equals(value, newValue)) { importer.put(treeName, key, newValue); return true; } return false; } @Override public Cursor<ByteString, ByteString> openCursor(TreeName treeName) { return new SequentialCursorAdapter<>(importer.openCursor(treeName)); } @Override public long getRecordCount(TreeName treeName) { long counter = 0; try (final SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(treeName)) { while (cursor.next()) { counter++; } } return counter; } @Override public void openTree(TreeName name, boolean createOnDemand) { throw new UnsupportedOperationException(); } @Override public void deleteTree(TreeName name) { throw new UnsupportedOperationException(); } @Override public boolean delete(TreeName treeName, ByteSequence key) { throw new UnsupportedOperationException(); } } } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
File was deleted
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/SuccessiveAddsImportStrategy.java
@@ -26,22 +26,25 @@ import static org.opends.messages.BackendMessages.*; import static org.opends.messages.UtilityMessages.*; import static org.opends.server.core.DirectoryServer.*; import static org.opends.server.util.StaticUtils.*; import java.io.IOException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.opends.server.admin.std.server.PluggableBackendCfg; import org.opends.server.backends.RebuildConfig; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ServerContext; import org.opends.server.types.CanceledOperationException; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; import org.opends.server.types.OpenDsException; import org.opends.server.util.LDIFException; import org.opends.server.util.LDIFReader; @@ -89,129 +92,130 @@ private static final int IMPORT_PROGRESS_INTERVAL = 10000; private final ServerContext serverContext; private final RootContainer rootContainer; private final PluggableBackendCfg backendCfg; SuccessiveAddsImportStrategy(ServerContext serverContext, RootContainer rootContainer, PluggableBackendCfg backendCfg) { this.serverContext = serverContext; this.rootContainer = rootContainer; this.backendCfg = backendCfg; } /** {@inheritDoc} */ @Override public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext) throws DirectoryException public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException, IOException, CanceledOperationException, StorageRuntimeException, InterruptedException { ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); try { ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); final LDIFReader reader; try { final LDIFReader reader; reader = new LDIFReader(importConfig); } catch (Exception e) { LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e); } long importCount = 0; final long startTime = System.currentTimeMillis(); timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); while (true) { final Entry entry; try { reader = new LDIFReader(importConfig); entry = reader.readEntry(); if (entry == null) { break; } } catch (Exception e) catch (LDIFException le) { LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e); if (!le.canContinueReading()) { LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le); } continue; } long importCount = 0; final long startTime = System.currentTimeMillis(); timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); while (true) final DN dn = entry.getName(); final EntryContainer ec = rootContainer.getEntryContainer(dn); if (ec == null) { 
final Entry entry; try { entry = reader.readEntry(); if (entry == null) { break; } } catch (LDIFException le) { if (!le.canContinueReading()) { LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le); } continue; } final DN dn = entry.getName(); final EntryContainer ec = rootContainer.getEntryContainer(dn); if (ec == null) { final LocalizableMessage m = ERR_LDIF_SKIP.get(dn); logger.error(m); reader.rejectLastEntry(m); continue; } try { ec.addEntry(entry, null); importCount++; } catch (DirectoryException e) { switch (e.getResultCode().asEnum()) { case ENTRY_ALREADY_EXISTS: if (importConfig.replaceExistingEntries()) { final Entry oldEntry = ec.getEntry(entry.getName()); ec.replaceEntry(oldEntry, entry, null); } else { reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get()); } break; case NO_SUCH_OBJECT: reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent())); break; default: // Not sure why it failed. reader.rejectLastEntry(e.getMessageObject()); break; } } final LocalizableMessage m = ERR_LDIF_SKIP.get(dn); logger.error(m); reader.rejectLastEntry(m); continue; } final long finishTime = System.currentTimeMillis(); waitForShutdown(timerService); final long importTime = finishTime - startTime; float rate = 0; if (importTime > 0) try { rate = 1000f * reader.getEntriesRead() / importTime; ec.addEntry(entry, null); importCount++; } logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate); return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); catch (DirectoryException e) { switch (e.getResultCode().asEnum()) { case ENTRY_ALREADY_EXISTS: if (importConfig.replaceExistingEntries()) { final Entry oldEntry = ec.getEntry(entry.getName()); ec.replaceEntry(oldEntry, entry, null); } else { reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get()); } break; case NO_SUCH_OBJECT: reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent())); break; default: // Not sure why it failed. 
reader.rejectLastEntry(e.getMessageObject()); break; } } } finally final long finishTime = System.currentTimeMillis(); waitForShutdown(timerService); final long importTime = finishTime - startTime; float rate = 0; if (importTime > 0) { rootContainer.close(); // if not already stopped, then stop it waitForShutdown(timerService); rate = 1000f * reader.getEntriesRead() / importTime; } logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate); return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); } catch (DirectoryException e) finally { logger.traceException(e); throw e; rootContainer.close(); // if not already stopped, then stop it waitForShutdown(timerService); } catch (OpenDsException e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject()); } catch (Exception e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage())); } } @Override public void rebuildIndex(RebuildConfig rebuildConfig) throws Exception { new OnDiskMergeImporter.StrategyImpl(serverContext, rootContainer, backendCfg).rebuildIndex(rebuildConfig); } private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Suffix.java
File was deleted opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -36,6 +36,7 @@ import org.opends.server.backends.pluggable.spi.Importer; import org.opends.server.backends.pluggable.spi.ReadOperation; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.SequentialCursor; import org.opends.server.backends.pluggable.spi.Storage; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.backends.pluggable.spi.StorageStatus; @@ -63,11 +64,10 @@ } @Override public void createTree(final TreeName name) public void clearTree(final TreeName name) { importer.createTree(name); logger.trace("Storage@%s.Importer@%s.createTree(%s, %s)", storageId(), id(), backendId, name); importer.clearTree(name); logger.trace("Storage@%s.Importer@%s.clearTree(%s, %s)", storageId(), id(), backendId, name); } @Override @@ -88,15 +88,6 @@ } @Override public boolean delete(TreeName name, ByteSequence key) { final boolean delete = importer.delete(name, key); logger.trace("Storage@%s.Importer@%s.delete(%s, %s, %s) = %b", storageId(), id(), backendId, name, hex(key), delete); return delete; } @Override public void close() { importer.close(); @@ -108,6 +99,13 @@ { return System.identityHashCode(this); } @Override public SequentialCursor<ByteString, ByteString> openCursor(TreeName name) { logger.trace("Storage@%s.Importer@%s.openCursor(%s,%s)", storageId(), id(), backendId, name); return importer.openCursor(name); } } /** Decorates an {@link ReadableTransaction} with additional trace logging. */ @@ -224,14 +222,6 @@ } @Override public void renameTree(final TreeName oldName, final TreeName newName) { txn.renameTree(oldName, newName); logger.trace("Storage@%s.WriteableTransaction@%s.renameTree(%s, %s, %s)", storageId(), id(), backendId, oldName, newName); } @Override public boolean update(final TreeName name, final ByteSequence key, final UpdateFunction f) { final boolean isUpdated = txn.update(name, key, f); @@ -389,7 +379,7 @@ return results; } private String hex(final ByteSequence bytes) private static String hex(final ByteSequence bytes) { return bytes != null ? bytes.toByteString().toHexString() : null; } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
@@ -36,15 +36,17 @@ public interface Importer extends Closeable { /** * Creates a new tree identified by the provided name. * Clears the tree whose name is provided, ensuring that an empty tree with the given name exists: if the tree already * exists, all the data it contains is deleted; otherwise, an empty tree is created. * * @param name * the tree name * @param treeName name of the tree to clear */ void createTree(TreeName name); void clearTree(TreeName treeName); /** * Creates a record with the provided key and value in the tree identified by the provided name. * Creates a record with the provided key and value in the tree identified by the provided name. At the end of this * method, the record is visible to the {@link #read(TreeName, ByteSequence)} and {@link #openCursor(TreeName)} methods of * this instance. The record is guaranteed to be persisted only after {@link #close()}. * * @param treeName * the tree name @@ -56,17 +58,6 @@ void put(TreeName treeName, ByteSequence key, ByteSequence value); /** * Deletes the record with the provided key, in the tree whose name is provided. * * @param treeName * the tree name * @param key * the key of the record to delete * @return {@code true} if the record could be deleted, {@code false} otherwise */ boolean delete(TreeName treeName, ByteSequence key); /** * Reads the record's value associated to the provided key, in the tree whose name is provided. * * @param treeName @@ -77,7 +68,17 @@ */ ByteString read(TreeName treeName, ByteSequence key); /** {@inheritDoc} */ /** * Opens a cursor on the tree whose name is provided. Cursors are predictable only if there are no pending * {@link #put(TreeName, ByteSequence, ByteSequence)} operations: once opened, cursors might not reflect * subsequent changes. * * @param treeName * the tree name * @return a new cursor */ SequentialCursor<ByteString, ByteString> openCursor(TreeName treeName); @Override void close(); } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -43,7 +43,6 @@ /** * Starts the import operation. * * @return a new Importer object which must be closed to release all resources * @throws ConfigException * if there is a problem with the configuration opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/WriteableTransaction.java
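Taken together, the Importer and Storage.startImport() hunks above suggest the following usage pattern. This is a minimal, hypothetical sketch rather than code from this patch: the tree name, keys, values and the process() consumer are invented, a Storage instance named storage is assumed to be in scope, exception handling is omitted, and only methods that appear in these hunks (startImport, clearTree, put, read, openCursor, close) are assumed to exist.

    final TreeName treeName = TreeName.valueOf("dc=example,dc=com/id2entry"); // hypothetical tree name
    try (Importer importer = storage.startImport())
    {
      // Ensure an empty tree exists, discarding any previous content.
      importer.clearTree(treeName);
      // Visible to read()/openCursor() on this instance, but only guaranteed persisted after close().
      importer.put(treeName, ByteString.valueOf("key"), ByteString.valueOf("value"));
      final ByteString value = importer.read(treeName, ByteString.valueOf("key"));
      // Cursor contents are predictable only once all put() calls have been issued.
      try (SequentialCursor<ByteString, ByteString> cursor = importer.openCursor(treeName))
      {
        while (cursor.next())
        {
          process(cursor.getKey(), cursor.getValue()); // hypothetical consumer
        }
      }
    } // close() releases resources and makes the imported data durable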
@@ -42,16 +42,6 @@ void openTree(TreeName name, boolean createOnDemand); /** * Renames the tree from the old to the new name. * * @param oldName * the old tree name * @param newName * the new tree name */ void renameTree(TreeName oldName, TreeName newName); /** * Deletes the tree identified by the provided name. * * @param name opendj-server-legacy/src/main/java/org/opends/server/loggers/LoggingCategoryNames.java
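With renameTree() removed and tree creation moved out of the Importer, code that previously called importer.createTree(name) now ensures trees exist through a write transaction, as the updated tests later in this patch do. A minimal sketch of that pattern, assuming a Storage named storage and a TreeName named treeName are already in scope:

    storage.open(AccessMode.READ_WRITE);
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableTransaction txn) throws Exception
      {
        // createOnDemand = true: the tree is created if it does not exist yet
        txn.openTree(treeName, true);
      }
    });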
@@ -86,6 +86,12 @@ // The category used for messages associated with the JE backend. NAMES.put("org.opends.server.backends.jeb", "JEB"); // The category used for messages associated with the pluggable backend. NAMES.put("org.opends.server.backends.pluggable", "PLUGGABLE"); // The category used for messages associated with the PDB backend. NAMES.put("org.opends.server.backends.pdb", "PDB"); // The category used for messages associated with generic backends. NAMES.put("org.opends.server.backends", "BACKEND"); opendj-server-legacy/src/main/java/org/opends/server/types/RDN.java
@@ -1176,6 +1176,7 @@ .onUnmappableCharacter(CodingErrorAction.REPORT); if (value.copyTo(buffer, decoder)) { buffer.flip(); try { // URL encoding encodes space char as '+' instead of using hex code opendj-server-legacy/src/main/resources/org/forgerock/checkstyle/opends-checkstyle.xml
@@ -1,18 +1,28 @@ <?xml version="1.0"?> <!-- The contents of this file are subject to the terms of the Common Development and Distribution License (the License). You may not use this file except in compliance with the License. You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the specific language governing permission and limitations under the License. When distributing Covered Software, include this CDDL Header Notice in each file and include the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL Header, with the fields enclosed by brackets [] replaced by your own identifying information: "Portions Copyrighted [year] [name of copyright owner]". Copyright 2012-2015 ForgeRock AS. All rights reserved. ! CDDL HEADER START ! ! The contents of this file are subject to the terms of the ! Common Development and Distribution License, Version 1.0 only ! (the "License"). You may not use this file except in compliance ! with the License. ! ! You can obtain a copy of the license at legal-notices/CDDLv1_0.txt ! or http://forgerock.org/license/CDDLv1.0.html. ! See the License for the specific language governing permissions ! and limitations under the License. ! ! When distributing Covered Code, include this CDDL HEADER in each ! file and include the License file at ! legal-notices/CDDLv1_0.txt. If applicable, ! add the following below this CDDL HEADER, with the fields enclosed ! by brackets "[]" replaced with your own identifying information: ! Portions Copyright [yyyy] [name of copyright owner] ! ! CDDL HEADER END ! ! Copyright 2012-2015 ForgeRock AS. ! --> <!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.2//EN" @@ -20,6 +30,31 @@ <module name="Checker"> <!-- Toggle Checkstyle on/off --> <module name="SuppressionCommentFilter"> <property name="offCommentFormat" value="@Checkstyle:off" /> <property name="onCommentFormat" value="@Checkstyle:on" /> </module> <!-- Instruct Checkstyle to ignore a specific check for a whole file --> <module name="SuppressWithNearbyCommentFilter"> <property name="commentFormat" value="@Checkstyle:ignore ([\w\|]+)"/> <property name="checkFormat" value="$1"/> <property name="influenceFormat" value="1000000" /> </module> <!-- Instruct Checkstyle to ignore next line --> <module name="SuppressWithNearbyCommentFilter"> <property name="commentFormat" value="@Checkstyle:ignore" /> <property name="influenceFormat" value="1" /> </module> <!-- Instruct Checkstyle to ignore next N lines (-ve means previous lines) --> <module name="SuppressWithNearbyCommentFilter"> <property name="commentFormat" value="@Checkstyle:ignoreFor (\d+)" /> <property name="influenceFormat" value="$1" /> </module> <!-- Ensure that each source file starts with the appropriate header --> <module name="RegexpHeader"> <property name="headerFile" value="${basedir}/src/main/resources/org/forgerock/checkstyle/opendj.sourceheader" /> @@ -39,7 +74,9 @@ <module name="TreeWalker"> <property name="cacheFile" value="${basedir}/.checkstyle-cache/opends-checkstyle.cache" /> <module name="FileContentsHolder"/> <!-- Ensure that all classes and interfaces are documented --> <module name="JavadocType"> <property name="scope" value="private" /> opendj-server-legacy/src/test/java/org/opends/server/TestListener.java
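For reference, the comment forms that the new suppression filters above react to, shown as a hypothetical Java fragment; LineLength is only an example check name, and the field names are invented:

    // Suppress all checks between the off/on markers:
    // @Checkstyle:off
    private int badly_Named_field;
    // @Checkstyle:on

    // Suppress a named check (here LineLength, as an example) for the rest of the file:
    // @Checkstyle:ignore LineLength

    // Suppress checks on the next line only:
    // @Checkstyle:ignore
    private int another_Badly_Named_field;

    // Suppress checks on the next 2 lines:
    // @Checkstyle:ignoreFor 2
    private int field_One;
    private int field_Two;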
@@ -325,7 +325,11 @@ @Override public void onTestFailure(ITestResult tr) { super.onTestFailure(tr); reportTestFailed(tr); } private void reportTestFailed(ITestResult tr) { IClass cls = tr.getTestClass(); ITestNGMethod method = tr.getMethod(); @@ -350,7 +354,7 @@ appendFailureInfo(failureInfo); failureInfo.append(EOL).append(EOL); originalSystemErr.print(EOL + EOL + EOL + " T E S T F A I L U R E ! ! !" + EOL + EOL); originalSystemErr.print(EOL + EOL + EOL + " T E S T S K I P P E D ! ! !" + EOL + EOL); originalSystemErr.print(failureInfo); originalSystemErr.print(DIVIDER_LINE + EOL + EOL); @@ -365,8 +369,6 @@ onTestFinished(tr); } public static void pauseOnFailure() { File tempFile = null; try @@ -408,9 +410,13 @@ } @Override public void onConfigurationFailure(ITestResult tr) { public void onConfigurationSkip(ITestResult tr) { super.onConfigurationFailure(tr); reportConfigurationFailure(tr); } private void reportConfigurationFailure(ITestResult tr) { IClass cls = tr.getTestClass(); ITestNGMethod method = tr.getMethod(); @@ -427,13 +433,18 @@ appendFailureInfo(failureInfo); failureInfo.append(EOL).append(EOL); originalSystemErr.print(EOL + EOL + EOL + " C O N F I G U R A T I O N F A I L U R E ! ! !" + EOL + EOL); originalSystemErr.print(EOL + EOL + EOL + " C O N F I G U R A T I O N S K I P ! ! !" + EOL + EOL); originalSystemErr.print(failureInfo); originalSystemErr.print(DIVIDER_LINE + EOL + EOL); _bufferedTestFailures.append(failureInfo); } @Override public void onConfigurationFailure(ITestResult tr) { super.onConfigurationFailure(tr); reportConfigurationFailure(tr); } private String getTestngLessStack(Throwable t) { StackTraceElement[] elements = t.getStackTrace(); @@ -460,6 +471,8 @@ getTestngLessStack(invocation)); } t.printStackTrace(); return buffer.toString(); } @@ -475,11 +488,11 @@ return buffer.toString(); } @Override public void onTestSkipped(ITestResult tr) { super.onTestSkipped(tr); onTestFinished(tr); // Make sure skipped test appear as failure super.onTestFailure(tr); reportTestFailed(tr); } @Override opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/DN2IDTest.java
@@ -46,6 +46,7 @@ import org.opends.server.backends.pdb.PDBStorage; import org.opends.server.backends.pluggable.spi.AccessMode; import org.opends.server.backends.pluggable.spi.Cursor; import org.opends.server.backends.pluggable.spi.Importer; import org.opends.server.backends.pluggable.spi.ReadOperation; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.SequentialCursor; @@ -87,11 +88,15 @@ when(serverContext.getDiskSpaceMonitor()).thenReturn(mock(DiskSpaceMonitor.class)); storage = new PDBStorage(createBackendCfg(), serverContext); try(final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport()) { importer.createTree(dn2IDTreeName); } storage.open(AccessMode.READ_WRITE); storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { txn.openTree(dn2IDTreeName, true); } }); baseDN = dn("dc=example, dc=com"); dn2ID = new DN2ID(dn2IDTreeName, baseDN); opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/DefaultIndexTest.java
@@ -255,12 +255,6 @@ } @Override public void renameTree(TreeName oldName, TreeName newName) { storage.put(newName, storage.remove(oldName)); } @Override public void deleteTree(TreeName name) { storage.remove(name); opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2CountTest.java
@@ -27,7 +27,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import static org.opends.server.ConfigurationMock.legacyMockCfg; import static org.opends.server.ConfigurationMock.*; import static org.opends.server.util.CollectionUtils.*; import java.util.Random; @@ -89,11 +89,15 @@ when(serverContext.getDiskSpaceMonitor()).thenReturn(mock(DiskSpaceMonitor.class)); storage = new PDBStorage(createBackendCfg(), serverContext); try(final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport()) { importer.createTree(id2CountTreeName); } storage.open(AccessMode.READ_WRITE); storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { txn.openTree(id2CountTreeName, true); } }); id2Count = new ID2Count(id2CountTreeName); opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
New file @@ -0,0 +1,501 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2015 ForgeRock AS. */ package org.opends.server.backends.pluggable; import static org.assertj.core.api.Assertions.*; import static org.opends.server.backends.pluggable.EntryIDSet.*; import java.io.File; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ForkJoinPool; import org.forgerock.opendj.ldap.ByteSequence; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.ByteStringBuilder; import org.forgerock.util.Pair; import org.mockito.Mockito; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; import org.opends.server.backends.pluggable.OnDiskMergeImporter.AddLongCollector; import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool; import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk; import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector; import org.opends.server.backends.pluggable.OnDiskMergeImporter.EntryIDSetsCollector; import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk; import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CollectorCursor; import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.CompositeCursor; import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.FileRegionChunk; import org.opends.server.backends.pluggable.OnDiskMergeImporter.ExternalSortChunk.InMemorySortedChunk; import org.opends.server.backends.pluggable.OnDiskMergeImporter.MeteredCursor; import org.opends.server.backends.pluggable.OnDiskMergeImporter.UniqueValueCollector; import org.opends.server.backends.pluggable.spi.ReadableTransaction; import org.opends.server.backends.pluggable.spi.SequentialCursor; import org.opends.server.backends.pluggable.spi.StorageRuntimeException; import org.opends.server.backends.pluggable.spi.TreeName; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.testng.annotations.Test; import com.forgerock.opendj.util.PackedLong; public class OnDiskMergeImporterTest extends DirectoryServerTestCase { @Test @SuppressWarnings(value = "resource") public void testCollectCursor() { final MeteredCursor<ByteString, ByteString> source = cursorOf(content(new String[][] { { "key1", "value1key1" }, { "key1", "value2key1" }, { "key2", "value1key2" }, { "key3", "value1key3" }, { "key3", 
"value2key3" } })); final MeteredCursor<ByteString, ByteString> result = new CollectorCursor<>(source, StringConcatCollector.INSTANCE); assertThat(toPairs(result)).containsExactlyElementsOf(content(new String[][] { { "key1", "value1key1-value2key1" }, { "key2", "value1key2" }, { "key3", "value1key3-value2key3" } })); } @Test @SuppressWarnings(value = "resource") public void testCompositeCursor() { final Collection<MeteredCursor<ByteString, ByteString>> sources = new ArrayList<>(); sources.add(cursorOf(content(new String[][] { { "A", "value1" }, { "C", "value3" }, { "D", "value4" }, { "F", "value6" }, { "I", "value9" } }))); sources.add(cursorOf(content(new String[][] { { "B", "value2" } }))); sources.add(cursorOf(Collections.<Pair<ByteString, ByteString>> emptyList())); sources.add(cursorOf(content(new String[][] { { "A", "value1" }, { "E", "value5" }, { "G", "value7" }, { "H", "value8" } }))); final SequentialCursor<ByteString, ByteString> result = new CompositeCursor<>("name", sources); assertThat(toPairs(result)).containsExactlyElementsOf(content(new String[][] { { "A", "value1" }, { "A", "value1" }, { "B", "value2" }, { "C", "value3" }, { "D", "value4" }, { "E", "value5" }, { "F", "value6" }, { "G", "value7" }, { "H", "value8" }, { "I", "value9" } })); } @Test @SuppressWarnings(value = { "unchecked", "resource" }) public void testAddLongCollector() { final ID2Count id2count = new ID2Count(TreeName.valueOf("dummy/dummy")); final MeteredCursor<String, ByteString> source = cursorOf( Pair.of("key1", id2count.toValue(10)), Pair.of("key1", id2count.toValue(20)), Pair.of("key2", id2count.toValue(5)), Pair.of("key3", id2count.toValue(6)), Pair.of("key3", id2count.toValue(4))); final SequentialCursor<String, ByteString> expected = cursorOf( Pair.of("key1", id2count.toValue(30)), Pair.of("key2", id2count.toValue(5)), Pair.of("key3", id2count.toValue(10))); final SequentialCursor<String, ByteString> result = new CollectorCursor<>(source, new AddLongCollector(id2count)); assertThat(toPairs(result)).containsExactlyElementsOf(toPairs(expected)); } @Test @SuppressWarnings(value = { "unchecked", "resource" }) public void testEntryIDSetCollector() { final MeteredCursor<String, ByteString> source = cursorOf( Pair.of("key1", EntryIDSet.CODEC_V2.encode(newDefinedSet(2))), Pair.of("key1", EntryIDSet.CODEC_V2.encode(newDefinedSet(1))), Pair.of("key2", EntryIDSet.CODEC_V2.encode(newDefinedSet(1))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(1))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(2))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(3))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(4))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(5))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(6))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(7))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(8))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(9))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newDefinedSet(10))), Pair.of("key4", EntryIDSet.CODEC_V2.encode(newDefinedSet(10))), Pair.of("key4", EntryIDSet.CODEC_V2.encode(newUndefinedSet()))); final SequentialCursor<String, ByteString> expected = cursorOf( Pair.of("key1", EntryIDSet.CODEC_V2.encode(newDefinedSet(1, 2))), Pair.of("key2", EntryIDSet.CODEC_V2.encode(newDefinedSet(1))), Pair.of("key3", EntryIDSet.CODEC_V2.encode(newUndefinedSet())), Pair.of("key4", EntryIDSet.CODEC_V2.encode(newUndefinedSet()))); final SequentialCursor<String, ByteString> result 
= new CollectorCursor<>(source, new EntryIDSetsCollector(new DummyIndex(10))); assertThat(toPairs(result)).containsExactlyElementsOf(toPairs(expected)); } @Test @SuppressWarnings(value = "resource") public void testUniqueValueCollectorAcceptUniqueValues() { final MeteredCursor<ByteString, ByteString> source = cursorOf(content(new String[][] { { "key1", "value1" }, { "key2", "value2" }, { "key3", "value3" }, })); final SequentialCursor<ByteString, ByteString> result = new CollectorCursor<>(source, UniqueValueCollector.<ByteString> getInstance()); assertThat(toPairs(result)).containsExactlyElementsOf(content(new String[][] { { "key1", "value1" }, { "key2", "value2" }, { "key3", "value3" }, })); } @Test(expectedExceptions = IllegalArgumentException.class) @SuppressWarnings(value = "resource") public void testUniqueValueCollectorDoesNotAcceptMultipleValues() { final MeteredCursor<ByteString, ByteString> source = cursorOf(content(new String[][] { { "key1", "value1" }, { "key2", "value2" }, { "key2", "value22" }, { "key3", "value3" } })); toPairs(new CollectorCursor<>(source, new UniqueValueCollector<ByteString>())); } @Test public void testInMemorySortedChunkSortUnsignedOnFlip() throws Exception { try(final BufferPool bufferPool = new BufferPool(1, 1024)) { final Chunk chunk = new InMemorySortedChunk("test", bufferPool); populate(chunk, content(new String[][] { { new String(new byte[] { (byte) 0xFF }), "value0xFF" }, { "key1", "value1" }, { "key2", "value2" }, { "key3", "value3" }, { new String(new byte[] { (byte) 0x00 }), "value0x00" } })); assertThat(toPairs(chunk.flip())).containsExactlyElementsOf(content(new String[][] { { new String(new byte[] { (byte) 0x00 }), "value0x00" }, { "key1", "value1" }, { "key2", "value2" }, { "key3", "value3" }, { new String( new byte[] { (byte) 0xFF }), "value0xFF" } })); } } @Test @SuppressWarnings("resource") public void testFileRegionChunk() throws Exception { final int NB_REGION = 10; final int NB_RECORDS = 15; final File tempDir = TestCaseUtils.createTemporaryDirectory("testFileRegionChunk"); final FileChannel channel = FileChannel.open(tempDir.toPath().resolve("region-chunk"), StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE, StandardOpenOption.READ, StandardOpenOption.WRITE); // Generate content final List<Chunk> memoryChunks = new ArrayList<>(NB_REGION); final String[][][] contents = new String[NB_REGION][NB_RECORDS][]; for (int region = 0; region < NB_REGION; region++) { for (int record = 0; record < NB_RECORDS; record++) { contents[region][record] = new String[] { String.format("key-%d-%d", region, record), String.format("value-%d", record) }; } final Chunk memoryChunk = new ArrayListChunk(); populate(memoryChunk, content(contents[region])); memoryChunks.add(memoryChunk); } // Copy content into file regions final List<Chunk> regionChunks = new ArrayList<>(memoryChunks.size()); long offset = 0; for (Chunk source : memoryChunks) { final Chunk region = new FileRegionChunk("test", channel, offset, source.size()); offset += source.size(); populate(region, toPairs(source.flip())); regionChunks.add(region); } // Verify file regions contents int regionNumber = 0; for (Chunk region : regionChunks) { assertThat(toPairs(region.flip())).containsExactlyElementsOf(content(contents[regionNumber])); regionNumber++; } } @Test public void testExternalSortChunk() throws Exception { final int NB_REGION = 10; final ByteString KEY = ByteString.valueOf("key"); final File tempDir = TestCaseUtils.createTemporaryDirectory("testExternalSortChunk"); try(final BufferPool 
bufferPool = new BufferPool(2, 4 + 1 + KEY.length() + 1 + 4)) { // 4: record offset, 1: key length, 1: value length, 4: value final ExternalSortChunk chunk = new ExternalSortChunk(tempDir, "test", bufferPool, StringConcatCollector.INSTANCE, new ForkJoinPool()); List<ByteString> expected = new ArrayList<>(NB_REGION); for (int i = 0; i < NB_REGION; i++) { final ByteString value = ByteString.valueOf(String.format("%02d", i)); chunk.put(KEY, value); expected.add(value); } assertThat(chunk.getNbSortedChunks()).isEqualTo(NB_REGION); try (final SequentialCursor<ByteString, ByteString> cursor = chunk.flip()) { assertThat(toPairs(cursor)).containsExactly(Pair.of(KEY, StringConcatCollector.INSTANCE.merge(expected))); } } } private final static List<Pair<ByteString, ByteString>> content(String[]... data) { final List<Pair<ByteString, ByteString>> content = new ArrayList<>(data.length); for (String[] keyValue : data) { content.add(Pair.of(ByteString.valueOf(keyValue[0]), ByteString.valueOf(keyValue[1]))); } return content; } private static void populate(Chunk chunk, Collection<Pair<ByteString, ByteString>> content) { for (Pair<ByteString, ByteString> keyValue : content) { chunk.put(keyValue.getFirst(), keyValue.getSecond()); } } private static <K, V> Collection<Pair<K, V>> toPairs(SequentialCursor<K, V> source) { final Collection<Pair<K, V>> collection = new LinkedList<>(); while (source.next()) { collection.add(Pair.of(source.getKey(), source.getValue())); } return collection; } private final static <K, V> MeteredCursor<K, V> cursorOf(@SuppressWarnings("unchecked") Pair<K, V>... pairs) { return cursorOf(Arrays.asList(pairs)); } private static final <K, V> MeteredCursor<K, V> cursorOf(Iterable<Pair<K, V>> pairs) { return new IteratorCursorAdapter<>(pairs.iterator()); } private static final class StringConcatCollector implements Collector<List<ByteString>, ByteString> { static Collector<List<ByteString>, ByteString> INSTANCE = new StringConcatCollector(); private StringConcatCollector() { } @Override public List<ByteString> get() { return new LinkedList<>(); } @Override public List<ByteString> accept(List<ByteString> resultContainer, ByteString value) { resultContainer.add(value); return resultContainer; } @Override public ByteString merge(List<ByteString> resultContainer) { final ByteStringBuilder builder = new ByteStringBuilder(); Collections.sort(resultContainer); for (ByteString s : resultContainer) { builder.append(s); builder.append(new char[] { '-' }); } builder.setLength(builder.length() - 1); return builder.toByteString(); } } private static final class IteratorCursorAdapter<K, V> implements MeteredCursor<K, V> { private final Iterator<Pair<K, V>> it; private Pair<K, V> entry; IteratorCursorAdapter(Iterator<Pair<K, V>> it) { this.it = it; } @Override public boolean next() { if (it.hasNext()) { entry = it.next(); return true; } entry = null; return false; } @Override public boolean isDefined() { return entry != null; } @Override public K getKey() throws NoSuchElementException { return entry.getFirst(); } @Override public V getValue() throws NoSuchElementException { return entry.getSecond(); } @Override public void close() { } @Override public long getNbBytesRead() { return 0; } @Override public long getNbBytesTotal() { return 0; } @Override public String getMetricName() { return "iterator-adapter"; } } private static final class DummyIndex extends DefaultIndex { private static final State state; private static final EntryContainer entryContainer; static { entryContainer = 
Mockito.mock(EntryContainer.class); Mockito.when(entryContainer.getHighestEntryID(Mockito.any(WriteableTransaction.class))).thenReturn(new EntryID( 1)); state = Mockito.mock(State.class); Mockito.when(state.getIndexFlags(Mockito.any(ReadableTransaction.class), Mockito.any(TreeName.class))).thenReturn( EnumSet.of(State.IndexFlag.COMPACTED)); }; DummyIndex(int indexEntryLimit) throws StorageRuntimeException { super(TreeName.valueOf("/dumy/dummy"), state, indexEntryLimit, entryContainer); open(Mockito.mock(WriteableTransaction.class), false); } } private static final class ArrayListChunk implements Chunk { private final List<Pair<ByteString, ByteString>> content = new ArrayList<>(); private long size; @Override public boolean put(ByteSequence key, ByteSequence value) { size += PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value .length(); content.add(Pair.of(key.toByteString(), value.toByteString())); return true; } @Override public MeteredCursor<ByteString, ByteString> flip() { return cursorOf(content); } @Override public long size() { return size; } @Override public void delete() { } } } opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
@@ -32,7 +32,7 @@ import static org.opends.server.protocols.internal.InternalClientConnection.getRootConnection; import static org.opends.server.protocols.internal.Requests.newSearchRequest; import static org.opends.server.types.Attributes.create; import static org.opends.server.types.IndexType.EQUALITY; import static org.opends.server.types.IndexType.*; import static org.opends.server.util.CollectionUtils.*; import static org.testng.Assert.*; @@ -41,14 +41,19 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import org.assertj.core.api.Assertions; import org.forgerock.opendj.ldap.ByteString; import org.forgerock.opendj.ldap.ConditionResult; import org.forgerock.opendj.ldap.ResultCode; import org.forgerock.opendj.ldap.SearchScope; import org.forgerock.util.Reject; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType; @@ -58,7 +63,9 @@ import org.opends.server.admin.std.server.PluggableBackendCfg; import org.opends.server.api.Backend.BackendOperation; import org.opends.server.api.ClientConnection; import org.opends.server.backends.RebuildConfig; import org.opends.server.backends.VerifyConfig; import org.opends.server.backends.RebuildConfig.RebuildMode; import org.opends.server.backends.pluggable.spi.AccessMode; import org.opends.server.backends.pluggable.spi.ReadOnlyStorageException; import org.opends.server.backends.pluggable.spi.ReadOperation; @@ -112,7 +119,16 @@ protected String[] ldifTemplate; protected int ldifNumberOfEntries; protected String backupID; protected String[] backendIndexes = { "sn" }; protected Map<String, IndexType[]> backendIndexes = new HashMap<>(); { backendIndexes.put("entryUUID", new IndexType[] { IndexType.EQUALITY }); backendIndexes.put("cn", new IndexType[] { IndexType.SUBSTRING }); backendIndexes.put("sn", new IndexType[] { IndexType.PRESENCE, IndexType.EQUALITY, IndexType.SUBSTRING }); backendIndexes.put("uid", new IndexType[] { IndexType.EQUALITY }); backendIndexes.put("telephoneNumber", new IndexType[] { IndexType.EQUALITY, IndexType.SUBSTRING }); backendIndexes.put("mail", new IndexType[] { IndexType.SUBSTRING }); }; protected String[] backendVlvIndexes = { "people" }; private AttributeType modifyAttribute; @@ -149,13 +165,22 @@ when(backendCfg.dn()).thenReturn(testBaseDN); when(backendCfg.getBackendId()).thenReturn(backendTestName); when(backendCfg.getBaseDN()).thenReturn(newTreeSet(testBaseDN)); when(backendCfg.listBackendIndexes()).thenReturn(backendIndexes); when(backendCfg.listBackendIndexes()).thenReturn(backendIndexes.keySet().toArray(new String[0])); when(backendCfg.listBackendVLVIndexes()).thenReturn(backendVlvIndexes); BackendIndexCfg indexCfg = mock(BackendIndexCfg.class); when(indexCfg.getIndexType()).thenReturn(newTreeSet(IndexType.PRESENCE, IndexType.EQUALITY)); when(indexCfg.getAttribute()).thenReturn(DirectoryServer.getAttributeType(backendIndexes[0])); when(backendCfg.getBackendIndex(backendIndexes[0])).thenReturn(indexCfg); for (Map.Entry<String, IndexType[]> index : backendIndexes.entrySet()) { final String attributeName = index.getKey().toLowerCase(); final AttributeType attribute = DirectoryServer.getAttributeType(attributeName); Reject.ifNull(attribute, "Attribute type '" + attributeName + "' doesn't exists."); BackendIndexCfg indexCfg = 
mock(BackendIndexCfg.class); when(indexCfg.getIndexType()).thenReturn(newTreeSet(index.getValue())); when(indexCfg.getAttribute()).thenReturn(attribute); when(indexCfg.getIndexEntryLimit()).thenReturn(4000); when(indexCfg.getSubstringLength()).thenReturn(6); when(backendCfg.getBackendIndex(index.getKey())).thenReturn(indexCfg); } BackendVLVIndexCfg vlvIndexCfg = mock(BackendVLVIndexCfg.class); when(vlvIndexCfg.getName()).thenReturn("people"); @@ -648,7 +673,15 @@ { assertEquals(backend.getEntryCount(), getTotalNumberOfLDIFEntries()); assertFalse(backend.isIndexed(modifyAttribute, EQUALITY)); assertTrue(backend.isIndexed(DirectoryServer.getAttributeType(backendIndexes[0]), EQUALITY)); for (Map.Entry<String, IndexType[]> index : backendIndexes.entrySet()) { for (IndexType type : index.getValue()) { final AttributeType attributeType = DirectoryServer.getAttributeType(index.getKey().toLowerCase()); assertTrue(backend.isIndexed(attributeType, org.opends.server.types.IndexType.valueOf(type.toString().toUpperCase()))); } } } private int getTotalNumberOfLDIFEntries() @@ -900,6 +933,8 @@ importConf.setClearBackend(true); importConf.writeRejectedEntries(rejectedEntries); importConf.setIncludeBranches(Collections.singleton(testBaseDN)); importConf.setSkipDNValidation(true); importConf.setThreadCount(0); backend.importLDIF(importConf, DirectoryServer.getInstance().getServerContext()); } assertEquals(rejectedEntries.size(), 0, "No entries should be rejected"); @@ -911,6 +946,142 @@ assertEquals(backend.getNumberOfChildren(testBaseDN), 1, "Not enough entries in DIT."); /** -2 for baseDn and People entry */ assertEquals(backend.getNumberOfChildren(testBaseDN.child(DN.valueOf("ou=People"))), ldifNumberOfEntries - 2, "Not enough entries in DIT."); VerifyConfig config = new VerifyConfig(); config.setBaseDN(DN.valueOf("dc=test,dc=com")); config.addCompleteIndex("dn2id"); for (String indexName : backendIndexes.keySet()) { config.addCompleteIndex(indexName); } assertThat(backend.verifyBackend(config)).isEqualTo(0); config = new VerifyConfig(); config.setBaseDN(DN.valueOf("dc=test,dc=com")); config.addCleanIndex("dn2id"); for (String indexName : backendIndexes.keySet()) { config.addCleanIndex(indexName); } assertThat(backend.verifyBackend(config)).isEqualTo(0); } @Test(dependsOnMethods = "testImportLDIF") public void testRebuildAllIndex() throws Exception { final EntryContainer entryContainer = backend.getRootContainer().getEntryContainers().iterator().next(); // Delete all the indexes backend.getRootContainer().getStorage().write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { entryContainer.getDN2ID().delete(txn); entryContainer.getDN2ID().open(txn, true); entryContainer.getDN2URI().delete(txn); entryContainer.getDN2URI().open(txn, true); entryContainer.getID2ChildrenCount().delete(txn); entryContainer.getID2ChildrenCount().open(txn, true); for(VLVIndex idx : entryContainer.getVLVIndexes()) { idx.setTrusted(txn, false); idx.delete(txn); idx.open(txn, true); } for(AttributeIndex attribute : entryContainer.getAttributeIndexes()) { for(Index idx : attribute.getNameToIndexes().values()) { idx.setTrusted(txn, false); idx.delete(txn); idx.open(txn, true); } } } }); RebuildConfig rebuildConf = new RebuildConfig(); rebuildConf.setBaseDN(DN.valueOf("dc=test,dc=com")); rebuildConf.setRebuildMode(RebuildMode.ALL); backend.closeBackend(); backend.rebuildBackend(rebuildConf, DirectoryServer.getInstance().getServerContext()); backend.openBackend(); VerifyConfig config = new 
VerifyConfig(); config.setBaseDN(DN.valueOf("dc=test,dc=com")); config.addCompleteIndex("dn2id"); for (String indexName : backendIndexes.keySet()) { config.addCompleteIndex(indexName); } assertThat(backend.verifyBackend(config)).isEqualTo(0); config = new VerifyConfig(); config.setBaseDN(DN.valueOf("dc=test,dc=com")); config.addCleanIndex("dn2id"); for (String indexName : backendIndexes.keySet()) { config.addCleanIndex(indexName); } assertThat(backend.verifyBackend(config)).isEqualTo(0); } @Test(dependsOnMethods = "testImportLDIF") public void testRebuildDegradedIndex() throws Exception { final EntryContainer entryContainer = backend.getRootContainer().getEntryContainers().iterator().next(); final Set<String> dirtyIndexes = new HashSet<>(Arrays.asList(new String[] { "sn", "uid", "telephoneNumber" })); assertThat(backendIndexes.keySet()).containsAll(dirtyIndexes); // Delete all the indexes backend.getRootContainer().getStorage().write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { for(AttributeIndex attribute : entryContainer.getAttributeIndexes()) { boolean trusted = !dirtyIndexes.contains(attribute.getAttributeType().getNameOrOID()); for(Index idx : attribute.getNameToIndexes().values()) { idx.setTrusted(txn, trusted); } } } }); RebuildConfig rebuildConf = new RebuildConfig(); rebuildConf.setBaseDN(DN.valueOf("dc=test,dc=com")); rebuildConf.setRebuildMode(RebuildMode.DEGRADED); backend.closeBackend(); backend.rebuildBackend(rebuildConf, DirectoryServer.getInstance().getServerContext()); backend.openBackend(); VerifyConfig config = new VerifyConfig(); config.setBaseDN(DN.valueOf("dc=test,dc=com")); config.addCompleteIndex("dn2id"); for (String indexName : backendIndexes.keySet()) { config.addCompleteIndex(indexName); } assertThat(backend.verifyBackend(config)).isEqualTo(0); config = new VerifyConfig(); config.setBaseDN(DN.valueOf("dc=test,dc=com")); config.addCleanIndex("dn2id"); for (String indexName : backendIndexes.keySet()) { config.addCleanIndex(indexName); } assertThat(backend.verifyBackend(config)).isEqualTo(0); // Put back the backend in its original state for the following tests // backend.openBackend(); } @Test(dependsOnMethods = "testImportLDIF") opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/StateTest.java
@@ -27,7 +27,7 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import static org.opends.server.ConfigurationMock.legacyMockCfg; import static org.opends.server.ConfigurationMock.*; import static org.opends.server.backends.pluggable.State.IndexFlag.*; import static org.opends.server.util.CollectionUtils.*; @@ -89,11 +89,15 @@ when(serverContext.getDiskSpaceMonitor()).thenReturn(mock(DiskSpaceMonitor.class)); storage = new PDBStorage(createBackendCfg(), serverContext); try(final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport()) { importer.createTree(stateTreeName); } storage.open(AccessMode.READ_WRITE); storage.write(new WriteOperation() { @Override public void run(WriteableTransaction txn) throws Exception { txn.openTree(stateTreeName, true); } }); state = new State(stateTreeName); } opendj-server-legacy/src/test/java/org/opends/server/tasks/TasksTestCase.java
@@ -173,6 +173,7 @@ }); } @Test(enabled = false) // This isn't a test method, but TestNG thinks it is. public static void waitTaskCompletedSuccessfully(DN taskDN) throws Exception { Task task = getDoneTask(taskDN); opendj-server-legacy/src/test/java/org/opends/server/util/PackageInfoTestCase.java
@@ -34,7 +34,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.forgerock.util.Reject; import org.opends.server.TestCaseUtils; import static org.testng.Assert.*; @@ -82,7 +82,7 @@ @DataProvider(name = "adsSourceDirectories") public Object[][] getADSSourceDirectories() { File adsSourceRoot = new File(sourceRoot, "ads"); File adsSourceRoot = new File(sourceRoot, "main/java/org/opends/admin/ads"); ArrayList<File> sourceDirs = new ArrayList<>(); getSourceDirectories(adsSourceRoot, sourceDirs); @@ -182,7 +182,7 @@ @DataProvider(name = "guiToolsSourceDirectories") public Object[][] getGUIToolsSourceDirectories() { File guiToolsSourceRoot = new File(sourceRoot, "guitools"); File guiToolsSourceRoot = new File(sourceRoot, "main/java/org/opends/guitools"); ArrayList<File> sourceDirs = new ArrayList<>(); getSourceDirectories(guiToolsSourceRoot, sourceDirs); @@ -207,7 +207,7 @@ @DataProvider(name = "quickSetupSourceDirectories") public Object[][] getQuickSetupSourceDirectories() { File quickSetupSourceRoot = new File(sourceRoot, "quicksetup"); File quickSetupSourceRoot = new File(sourceRoot, "main/java/org/opends/quicksetup"); ArrayList<File> sourceDirs = new ArrayList<>(); getSourceDirectories(quickSetupSourceRoot, sourceDirs); @@ -232,7 +232,7 @@ @DataProvider(name = "serverSourceDirectories") public Object[][] getServerSourceDirectories() { File serverSourceRoot = new File(sourceRoot, "server"); File serverSourceRoot = new File(sourceRoot, "main/java"); ArrayList<File> sourceDirs = new ArrayList<>(); getSourceDirectories(serverSourceRoot, sourceDirs); @@ -259,6 +259,7 @@ private void getSourceDirectories(File startingPoint, ArrayList<File> sourceDirectories) { Reject.ifFalse(startingPoint.isDirectory(), startingPoint.getAbsolutePath() + " is not a directory."); boolean added = false; for (File f : startingPoint.listFiles()) {